canal是阿里巴巴的开源组件,主要工作原理是基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

本次项目实践,只需要通过canal获取MySQL的binlog日志数据,并通过Python将获取到的binlog数据落到文件中。

canal官方文档

canal Python客户端

注意:在配置MySQL的my.cnf文件后,一定记得重启MySQL的服务,使之生效。

构建canal python客户端

  1. pip install canal-python

但运行Python Sample的代码时,导入包from canal.client import Client报错:

Traceback (most recent call last):
File “”, line 1, in
File “C:\ProgramData\Anaconda3\lib\site-packages\canal\client.py”, line 6, in
from .protocol import CanalProtocol_pb2
File “C:\ProgramData\Anaconda3\lib\site-packages\canal\protocol\CanalProtocol_pb2.py”, line 7, in
from google.protobuf.internal import enum_type_wrapper
ModuleNotFoundError: No module named ‘google’

网上查询资料发现是缺少包,所以再安装个包解决问题:pip install protobuf

配合MySQL做测试

1、开启canal客户端

更改client.py运行参数client.connect(host='192.168.142.102'),提示如下信息,说明成功:

connected to 192.168.142.102:11111
Auth succed
Subscribe succed

2、MySQL操作数据
  1. use test;
  2. create table baseinfo(
  3. id int,
  4. name varchar(50)
  5. );
  6. insert into baseinfo values (1, 'tom');
  7. update baseinfo set name = 'ethan' where id = 1;
  8. delete from baseinfo where id = 1;

Python显示如下信息:

{‘db’: ‘test’, ‘table’: ‘baseinfo’, ‘event_type’: 1, ‘data’: {‘name’: ‘tom’}}
{‘db’: ‘test’, ‘table’: ‘baseinfo’, ‘event_type’: 2, ‘data’: {‘before’: {‘id’: ‘1’, ‘name’: ‘ethan’}, ‘after’: {‘id’: ‘1’, ‘name’: ‘ethan’}}}
{‘db’: ‘test’, ‘table’: ‘baseinfo’, ‘event_type’: 3, ‘data’: {‘name’: ‘ethan’}}

可以看到,event_type为MySQL事件类型,1—INSERT,2—UPDATE,3—DELETE。

但是注意到,这个Python的返回信息有些问题:

  • INSERT和DELETE时,不能显示完整的字段信息
  • 当UPDATE时,返回的before数据是错误的
  • 信息中没有必要的时间信息

所以,还需要对代码进行修改和优化。

改完后再进行测试:

  1. insert into baseinfo values (1, 'tom');
  2. update baseinfo set name = 'ethan' where id = 1;
  3. delete from baseinfo where id = 1;

Python显示如下信息:

{‘db’: ‘test’, ‘table’: ‘baseinfo’, ‘event_type’: 1, ‘execute_time’: ‘2020-09-27 09:27:20’, ‘data’: {‘id’: ‘1’, ‘name’: ‘tom’}}
{‘db’: ‘test’, ‘table’: ‘baseinfo’, ‘event_type’: 2, ‘execute_time’: ‘2020-09-27 09:27:20’, ‘data’: {‘before’: {‘id’: ‘1’, ‘name’: ‘tom’}, ‘after’: {‘id’: ‘1’, ‘name’: ‘ethan’}}}
{‘db’: ‘test’, ‘table’: ‘baseinfo’, ‘event_type’: 3, ‘execute_time’: ‘2020-09-27 09:27:20’, ‘data’: {‘id’: ‘1’, ‘name’: ‘ethan’}}

显示无误!