前置条件
```yaml
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
整合RocketMQ的QuickStart
instance.properties配置
example/instance.properties
文件配置 ```bashsalveId
canal.instance.mysql.slaveId=1234指定数据库地址
canal.instance.master.address=127.0.0.1:3306
数据库用户名密码
canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8
enable druid Decrypt database password
canal.instance.enableDruid=false
table regex 监听哪些表和库
canal.instance.filter.regex=.\..
table black regex 屏蔽哪些库和表
canal.instance.filter.black.regex=mysql\.slave_.*
发送mq到哪个Topic
canal.mq.topic=example canal.mq.partition=0
<a name="wykEW"></a>
## canal.properties配置
1. `canal.properties`修改内容展示
```bash
# tcp, kafka, rocketMQ, rabbitMQ 修改接收方位RocketMQ
canal.serverMode = rocketMQ
...省略..
##################################################
######### RocketMQ #############
##################################################
#指定生产者组名称
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
#指定rocketMq的naverserver
rocketmq.namesrv.addr = 192.168.2.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
#设置标签
rocketmq.tag = gx
成果展示
动态Topic
前面的配置中我们将所有收集到的binlog日志都发送到了example主题上。如果我们希望区分他们,则可以通过配置动态主题来解决(配置example下的instance.properties
文件)
动态主题的配置方式就是,指定哪个库哪个表,canal会自动发送到库名_表名的主题上。
例1
mytest.user:数据库mytest1中的数据变化都会发送到主题为 mytest_user中
例2
mytest2\..*:数据库mytest2中的表变化会发送到主题 mytest2_${表名}中
canal.instance.mysql.slaveId=1234
# 指定数据库地址
canal.instance.master.address=127.0.0.1:3306
# 数据库用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex 监听哪些表和库
canal.instance.filter.regex=.*\\..*
# table black regex 屏蔽哪些库和表
canal.instance.filter.black.regex=mysql\\.slave_.*
# 发送mq到哪个Topic
canal.mq.topic=example
canal.mq.dynamicTopic=mytest.user,mytest2\\..*
canal.mq.partition=0
- 若符合
canal.mq.dynamicTopic
配置项中指定的库和表,则对应发送到${库名}_${表名}的topic中 - 若没有符合
canal.mq.dynamicTopic
配置项中指定的库和表,则还是到example
Topic中