image.png

Canal服务端安装

服务端需要下载压缩包,下载地址:https://github.com/alibaba/canal/releases
目前最新的是v1.1.5 ,点击下载:
ES同步 - 图2
下载完成解压,目录如下:
ES同步 - 图3
本文使用Canal+RabbitMQ 进行数据的同步,因此下面步骤完全按照这个base进行。

1、打开MySQL的binlog日志

修改MySQL的日志文件,my.cnf 配置如下:

  1. [mysqld]
  2. log-bin=mysql-bin # 开启 binlog
  3. binlog-format=ROW # 选择 ROW 模式
  4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal slaveId 重复

2、设置MySQL的配置

需要设置服务端配置文件中的MySQL配置,这样Canal才能知道需要监听哪个库、哪个表的日志文件。
一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件

  1. # url
  2. canal.instance.master.address=127.0.0.1:3306
  3. # username/password
  4. canal.instance.dbUsername=root
  5. canal.instance.dbPassword=root
  6. # 监听的数据库
  7. canal.instance.defaultDatabaseName=test
  8. # 监听的表,可以指定,多个用逗号分割,这里正则是监听所有
  9. canal.instance.filter.regex=.*\\..*

3、设置RabbitMQ的配置

服务端默认的传输方式是tcp ,需要在配置文件中设置MQ 的相关信息。
这里需要修改两处配置文件,如下;
1、canal.deployer-1.1.5\conf\canal.properties
这个配置文件主要是设置MQ相关的配置,比如URL,用户名、密码…

  1. # 传输方式:tcp, kafka, rocketMQ, rabbitMQ
  2. canal.serverMode = rabbitMQ
  3. ##################################################
  4. ######## RabbitMQ #############
  5. ##################################################
  6. rabbitmq.host = 127.0.0.1
  7. rabbitmq.virtual.host =/
  8. # exchange
  9. rabbitmq.exchange =canal.exchange
  10. # 用户名、密码
  11. rabbitmq.username =guest
  12. rabbitmq.password =guest
  13. # 是否持久化
  14. rabbitmq.deliveryMode = 2

2、canal.deployer-1.1.5\conf\example\instance.properties
这个文件设置MQ的路由KEY,这样才能路由到指定的队列中,如下:

  1. canal.mq.topic=canal.routing.key

4、RabbitMQ新建exchange和Queue

在RabbitMQ中需要新建一个canal.exchange (必须和配置中的相同)的exchange和一个名称为 canal.queue (名称随意)的队列。
其中绑定的路由KEY为:canal.routing.key (必须和配置中的相同),如下图:
ES同步 - 图4

5、启动服务端

点击bin目录下的脚本,windows直接双击startup.bat ,启动成功如下:
ES同步 - 图5

6、测试

在本地数据库test 中的oauth_client_details 插入一条数据,如下:

  1. INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');

此时查看MQ中的canal.queue 已经有了数据,如下:
ES同步 - 图6
其实就是一串JSON数据,这个JSON如下:

  1. {
  2. "data": [{
  3. "client_id": "myjszl",
  4. "resource_ids": "res1",
  5. "client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
  6. "scope": "all",
  7. "authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
  8. "web_server_redirect_uri": "http://www.baidu.com",
  9. "authorities": null,
  10. "access_token_validity": "1000",
  11. "refresh_token_validity": "1000",
  12. "additional_information": null,
  13. "autoapprove": "false"
  14. }],
  15. "database": "test",
  16. "es": 1640337532000,
  17. "id": 7,
  18. "isDdl": false,
  19. "mysqlType": {
  20. "client_id": "varchar(48)",
  21. "resource_ids": "varchar(256)",
  22. "client_secret": "varchar(256)",
  23. "scope": "varchar(256)",
  24. "authorized_grant_types": "varchar(256)",
  25. "web_server_redirect_uri": "varchar(256)",
  26. "authorities": "varchar(256)",
  27. "access_token_validity": "int(11)",
  28. "refresh_token_validity": "int(11)",
  29. "additional_information": "varchar(4096)",
  30. "autoapprove": "varchar(256)"
  31. },
  32. "old": null,
  33. "pkNames": ["client_id"],
  34. "sql": "",
  35. "sqlType": {
  36. "client_id": 12,
  37. "resource_ids": 12,
  38. "client_secret": 12,
  39. "scope": 12,
  40. "authorized_grant_types": 12,
  41. "web_server_redirect_uri": 12,
  42. "authorities": 12,
  43. "access_token_validity": 4,
  44. "refresh_token_validity": 4,
  45. "additional_information": 12,
  46. "autoapprove": 12
  47. },
  48. "table": "oauth_client_details",
  49. "ts": 1640337532520,
  50. "type": "INSERT"
  51. }

每个字段的意思已经很清楚了,有表名称、方法、参数、参数类型、参数值…..
客户端要做的就是监听MQ获取JSON数据,然后将其解析出来,处理自己的业务逻辑。

Canal客户端搭建

客户端很简单实现,要做的就是消费Canal服务端传递过来的消息,监听canal.queue 这个队列。

1、创建消息实体类

MQ传递过来的是JSON数据,当然要创建个实体类接收数据,如下:

  1. /**
  2. * Canal消息接收实体类
  3. */
  4. @NoArgsConstructor
  5. @Data
  6. public class CanalMessage<T> {
  7. @JsonProperty("type")
  8. private String type;
  9. @JsonProperty("table")
  10. private String table;
  11. @JsonProperty("data")
  12. private List<T> data;
  13. @JsonProperty("database")
  14. private String database;
  15. @JsonProperty("es")
  16. private Long es;
  17. @JsonProperty("id")
  18. private Integer id;
  19. @JsonProperty("isDdl")
  20. private Boolean isDdl;
  21. @JsonProperty("old")
  22. private List<T> old;
  23. @JsonProperty("pkNames")
  24. private List<String> pkNames;
  25. @JsonProperty("sql")
  26. private String sql;
  27. @JsonProperty("ts")
  28. private Long ts;
  29. }

2、MQ消息监听业务

接下来就是监听队列,一旦有Canal服务端有数据推送能够及时的消费。
代码很简单,只是给出个接收的案例,具体的业务逻辑可以根据业务实现,如下:

  1. import cn.hutool.json.JSONUtil;
  2. import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
  3. import lombok.RequiredArgsConstructor;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.annotation.Exchange;
  6. import org.springframework.amqp.rabbit.annotation.Queue;
  7. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.stereotype.Component;
  10. /**
  11. * 监听MQ获取Canal增量的数据消息
  12. */
  13. @Component
  14. @Slf4j
  15. @RequiredArgsConstructor
  16. public class CanalRabbitMQListener {
  17. @RabbitListener(bindings = {
  18. @QueueBinding(
  19. value = @Queue(value = "canal.queue", durable = "true"),
  20. exchange = @Exchange(value = "canal.exchange"),
  21. key = "canal.routing.key"
  22. )
  23. })
  24. public void handleDataChange(String message) {
  25. //将message转换为CanalMessage
  26. CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
  27. String tableName = canalMessage.getTable();
  28. log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
  29. //TODO 业务逻辑自己完善...............
  30. }
  31. }

3、测试

下面向表中插入数据,看下接收的消息是什么样的,SQL如下:

  1. INSERT INTO `oauth_client_details`
  2. VALUES
  3. ( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );

客户端转换后的消息如下图:
ES同步 - 图7
上图可以看出所有的数据都已经成功接收到,只需要根据数据完善自己的业务逻辑即可。