一、Canal技术介绍:

1.1 calnal发展介绍:

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

1.2 calnal工作原理:

1.2.1 MySQL主备复制原理

第十二章 Canal数据同步技术 - 图2

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

    1.2.2 canal 工作原理

    第十二章 Canal数据同步技术 - 图3

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

    1.2.3 canal架构说明:

    第十二章 Canal数据同步技术 - 图4
    说明:

  • server代表一个canal运行实例,对应于一个jvm

  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

详情参见

二、Linux环境安装Canal:

2.1 环境修改

2.1.1 进入容器,修改my.cnf内容如下(mysql支持binlog):

  1. [mysqld]
  2. log-bin=mysql-bin # 开启 binlog
  3. binlog-format=ROW # 选择 ROW 模式
  4. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal slaveId 重复

提示: 如果mysql安装在docker下,可以执行如下命令,修改上面的配置文件: ① docker exec -it mysql /bin/bash ② vi /etc/mysql/my.cnf ③ 创建mysql容器时,可以使用如下命令: 【备注1:提前创建好相关的目录】

  1. mkdir -p /mydata/mysql3/data /mydata/mysql3/conf /mydata/mysql3/logs
  2. docker run -p 3308:3306 --restart=always -v /mydata/mysql3/conf:/etc/mysql/conf.d -v /mydata/mysql3/logs:/var/log/mysql -v /mydata/mysql3/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123 --name mysql -d mysql:5.7

备注2:在/mydata/mysql3/conf中创建my.cnf文件内容如下:】

  1. [mysql]
  2. default-character-set=utf8
  3. [mysqld]
  4. character-set-server=utf8
  5. log-bin=mysql-bin
  6. binlog-format=ROW
  7. server_id=1

不要忘记重启mysql:

  1. docker restart mysql3

备注3:进行mysql内部容器,查看是否binlog日志添加成功】

  1. docker exec -it mysql3 /bin/bash
  2. mysql -uroot -p123
  3. show variables like '%bin%'

查看的最终效果如图:

image.png

备注4:进行mysql内部容器,查看是否binlog日志添加成功】

  1. 因为starter-canal中使用的client-canal的版本为1.0.25,所以,最好下载canal的版本为1.0.25以下的版酊,
  2. 否则,会失败

2.1.2 创建用户并授权:

  1. CREATE USER canal IDENTIFIED BY 'canal';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
  4. FLUSH PRIVILEGES;

说明: 如果不创建用户,也可以使用root用户,密码就是你的mysql密码

2.1.3 下载canal安装包:

  1. wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-{版本号}.tar.gz

说明: 下载地址

2.1.4 解压缩并修改配置文件:

修改canal/conf/canal.properties文件:
image.png
修改canal/conf/example/instance.properties文件:
image.png
image.png
image.png

mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)
常见例子:

  1. 所有表:. or .\..
    2. canal schema下所有表: canal\..

    3. canal下的以canal打头的表:canal\.canal.*
    4. canal schema下的一张表:canal\.test1
  2. 多个规则组合使用:canal\..,mysql.test1,mysql.test2 (逗号分隔) [*详细配置参见](https://github.com/alibaba/canal/wiki/AdminGuide)

2.2 启动运行:

2.2.1 执行canal/bin/startup.sh命令:

  1. cd /usr/local/canal
  2. bin/startup.sh

2.2.2 查看日志

canal/logs/canal/canal.log
image.png
canal/logs/example/example.log
image.png

备注:下载的canal版本不能高于1.0.19版本

三、【实战】在项目中将广告数据库同步到redis中

3.1 从github下载第三方的canal的spring-boot-starter:

项目地址
image.png

3.2 新建模块canal-demo

3.2.1 在模块中添加依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!--1.引入canal的依赖-->
  7. <dependency>
  8. <groupId>com.xpand</groupId>
  9. <artifactId>starter-canal</artifactId>
  10. <version>0.0.1-SNAPSHOT</version>
  11. </dependency>
  12. </dependencies>

3.2.2 添加application.yml配置文件,

  1. server:
  2. port: 7701
  3. canal:
  4. client:
  5. instances:
  6. example:
  7. host: 192.168.56.10
  8. port: 11111
  9. spring:
  10. application:
  11. name: canal

3.2.3 添加监听器:

  1. /**
  2. * ------------------------------
  3. * 功能:
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/9-20:46
  7. * ------------------------------
  8. */
  9. @CanalEventListener
  10. public class CanalEventDataListener {
  11. /**
  12. * 功能: 监听插入事件
  13. * 参数:
  14. * 返回值: void
  15. * 时间: 2021/8/10 13:07
  16. */
  17. @InsertListenPoint
  18. public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
  19. System.out.println("onEventInsert");
  20. System.out.println("------------------------------------------------------------------------------------");
  21. rowData.getAfterColumnsList().stream().forEach(c-> System.out.println(c.getName() + "->" + c.getValue()));
  22. }
  23. /**
  24. * 功能: 监听修改事件
  25. * 参数:
  26. * 返回值: void
  27. * 时间: 2021/8/10 13:07
  28. */
  29. @UpdateListenPoint
  30. public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
  31. System.out.println("onEventUpdate");
  32. System.out.println("------------------------------------------------------------------------------------");
  33. rowData.getAfterColumnsList().stream().forEach(c-> System.out.println(c.getName() + "->" + c.getValue()));
  34. }
  35. /**
  36. * 功能: 监听删除事件
  37. * 参数:
  38. * 返回值: void
  39. * 时间: 2021/8/10 13:07
  40. */
  41. @DeleteListenPoint
  42. public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
  43. System.out.println("onEventDelete");
  44. System.out.println("------------------------------------------------------------------------------------");
  45. rowData.getBeforeColumnsList().stream().forEach(c-> System.out.println( c.getName() + "->" + c.getValue()));
  46. }
  47. /**
  48. * 功能: 自定义监听
  49. * 参数:
  50. * 返回值: void
  51. * 时间: 2021/8/10 13:06
  52. */
  53. @ListenPoint(destination = "example",schema = "zeyigoudb",table = {"tb_item"},
  54. eventType = {
  55. CanalEntry.EventType.UPDATE,
  56. CanalEntry.EventType.INSERT,
  57. CanalEntry.EventType.DELETE}
  58. )
  59. public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
  60. System.out.println("onEventCustomUpdate");
  61. System.out.println("------------------------------------------------------------------------------------");
  62. rowData.getAfterColumnsList().forEach((c) -> System.out.println( c.getName() + "->" + c.getValue()));
  63. }
  64. }

3.2.4 添加启动类:

  1. /**
  2. * ------------------------------
  3. * 功能:
  4. * 作者:WF
  5. * 微信:hbxfwf13590332912
  6. * 创建时间:2021/8/9-20:55
  7. * ------------------------------
  8. */
  9. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
  10. @EnableCanalClient //启用canal
  11. public class CancalDataApplication {
  12. public static void main(String[] args) {
  13. SpringApplication.run(CancalDataApplication.class);
  14. }
  15. }

3.3 实现当后台修改、添加、删除广告时,会自动同步到redis中

3.3.1 将原来content-service中的关于增、删、改广告时,删除redis的代码注掉:

  1. /**
  2. * 功能: 添加广告
  3. * 参数:
  4. * 返回值: void
  5. * 时间: 2021/8/2 15:00
  6. */
  7. @Override
  8. public void add(ContentEntity content) {
  9. //1. 删除redis中的缓存
  10. //redisTemplate.delete("contents");
  11. //2. 添加到数据库中
  12. this.save(content);
  13. }
  14. @Override
  15. public void update(ContentEntity content) {
  16. //1. 删除redis中的缓存
  17. //redisTemplate.delete("contents");
  18. //2. 调用 修改方法
  19. this.updateById(content);
  20. }
  21. @Override
  22. public void delete(List<Long> ids) {
  23. //1. 删除redis中的缓存
  24. //redisTemplate.delete("contents");
  25. //2. 根据id删除广告
  26. this.removeByIds(ids);
  27. }

3.3.2 在canal-demo模块中,添加自定义的事件处理:

第一步:修改依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!--1.引入canal的依赖-->
  7. <dependency>
  8. <groupId>com.xpand</groupId>
  9. <artifactId>starter-canal</artifactId>
  10. <version>0.0.1-SNAPSHOT</version>
  11. </dependency>
  12. <!--2.引入redis-->
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-data-redis</artifactId>
  16. </dependency>
  17. </dependencies>

第二步:修改application.yml文件:

  1. spring:
  2. redis:
  3. host: 192.168.56.10
  4. port: 6379

第三步:添加redis的序列化配置:

  1. @Configuration
  2. public class RedisTemplateSerializerConfig {
  3. @Bean
  4. public RedisTemplate redisTemplate(RedisConnectionFactory factory){
  5. RedisTemplate redisTemplate = new RedisTemplate();
  6. redisTemplate.setConnectionFactory(factory);
  7. //1. 设置key的序列化器
  8. redisTemplate.setKeySerializer(new StringRedisSerializer());
  9. //2. 设置value的序列化器
  10. // redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());
  11. redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
  12. return redisTemplate;
  13. }
  14. }

第四步:定义事件:

  1. /**
  2. * 功能: 自定义监听(测试)
  3. * 参数:
  4. * 返回值: void
  5. * 时间: 2021/8/10 13:06
  6. */
  7. @ListenPoint(destination = "example",schema = "zeyigoudb",table = {"tb_content"},
  8. eventType = {
  9. CanalEntry.EventType.UPDATE,
  10. CanalEntry.EventType.INSERT,
  11. CanalEntry.EventType.DELETE}
  12. )
  13. public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
  14. //1. 根据事件得到不同的数据
  15. //1.1 得到添加或修改事件时的数据列表
  16. //List<CanalEntry.Column> columnsListAddOrUpdate = rowData.getAfterColumnsList();
  17. //1.2 得到删除时的数据列表
  18. //List<CanalEntry.Column> columnsListDelete = rowData.getBeforeColumnsList();
  19. System.out.println("正在同步广告列表...");
  20. //直接删除redis中的数据
  21. redisTemplate.delete("contents");
  22. }

第五步:运行效果(略)