MySQL
很多时候业务数据有变更需要及时加载到缓存、ES 或者发送到消息队列中通知下游服务。
一般遇到这种情况下,在实时性要求不高的场景有两种处理模式,一种是写任务定时推送数据同步到缓存中,另一个是下游服务定时自动拉取。这两种模式都依赖服务自己的定时周期时间,很多时候不好设定具体要多久执行一次,定时时间太短在数据没有变化的时候会有很多无效的操作,如果定时时间太长可能很多时候数据的延迟会比较大,某些时候影响也不好。
那有没有一种比较好的方式可以解决这个问题呢?答案当然是肯定的。今天就给大家介绍一下 Canal,基于 MySQL 的 bin log 日志来实时监听数据变化。

什么是 Canal

官方的解释是:canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
通过官方的解释看到,是针对 MySQL 数据库增量日志解析的,MySQL 的日志是通过 bin log 的形式存储的二进制文件,提供数据订阅和消费就是说提供对二进制文件数据的监听。当日志数据发生变化的时候就会被监听到,从而程序就可以实时获取到有变化的数据。拿到变化的数据后就可以更新进缓存,ES 或发送到消息队列中通知下游服务了。

原理

2021-07-24-18-52-30-382164.png
上面介绍了 Canal 的基本概念,现在看看 Canal是怎么实现的,都知道 MySQL 是支持主从同步的,而且 Slave 也是通过 bin log 日志的形式同步 master 实例数据的。所以 Canal 就巧妙的运用了这个原理,把自己模拟成一个 Slave,给 MySQL 的 master 发送 dump 协议,当 master 接受到 dump 协议的时候就以为 Canal 是一个 Slave 就会推送 bin log 给 Canal。

使用方式

开启 MySQL 的binlog

MySQL 的安装这里就不演示了,要是 macOS 的话,终端里面输入brew install mysql 就能搞定。
安装完成过后看下是否开启了 bin log ,如果没有开启则修改 my.cnf 增加 log-bin=mysql-bin 即可开启。输入命令mysql> show variables like 'log_bin'; 从图中可以看到这里是开启了 bin log 日志的。
2021-07-24-18-52-30-596170.png
接下来创建一个 canal 的账号,用于 canal 使用。创建一个 canal的账号,同时密码也是 canal。

  1. CREATE USER canal IDENTIFIED BY 'canal';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
  4. FLUSH PRIVILEGES;

安装 Canal

这里安装 1.1.5 的版本,可以直接 wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz 也可以在 GitHub 上面直接下载。
2021-07-24-18-52-30-829186.png
下载完解压后目录如图,需要修改配置文件,将账号密码以及 bin log 文件名配上
2021-07-24-18-52-31-426091.png2021-07-24-18-52-31-850091.png
配置完成过后,通过 bin 目录下的脚本进行启动,并且通过日志可以看到启动成功。
2021-07-24-18-52-32-280089.png
服务端启动成功后,就需要使用客户端去获取数据了,这里可以参考 Canal 的 GitHub 官网中提供的 example 样例去进行模拟。
这里有个坑要注意下,如果 MySQL 的版本是 8.0 以下应该没有这个问题,如果是 8.0 版本的,通过查看tail -f example.log 日志会发现如下异常Caused by: java.io.IOException: caching_sha2_password Auth failed。
2021-07-24-18-52-32-509141.png
经过在官方 GitHub 上面的 issue 中搜索到相关的错误信息 https://github.com/alibaba/canal/issues/1700,里面有大佬给了解决方案,在 MySQL 中执行如下命令即可解决

  1. ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
  2. ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
  3. FLUSH PRIVILEGES;

如果没有遇到这个问题就可以直接忽略,接下来通过官方源码中的 example 示例来测试功能。把源码下载下来后找到com.alibaba.otter.canal.example.SimpleCanalClientTest 类,正常来说不需要修改什么内容,如果密码有变化的话这里可以调整,然后直接运行 main 函数即可。这个时候 MySQL,Canal,以及测试类都已经启动了,下面通过执行 SQL 来创建数据库和表以及插入相应的数据,观察控制台的输出情况。

数据变更

创建数据库

  1. mysql> create database canal_test;
  2. Query OK, 1 row affected (0.01 sec)
  3. mysql> use canal_test;
  4. Database changed
  5. mysql> show tables;
  6. Empty set (0.00 sec)

通过语句create database canal_test; 创建了数据库过后,可以看到控制有如下输出,已经监听到了 bin log 的变化了。
2021-07-24-18-52-32-756092.png

创建测试表

再执行如下语句创建数据表

  1. CREATE TABLE `example`
  2. (
  3. `id` INT(11) NOT NULL
  4. ,`username` VARCHAR(32) DEFAULT NULL COMMENT '用户名称'
  5. ,` age` INT(11) DEFAULT 0 COMMENT '用户年龄'
  6. ,` sex` INT(11) DEFAULT 0 COMMENT '用户性别 0 男 1 女'
  7. ,PRIMARY KEY (`id`)
  8. )
  9. ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户信息表';

2021-07-24-18-52-32-928106.png
可以看到成功的监听到了数据表的创建,接下来再试试插入数据和更新数据

  1. ## 插入语句
  2. INSERT INTO example VALUES(1,'张三', 18,0),(2,'李四', 19,0),(3,'王五', 20,1);
  3. ## 更新语句
  4. update example set username = '张小三' where id = 1;

2021-07-24-18-52-33-269098.png
从上图中可以看到插入的数据以及更新的数据都被实时的监听到了。监听到数据过后,就可以根据事件类型以及相应的库和表名来进行过滤操作了。可以通过配置 filter 来过滤需要监听的数据库和数据表或者字段,这个都是可以实现的,避免无用的数据变更带来的影响。