本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com

阿里开源MySQL中间件Canal快速入门 - 图1

阿里开源MySQL中间件Canal快速入门 - 图2

前言

距离上一篇文章发布又过去了两周,这次先填掉上一篇秒杀系统文章结尾处开的坑,介绍一下数据库中间件 Canal 的使用。

「Canal 用途很广,并且上手非常简单,小伙伴们在平时完成公司的需求时,很有可能会用到。」

举个例子:

公司目前有多个开发人员正在开发一套服务,为了缩短调用延时,对部分接口数据加入了缓存。一旦这些数据在数据库中进行了更新操作,缓存就成了旧数据,必须及时删除。

删除缓存的代码「理所当然可以写在更新数据的业务代码里」,但有时候者写操作是在别的项目代码里,你可能无权修改,亦或者别人不愿你在他代码里写这种业务之外的代码。(毕竟多人协作中间会产生各种配合问题)。又或者就是单纯的删除缓存的操作失败了,缓存依然是旧数据。

正如上篇文章缓存与数据库双写一致性实战里面所说,我们可以将缓存更新操作完全独立出来,形成一套单独的系统。「Canal 正是这么一个很好的帮手。」 能帮我们实现像下图这样的系统:

阿里开源MySQL中间件Canal快速入门 - 图3

「本篇文章的要点如下:」

  • Canal 是什么
  • Canal 工作原理
  • 数据库的读写分离
  • 数据库主从同步
  • 数据库主从同步一致性问题
  • 异步复制
  • 全同步复制
  • 半同步复制
  • Canal 实战
  • 开启 MySQL Binlog
  • 配置 Canal 服务
  • 运行 Canal 服务
  • Java 客户端 Demo

❝ 欢迎关注我的个人公众号获取最全的原创文章:「后端技术漫谈」(二维码见文章底部) ❞

Canal 是什么

众所周知,阿里是国内比较早地大量使用 MySQL 的互联网企业(去 IOE 化:去掉 IBM 的小型机、Oracle 数据库、EMC 存储设备,代之以自己在开源软件基础上开发的系统),并且基于阿里巴巴 / 淘宝的业务,从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

Canal 应运而生,它通过伪装成数据库的从库,读取主库发来的 binlog,用来实现「数据库增量订阅和消费业务需求」

「Canal 用途:」

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护 (拆分异构索引、倒排索引等)
  • 「业务 cache 缓存刷新」
  • 带业务逻辑的增量数据处理

开源项目地址:

https://github.com/alibaba/canal

在这里就不再摘抄项目简介了,提炼几个值得注意的点:

  • canal 使用 client-server 模式,数据传输协议使用 protobuf 3.0(很多 RPC 框架也在使用例如 gRPC)
  • 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
  • canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ。

Canal 工作原理

Canal 实际是将自己伪装成数据库的从库,来读取 Binlog。我们先补习下关于「MySQL 数据库主从数据库」的基础知识,这样就能更快的理解 Canal。

数据库的读写分离

为了应对高并发场景,MySQL 支持把一台数据库主机分为单独的一台写主库(主要负责写操作),而把读的数据库压力分配给读的从库,而且读从库可以变为多台,这就是读写分离的典型场景。

阿里开源MySQL中间件Canal快速入门 - 图4

数据库主从同步

实现数据库的读写分离,是通过数据库主从同步,让从数据库监听主数据库 Binlog 实现的。大体流程如下图:

❝ MySQL master 将数据变更写入二进制日志 (binary log, 其中记录叫做二进制日志事件 binary log events,可以通过 show binlog events 进行查看) MySQL slave 将 master 的 binary log events 拷贝到它的中继日志 (relay log) MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据 ❞

阿里开源MySQL中间件Canal快速入门 - 图5

详细主从同步原理在这里就不展开细说了。

可以看到,这种架构下会有一个问题,「数据库主从同步会存在延迟,那么就会有短暂的时间,主从数据库的数据是不一致的。」

这种不一致大多数情况下非常短暂,很多时候我们可以忽略他。

但一旦要求数据一致,就会引申出如何解决这个问题的思考。

数据库主从同步一致性问题

我们通常使用 MySQL 主从复制来解决 MySQL 的单点故障问题,其通过逻辑复制的方式把主库的变更同步到从库,主备之间无法保证严格一致的模式,

于是,MySQL 的主从复制带来了主从 “数据一致性” 的问题。「MySQL 的复制分为:异步复制、半同步复制、全同步复制。」

异步复制

MySQL 默认的复制即是异步复制,主库在执行完客户端提交的事务后会立即将结果返给给客户端,并不关心从库是否已经接收并处理,这样就会有一个问题,「主如果 crash 掉了,此时主上已经提交的事务可能并没有传到从库上,如果此时,强行将从提升为主,可能导致新主上的数据不完整。」

❝ 主库将事务 Binlog 事件写入到 Binlog 文件中,此时主库只会通知一下 Dump 线程发送这些新的 Binlog,然后主库就会继续处理提交操作,而此时不会保证这些 Binlog 传到任何一个从库节点上。 ❞

全同步复制

指当主库执行完一个事务,所有的从库都执行了该事务才返回给客户端。「因为需要等待所有从库执行完该事务才能返回」,所以全同步复制的性能必然会收到严重的影响。

❝ 当主库提交事务之后,所有的从库节点必须收到、APPLY 并且提交这些事务,然后主库线程才能继续做后续操作。但缺点是,主库完成一个事务的时间会被拉长,性能降低。 ❞

半同步复制

是介于全同步复制与全异步复制之间的一种,「主库只需要等待至少一个从库节点收到」并且 Flush Binlog 到 Relay Log 文件即可,主库不需要等待所有从库给主库反馈。同时,「这里只是一个收到的反馈,而不是已经完全完成并且提交的反馈」,如此,节省了很多时间。

❝ 介于异步复制和全同步复制之间,主库在执行完客户端提交的事务后不是立刻返回给客户端,而是等待至少一个从库接收到并写到 relay log 中才返回给客户端。相对于异步复制,半同步复制提高了数据的安全性,「同时它也造成了一定程度的延迟,这个延迟最少是一个 TCP/IP 往返的时间。所以,半同步复制最好在低延时的网络中使用。」

阿里开源MySQL中间件Canal快速入门 - 图6

「事实上,半同步复制并不是严格意义上的半同步复制,MySQL 半同步复制架构中,主库在等待备库 ack 时候,如果超时会退化为异步后,也可能导致 “数据不一致”。」

❝ 当半同步复制发生超时时(由 rpl_semi_sync_master_timeout 参数控制,单位是毫秒,默认为 10000,即 10s),会暂时关闭半同步复制,转而使用异步复制。当 master dump 线程发送完一个事务的所有事件之后,如果在 rpl_semi_sync_master_timeout 内,收到了从库的响应,则主从又重新恢复为半同步复制。 ❞

关于半同步复制的详细原理分析可以看这篇引申文章,在此不展开:

https://www.cnblogs.com/ivictor/p/5735580.html

回到 Canal 的工作原理

回顾了数据库从库的数据同步原理,理解 Canal 十分简单,直接引用官网原文:

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal)
  • canal 解析 binary log 对象 (原始为 byte 流)

Canal 实战

开启 MySQL Binlog

这个步骤我在之前的文章教你使用 Binlog 日志恢复误删的 MySQL 数据已经提到过,这里完善了一下,再贴一下,方便大家。

首先进入数据库控制台,运行指令:

  1. mysql> show variables like'log_bin%';
  2. +---------------------------------+-------+
  3. | Variable_name | Value |
  4. +---------------------------------+-------+
  5. | log_bin | OFF |
  6. | log_bin_basename | |
  7. | log_bin_index | |
  8. | log_bin_trust_function_creators | OFF |
  9. | log_bin_use_v1_row_events | OFF |
  10. +---------------------------------+-------+
  11. 5 rows in set (0.00 sec)

可以看到我们的 binlog 是关闭的,都是 OFF。接下来我们需要修改 Mysql 配置文件,执行命令:

  1. sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf

在文件末尾添加:

  1. log-bin=/var/lib/mysql/mysql-bin
  2. binlog-format=ROW

保存文件,重启 mysql 服务:

  1. sudo service mysql restart

重启完成后,查看下 mysql 的状态:

  1. systemctl status mysql.service

这时,如果你的 mysql 版本在 5.7 或更高版本,就会报错:

  1. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
  2. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: table_open_cache: 431 (requested 2000)
  3. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
  4. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.361395Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log) starting as process 5930 ...
  5. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
  6. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363747Z 0 [ERROR] Aborting
  7. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
  8. Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.364108Z 0 [Note] /usr/sbin/mysqld: Shutdown complete
  9. Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE

「You have enabled the binary log, but you haven’t provided the mandatory server-id. Please refer to the proper server」

之前我们的配置,对于 5.7 以下版本应该是可以的。但对于高版本,我们需要指定 server-id。

我们给这个 MySQL 指定为 2(只要不与其他库 id 重复):

  1. server-id=2

创建数据库 Canal 使用账号

  1. mysql> select user, host from user;
  2. +------------------+-----------+
  3. | user | host |
  4. +------------------+-----------+
  5. | root | % |
  6. | debian-sys-maint | localhost |
  7. | mysql.session | localhost |
  8. | mysql.sys | localhost |
  9. | root | localhost |
  10. +------------------+-----------+
  11. 5 rows in set
  1. CREATE USER canal IDENTIFIED BY 'xxxx'; (填写密码)
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  3. FLUSH PRIVILEGES;
  4. show grants for 'canal'

配置 Canal 服务

去 Github 下载最近的 Canal 稳定版本包:

解压缩:

  1. mkdir /tmp/canal
  2. tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal

配置文件设置:

主要有两个文件配置,一个是conf/canal.properties一个是conf/example/instance.properties

为了快速运行 Demo,只修改conf/example/instance.properties里的数据库连接账号密码即可

  1. # username/password
  2. canal.instance.dbUsername=canal
  3. canal.instance.dbPassword=xxxxxxx
  4. canal.instance.connectionCharset = UTF-8

运行 Canal 服务

请先确保机器上有 JDK,接着运行 Canal 启动脚本:

  1. sh bin/startup.sh

下图即成功运行:

阿里开源MySQL中间件Canal快速入门 - 图7

Java 客户端代码

我在秒杀系统系列文章的代码仓库里(miaosha-job)编写了如下客户端代码

仓库源码地址:https://github.com/qqxx6661/miaosha

  1. package job;
  2. import com.alibaba.otter.canal.client.CanalConnector;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  5. import com.alibaba.otter.canal.protocol.Message;
  6. import com.google.protobuf.InvalidProtocolBufferException;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import java.net.InetSocketAddress;
  10. import java.util.List;
  11. import java.util.concurrent.atomic.AtomicInteger;
  12. public class CanalClient {
  13. private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);
  14. public static void main(String[] args) {
  15. // 第一步:与canal进行连接
  16. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
  17. "example", "", "");
  18. connector.connect();
  19. // 第二步:开启订阅
  20. connector.subscribe();
  21. // 第三步:循环订阅
  22. while (true) {
  23. try {
  24. // 每次读取 1000 条
  25. Message message = connector.getWithoutAck(1000);
  26. long batchID = message.getId();
  27. int size = message.getEntries().size();
  28. if (batchID == -1 || size == 0) {
  29. LOGGER.info("当前暂时没有数据,休眠1秒");
  30. Thread.sleep(1000);
  31. } else {
  32. LOGGER.info("-------------------------- 有数据啦 -----------------------");
  33. printEntry(message.getEntries());
  34. }
  35. connector.ack(batchID);
  36. } catch (Exception e) {
  37. LOGGER.error("处理出错");
  38. } finally {
  39. try {
  40. Thread.sleep(1000);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }
  46. }
  47. /**
  48. * 获取每条打印的记录
  49. */
  50. public static void printEntry(List<Entry> entrys) {
  51. for (Entry entry : entrys) {
  52. // 第一步:拆解entry 实体
  53. Header header = entry.getHeader();
  54. EntryType entryType = entry.getEntryType();
  55. // 第二步:如果当前是RowData,那就是我需要的数据
  56. if (entryType == EntryType.ROWDATA) {
  57. String tableName = header.getTableName();
  58. String schemaName = header.getSchemaName();
  59. RowChange rowChange = null;
  60. try {
  61. rowChange = RowChange.parseFrom(entry.getStoreValue());
  62. } catch (InvalidProtocolBufferException e) {
  63. e.printStackTrace();
  64. }
  65. EventType eventType = rowChange.getEventType();
  66. LOGGER.info(String.format("当前正在操作表 %s.%s, 执行操作= %s", schemaName, tableName, eventType));
  67. // 如果是‘查询’ 或者 是 ‘DDL’ 操作,那么sql直接打出来
  68. if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
  69. LOGGER.info("执行了查询语句:[{}]", rowChange.getSql());
  70. return;
  71. }
  72. // 第三步:追踪到 columns 级别
  73. rowChange.getRowDatasList().forEach((rowData) -> {
  74. // 获取更新之前的column情况
  75. List<Column> beforeColumns = rowData.getBeforeColumnsList();
  76. // 获取更新之后的 column 情况
  77. List<Column> afterColumns = rowData.getAfterColumnsList();
  78. // 当前执行的是 删除操作
  79. if (eventType == EventType.DELETE) {
  80. printColumn(beforeColumns);
  81. }
  82. // 当前执行的是 插入操作
  83. if (eventType == EventType.INSERT) {
  84. printColumn(afterColumns);
  85. }
  86. // 当前执行的是 更新操作
  87. if (eventType == EventType.UPDATE) {
  88. printColumn(afterColumns);
  89. // 进行删除缓存操作
  90. deleteCache(afterColumns, tableName, schemaName);
  91. }
  92. });
  93. }
  94. }
  95. }
  96. /**
  97. * 每个row上面的每一个column 的更改情况
  98. * @param columns
  99. */
  100. public static void printColumn(List<Column> columns) {
  101. columns.forEach((column) -> {
  102. String columnName = column.getName();
  103. String columnValue = column.getValue();
  104. String columnType = column.getMysqlType();
  105. // 判断 该字段是否更新
  106. boolean isUpdated = column.getUpdated();
  107. LOGGER.info(String.format("数据列:column, columnName, columnValue, columnType, isUpdated));
  108. });
  109. }
  110. /**
  111. * 秒杀下单接口删除库存缓存
  112. */
  113. public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
  114. if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
  115. AtomicInteger id = new AtomicInteger();
  116. columns.forEach((column) -> {
  117. String columnName = column.getName();
  118. String columnValue = column.getValue();
  119. if ("id".equals(columnName)) {
  120. id.set(Integer.parseInt(columnValue));
  121. }
  122. });
  123. // TODO: 删除缓存
  124. LOGGER.info("Canal删除stockid:[{}] 的库存缓存", id);
  125. }
  126. }
  127. }

代码中有详细的注释,就不做解释了。

我们跑起代码,紧接着我们在数据库中进行更改 UPDATE 操作,把法外狂徒张三改成张三 1,然后再改回张三,见下图。

阿里开源MySQL中间件Canal快速入门 - 图8

Canal 成功收到了两条更新操作:

阿里开源MySQL中间件Canal快速入门 - 图9

紧接着我们模拟一个删除 Cache 缓存的业务,在代码中有:

  1. /**
  2. * 秒杀下单接口删除库存缓存
  3. */
  4. public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
  5. if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
  6. AtomicInteger id = new AtomicInteger();
  7. columns.forEach((column) -> {
  8. String columnName = column.getName();
  9. String columnValue = column.getValue();
  10. if ("id".equals(columnName)) {
  11. id.set(Integer.parseInt(columnValue));
  12. }
  13. });
  14. // TODO: 删除缓存
  15. LOGGER.info("Canal删除stock表id:[{}] 的库存缓存", id);
  16. }
  17. }

「在上面的代码中,在收到 m4a_miaosha.stock 表的更新操作后,我们刷新库存缓存。效果如下:」

阿里开源MySQL中间件Canal快速入门 - 图10阿里开源MySQL中间件Canal快速入门 - 图11

简单的 Canal 使用就介绍到这里,剩下的发挥空间留给各位读者大大们。

总结

本文总结了 Canal 的基本原理和简单的使用。

「总结如下几点:」

  • Canal 实际是将自己伪装成数据库的从库,来读取主数据库发来的 Binlog。
  • Canal 用途很广,比如数据库实时备份、索引构建和实时维护 (拆分异构索引、倒排索引等)、业务 cache 缓存刷新。
  • Canal 可以推送至非常多数据源,并支持推送到消息队列,方便多语言使用。

「希望大家多多支持我的原创技术文章公众号:后端技术漫谈,我最全的原创文章都在这里首发。」

参考

往期推荐

[

SQL 调优 | SQL 书写规范及优化技巧(上)

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247484216&idx=1&sn=aa84d1e95fbc26c94ccd4ec5ebb7c618&chksm=fbdb1ab9ccac93af5e09cd6bbb01c8cecee6db845dcb80df9d16a69033012899d442d2b249ba&scene=21#wechat_redirect)[

如何删库跑路?教你使用 Binlog 日志恢复误删的 MySQL 数据

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247484126&idx=1&sn=cb55e74d5e6621f3763052c515393555&chksm=fbdb1b5fccac9249cd54bc9bdf8c86a14a14280d2bb3b56fd9ee904d803875b52afe49f33222&scene=21#wechat_redirect)[

面试前必须要知道的 Redis 面试题

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247483926&idx=1&sn=73c86708512a97c801034d2583ac03d4&chksm=fbdb1b97ccac928195b0e1b8801b0758fdd0c45ec5638f9769f95e7bfa1d421f847e149215c5&scene=21#wechat_redirect)[

秒杀系统实战(四)| 缓存与数据库双写问题的争议

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247484200&idx=1&sn=6b6c7251ee83fe8ef9201373aafcffdd&chksm=fbdb1aa9ccac93bfe26655f89056b0d25b3a536f6b11148878fe96ffdf1d8349d44659cad784&scene=21#wechat_redirect)[

【剑指 offer 题解】二维数组中的查找

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247484068&idx=1&sn=8f78e1812502aa40b81433e4ad3864e8&chksm=fbdb1b25ccac923314e58551d98fd1267e3688b2e4656271234972c435522319c6398680aaf6&scene=21#wechat_redirect)[

[WebSocket] 使用 WebSocket 实现实时多人答题对战游戏

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247484044&idx=1&sn=6a5d2d98e43334dac95782b224f9f2b5&chksm=fbdb1b0dccac921b49c4a62d181ebe3435b76f05d772023fcce277c5074ca51f102b286c3418&scene=21#wechat_redirect)[

Springboot 实战:发送邮件 / 重置密码业务

](http://mp.weixin.qq.com/s?__biz=MzU1NTA0NTEwMg==&mid=2247484053&idx=2&sn=9460510e3d7569b7bc0b495ada9e1923&chksm=fbdb1b14ccac92023afc5956bcd66e7a0c91597ace58e66f192723746b9c5b953024eb0d9087&scene=21#wechat_redirect)

[SpringBoot] 快速配置多数据源(整合 MyBatis)

关注我

我是一名后端开发工程师。主要关注后端开发,数据安全,爬虫,物联网,边缘计算等方向,欢迎交流。

各大平台都可以找到我

  • 「微信公众号:后端技术漫谈」
  • 「Github:@qqxx6661」
  • CSDN:@Rude3knife
  • 知乎:@后端技术漫谈
  • 简书:@蛮三刀把刀
  • 掘金:@蛮三刀把刀

原创文章主要内容

  • 后端开发相关技术文章
  • Java 面试复习手册
  • 设计模式 / 数据结构 / LeetCode 算法题解
  • 爬虫 / 边缘计算相关技术文章
  • 逸闻趣事 / 好书分享 / 个人生活

个人公众号:后端技术漫谈

阿里开源MySQL中间件Canal快速入门 - 图12个人公众号:后端技术漫谈

「如果文章对你有帮助,不妨收藏,转发,在看起来~」