前置条件

  • MySQL已经安装完毕,binlog已经开启,canal canal账号已经创建
  • canal.deployer-1.1.5.tar.gz 已下载

    简单的QuickStart

    • 创建目录并解压canal的tar包
    • 修改conf/example/instance.properties
    • 执行启动sh脚本 ```bash

      下载Canal的tar包,创建目录并解压到该目录下

      mkdir canal-1.1.5 tar -zxvf canal.deployer-1.1.5.tar.gz -C canal-1.1.5

      默认的instance.properties不用做任何修改就可以启动,前提,数据库用户名和密码是canal

      sh bin/startup.sh
  1. ```yaml
  2. ## mysql serverId
  3. canal.instance.mysql.slaveId = 1234
  4. #position info,需要改成自己的数据库信息
  5. canal.instance.master.address = 127.0.0.1:3306
  6. canal.instance.master.journal.name =
  7. canal.instance.master.position =
  8. canal.instance.master.timestamp =
  9. #username/password,需要改成自己的数据库信息
  10. canal.instance.dbUsername = canal
  11. canal.instance.dbPassword = canal
  12. canal.instance.defaultDatabaseName =
  13. canal.instance.connectionCharset = UTF-8
  14. #table regex
  15. canal.instance.filter.regex = .\*\\\\..\*

整合RocketMQ的QuickStart

instance.properties配置

  1. example/instance.properties文件配置 ```bash

    salveId

    canal.instance.mysql.slaveId=1234

    指定数据库地址

    canal.instance.master.address=127.0.0.1:3306

数据库用户名密码

canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8

enable druid Decrypt database password

canal.instance.enableDruid=false

table regex 监听哪些表和库

canal.instance.filter.regex=.\..

table black regex 屏蔽哪些库和表

canal.instance.filter.black.regex=mysql\.slave_.*

发送mq到哪个Topic

canal.mq.topic=example canal.mq.partition=0

  1. <a name="wykEW"></a>
  2. ## canal.properties配置
  3. 1. `canal.properties`修改内容展示
  4. ```bash
  5. # tcp, kafka, rocketMQ, rabbitMQ 修改接收方位RocketMQ
  6. canal.serverMode = rocketMQ
  7. ...省略..
  8. ##################################################
  9. ######### RocketMQ #############
  10. ##################################################
  11. #指定生产者组名称
  12. rocketmq.producer.group = test
  13. rocketmq.enable.message.trace = false
  14. rocketmq.customized.trace.topic =
  15. rocketmq.namespace =
  16. #指定rocketMq的naverserver
  17. rocketmq.namesrv.addr = 192.168.2.1:9876
  18. rocketmq.retry.times.when.send.failed = 0
  19. rocketmq.vip.channel.enabled = false
  20. #设置标签
  21. rocketmq.tag = gx

成果展示

image.png

动态Topic

前面的配置中我们将所有收集到的binlog日志都发送到了example主题上。如果我们希望区分他们,则可以通过配置动态主题来解决(配置example下的instance.properties文件)
动态主题的配置方式就是,指定哪个库哪个表,canal会自动发送到库名_表名的主题上。

例1

mytest.user:数据库mytest1中的数据变化都会发送到主题为 mytest_user中

例2

mytest2\..*:数据库mytest2中的表变化会发送到主题 mytest2_${表名}中

  1. canal.instance.mysql.slaveId=1234
  2. # 指定数据库地址
  3. canal.instance.master.address=127.0.0.1:3306
  4. # 数据库用户名密码
  5. canal.instance.dbUsername=canal
  6. canal.instance.dbPassword=canal
  7. canal.instance.connectionCharset = UTF-8
  8. # enable druid Decrypt database password
  9. canal.instance.enableDruid=false
  10. # table regex 监听哪些表和库
  11. canal.instance.filter.regex=.*\\..*
  12. # table black regex 屏蔽哪些库和表
  13. canal.instance.filter.black.regex=mysql\\.slave_.*
  14. # 发送mq到哪个Topic
  15. canal.mq.topic=example
  16. canal.mq.dynamicTopic=mytest.user,mytest2\\..*
  17. canal.mq.partition=0
  1. 若符合canal.mq.dynamicTopic配置项中指定的库和表,则对应发送到${库名}_${表名}的topic中
  2. 若没有符合canal.mq.dynamicTopic配置项中指定的库和表,则还是到exampleTopic中