Java SpringBoot Canal
canal
译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
从这句话理解到了什么?
基于MySQL,并且通过MySQL日志进行的增量解析,这也就意味着对原有的业务代码完全是无侵入性的。
工作原理:解析MySQL的binlog日志,提供增量数据。
基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
官方文档:https://github.com/alibaba/canal

Canal数据如何传输?

先来一张官方图:
Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步 - 图1
Canal分为服务端和客户端,这也是阿里常用的套路,比如前面讲到的注册中心Nacos

  • 服务端:负责解析MySQL的binlog日志,传递增量数据给客户端或者消息中间件
  • 客户端:负责解析服务端传过来的数据,然后定制自己的业务处理。

目前为止支持的消息中间件很全面了,比如KafkaRocketMQRabbitMQ

数据同步还有其他中间件吗?

有,当然有,还有一些开源的中间件也是相当不错的,比如Bifrost
常见的几款中间件的区别如下:

Canal Debezium DataX Databus Flinkx Bifrost
实时同步 支持 支持 不支持 支持 支持 支持
增量同步 支持 支持 不支持 支持 支持 支持
写业务逻辑 自己写保存变更数据的代码 自己写保存变更数据的代码 不用写 自己写保存变更数据的代码 自己写保存变更数据的代码 不用写
支持MySQL 支持 支持 支持 支持 支持 支持
活跃度 不高 一般 可以

Canal服务端安装

服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases
目前最新的是v1.1.5,点击下载:
image.png
下载完成解压,目录如下:
image.png
本文使用Canal+RabbitMQ进行数据的同步,因此下面步骤完全按照这个base进行。

1、打开MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

  1. [mysqld]
  2. log-bin=mysql-bin # 开启 binlog
  3. binlog-format=ROW # 选择 ROW 模式
  4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。
一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,这里用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

  1. # url
  2. canal.instance.master.address=127.0.0.1:3306
  3. # username/password
  4. canal.instance.dbUsername=root
  5. canal.instance.dbPassword=root
  6. # 监听的数据库
  7. canal.instance.defaultDatabaseName=test
  8. # 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
  9. canal.instance.filter.regex=.*\\..*

3、设置RabbitMQ的配置

服务端默认的传输方式是tcp,需要在配置文件中设置MQ的相关信息。
这里需要修改两处配置文件,如下;

1、canal.deployer-1.1.5\conf\canal.properties

这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码…

  1. # 传输方式:tcp, kafka, rocketMQ, rabbitMQ
  2. canal.serverMode = rabbitMQ
  3. ##################################################
  4. ######### RabbitMQ #############
  5. ##################################################
  6. rabbitmq.host = 127.0.0.1
  7. rabbitmq.virtual.host =/
  8. # exchange
  9. rabbitmq.exchange =canal.exchange
  10. # 用户名、密码
  11. rabbitmq.username =guest
  12. rabbitmq.password =guest
  13. ## 是否持久化
  14. rabbitmq.deliveryMode = 2

2、canal.deployer-1.1.5\conf\example\instance.properties

这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

  1. canal.mq.topic=canal.routing.key

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一个canal.exchange(必须和配置中的相同)的exchange和一个名称为 canal.queue(名称随意)的队列。
其中绑定的路由KEY为:canal.routing.key(必须和配置中的相同),如下图:
Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步 - 图4

5、启动服务端

点击bin目录下的脚本,windows直接双击startup.bat,启动成功如下:
image.png

6、测试

在本地数据库test中的oauth_client_details插入一条数据,如下:

  1. INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

此时查看MQ中的canal.queue已经有了数据,如下:
Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步 - 图6
其实就是一串JSON数据,这个JSON如下:

  1. {
  2. "data": [{
  3. "client_id": "myjszl",
  4. "resource_ids": "res1",
  5. "client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
  6. "scope": "all",
  7. "authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
  8. "web_server_redirect_uri": "http://www.baidu.com",
  9. "authorities": null,
  10. "access_token_validity": "1000",
  11. "refresh_token_validity": "1000",
  12. "additional_information": null,
  13. "autoapprove": "false"
  14. }],
  15. "database": "test",
  16. "es": 1640337532000,
  17. "id": 7,
  18. "isDdl": false,
  19. "mysqlType": {
  20. "client_id": "varchar(48)",
  21. "resource_ids": "varchar(256)",
  22. "client_secret": "varchar(256)",
  23. "scope": "varchar(256)",
  24. "authorized_grant_types": "varchar(256)",
  25. "web_server_redirect_uri": "varchar(256)",
  26. "authorities": "varchar(256)",
  27. "access_token_validity": "int(11)",
  28. "refresh_token_validity": "int(11)",
  29. "additional_information": "varchar(4096)",
  30. "autoapprove": "varchar(256)"
  31. },
  32. "old": null,
  33. "pkNames": ["client_id"],
  34. "sql": "",
  35. "sqlType": {
  36. "client_id": 12,
  37. "resource_ids": 12,
  38. "client_secret": 12,
  39. "scope": 12,
  40. "authorized_grant_types": 12,
  41. "web_server_redirect_uri": 12,
  42. "authorities": 12,
  43. "access_token_validity": 4,
  44. "refresh_token_validity": 4,
  45. "additional_information": 12,
  46. "autoapprove": 12
  47. },
  48. "table": "oauth_client_details",
  49. "ts": 1640337532520,
  50. "type": "INSERT"
  51. }

每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值…..
客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

Canal客户端搭建

客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue这个队列。

1、创建消息实体类

MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

  1. /**
  2. * Canal消息接收实体类
  3. */
  4. @NoArgsConstructor
  5. @Data
  6. public class CanalMessage<T> {
  7. @JsonProperty("type")
  8. private String type;
  9. @JsonProperty("table")
  10. private String table;
  11. @JsonProperty("data")
  12. private List<T> data;
  13. @JsonProperty("database")
  14. private String database;
  15. @JsonProperty("es")
  16. private Long es;
  17. @JsonProperty("id")
  18. private Integer id;
  19. @JsonProperty("isDdl")
  20. private Boolean isDdl;
  21. @JsonProperty("old")
  22. private List<T> old;
  23. @JsonProperty("pkNames")
  24. private List<String> pkNames;
  25. @JsonProperty("sql")
  26. private String sql;
  27. @JsonProperty("ts")
  28. private Long ts;
  29. }

2、MQ消息监听业务

接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。
代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:

  1. import cn.hutool.json.JSONUtil;
  2. import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
  3. import lombok.RequiredArgsConstructor;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.annotation.Exchange;
  6. import org.springframework.amqp.rabbit.annotation.Queue;
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.stereotype.Component;
  10. /**
  11. * 监听MQ获取Canal增量的数据消息
  12. */
  13. @Component
  14. @Slf4j
  15. @RequiredArgsConstructor
  16. public class CanalRabbitMQListener {
  17. @RabbitListener(bindings = {
  18. @QueueBinding(
  19. value = @Queue(value = "canal.queue", durable = "true"),
  20. exchange = @Exchange(value = "canal.exchange"),
  21. key = "canal.routing.key"
  22. )
  23. })
  24. public void handleDataChange(String message) {
  25. //将message转换为CanalMessage
  26. CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
  27. String tableName = canalMessage.getTable();
  28. log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
  29. //TODO 业务逻辑自己完善...............
  30. }
  31. }

3、测试

下面向表中插入数据,看下接收的消息是什么样的,SQL如下:

  1. INSERT INTO `oauth_client_details`
  2. VALUES
  3. ( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );

客户端转换后的消息如下图:
Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步 - 图7
上图可以看出所有的数据都已经成功接收到,只需要根据数据完善自己的业务逻辑即可。