准备环境

canal.deployer-1.1.4.tar.gz
mysql-5.7.16

修改配置文件

修改 MySQL 配置

开启 MySQL binlog

  1. sudo vim /etc/my.cnf
  1. server-id= 1
  2. #不能和canal的slaveId重复
  3. log-bin= mysql-bin
  4. # binlog日志的前缀mysql-bin,每次mysql重启或者到达单个文件大小的阈值时生成一个文件,按顺序编号
  5. binlog_format= row
  6. # [statement|mixed|row]
  7. binlog-do-db= mall
  8. # binlog-do-db 指定需要同步的数据库,采集多个数据库再起一行,也可使用 ignore 方式
  9. expire_logs_days= 7
  10. # binlog文件保存7天
  11. max_binlog_size= 500m
  12. # 每个binlog日志文件大小

重启 MySQL 使配置生效

  1. sudo systemctl restart mysqld

/var/lib/mysql目录下查看文件
image.png

创建 canal 用户,赋权限

  1. set global validate_password_length=4;
  2. set global validate_password_policy=0;
  3. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

修改 Canal 配置

修改 canal 配置

  1. vim /canal/conf/canal.properties
  1. #################################################
  2. ######### common argument #############
  3. #################################################
  4. # tcp bind ip
  5. canal.ip =
  6. # register ip to zookeeper
  7. canal.register.ip =
  8. canal.port = 11111
  9. canal.metrics.pull.port = 11112
  10. # canal instance user/passwd
  11. # canal.user = canal
  12. # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
  13. # canal admin config
  14. #canal.admin.manager = 127.0.0.1:8089
  15. canal.admin.port = 11110
  16. canal.admin.user = admin
  17. canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
  18. canal.zkServers =
  19. # flush data to zk
  20. canal.zookeeper.flush.period = 1000
  21. canal.withoutNetty = false
  22. # tcp, kafka, RocketMQ
  23. canal.serverMode = kafka
  24. # flush meta cursor/parse position to file
  25. canal.file.data.dir = ${canal.conf.dir}
  26. canal.file.flush.period = 1000
  27. ## memory store RingBuffer size, should be Math.pow(2,n)
  28. canal.instance.memory.buffer.size = 16384
  29. ## memory store RingBuffer used memory unit size , default 1kb
  30. canal.instance.memory.buffer.memunit = 1024
  31. ## memory store gets mode used MEMSIZE or ITEMSIZE
  32. canal.instance.memory.batch.mode = MEMSIZE
  33. canal.instance.memory.rawEntry = true
  34. ## detecting config
  35. canal.instance.detecting.enable = false
  36. #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
  37. canal.instance.detecting.sql = select 1
  38. canal.instance.detecting.interval.time = 3
  39. canal.instance.detecting.retry.threshold = 3
  40. canal.instance.detecting.heartbeatHaEnable = false
  41. # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
  42. canal.instance.transaction.size = 1024
  43. # mysql fallback connected to new master should fallback times
  44. canal.instance.fallbackIntervalInSeconds = 60
  45. # network config
  46. canal.instance.network.receiveBufferSize = 16384
  47. canal.instance.network.sendBufferSize = 16384
  48. canal.instance.network.soTimeout = 30
  49. # binlog filter config
  50. canal.instance.filter.druid.ddl = true
  51. canal.instance.filter.query.dcl = false
  52. canal.instance.filter.query.dml = false
  53. canal.instance.filter.query.ddl = false
  54. canal.instance.filter.table.error = false
  55. canal.instance.filter.rows = false
  56. canal.instance.filter.transaction.entry = false
  57. # binlog format/image check
  58. canal.instance.binlog.format = ROW,STATEMENT,MIXED
  59. canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
  60. # binlog ddl isolation
  61. canal.instance.get.ddl.isolation = false
  62. # parallel parser config
  63. canal.instance.parser.parallel = true
  64. ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
  65. #canal.instance.parser.parallelThreadSize = 16
  66. ## disruptor ringbuffer size, must be power of 2
  67. canal.instance.parser.parallelBufferSize = 256
  68. # table meta tsdb info
  69. canal.instance.tsdb.enable = true
  70. canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
  71. canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
  72. canal.instance.tsdb.dbUsername = canal
  73. canal.instance.tsdb.dbPassword = canal
  74. # dump snapshot interval, default 24 hour
  75. canal.instance.tsdb.snapshot.interval = 24
  76. # purge snapshot expire , default 360 hour(15 days)
  77. canal.instance.tsdb.snapshot.expire = 360
  78. # aliyun ak/sk , support rds/mq
  79. canal.aliyun.accessKey =
  80. canal.aliyun.secretKey =
  81. #################################################
  82. ######### destinations #############
  83. #################################################
  84. canal.destinations = example
  85. # conf root dir
  86. canal.conf.dir = ../conf
  87. # auto scan instance dir add/remove and start/stop instance
  88. canal.auto.scan = true
  89. canal.auto.scan.interval = 5
  90. canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
  91. #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
  92. canal.instance.global.mode = spring
  93. canal.instance.global.lazy = false
  94. canal.instance.global.manager.address = ${canal.admin.manager}
  95. #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
  96. canal.instance.global.spring.xml = classpath:spring/file-instance.xml
  97. #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
  98. ##################################################
  99. ######### MQ #############
  100. ##################################################
  101. canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
  102. canal.mq.retries = 0
  103. canal.mq.batchSize = 16384
  104. canal.mq.maxRequestSize = 1048576
  105. canal.mq.lingerMs = 100
  106. canal.mq.bufferMemory = 33554432
  107. canal.mq.canalBatchSize = 50
  108. canal.mq.canalGetTimeout = 100
  109. canal.mq.flatMessage = true
  110. canal.mq.compressionType = none
  111. canal.mq.acks = all
  112. #canal.mq.properties. =
  113. canal.mq.producerGroup = test
  114. # Set this value to "cloud", if you want open message trace feature in aliyun.
  115. canal.mq.accessChannel = local
  116. # aliyun mq namespace
  117. #canal.mq.namespace =
  118. ##################################################
  119. ######### Kafka Kerberos Info #############
  120. ##################################################
  121. canal.mq.kafka.kerberos.enable = false
  122. canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
  123. canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

canal服务中可以有多个instance,conf/下的每一个example即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例example,如果需要多个实例处理不同的MySQL数据的话,直接拷贝出多个example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties中的canal.destinations=实例1,实例2。

修改 instance.properties

  1. #################################################
  2. ## mysql serverId , v1.0.26+ will autoGen
  3. # canal.instance.mysql.slaveId=0
  4. # enable gtid use true/false
  5. canal.instance.gtidon=false
  6. # position info
  7. canal.instance.master.address=hadoop102:3306
  8. canal.instance.master.journal.name=
  9. canal.instance.master.position=
  10. canal.instance.master.timestamp=
  11. canal.instance.master.gtid=
  12. # rds oss binlog
  13. canal.instance.rds.accesskey=
  14. canal.instance.rds.secretkey=
  15. canal.instance.rds.instanceId=
  16. # table meta tsdb info
  17. canal.instance.tsdb.enable=true
  18. #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
  19. #canal.instance.tsdb.dbUsername=canal
  20. #canal.instance.tsdb.dbPassword=canal
  21. #canal.instance.standby.address =
  22. #canal.instance.standby.journal.name =
  23. #canal.instance.standby.position =
  24. #canal.instance.standby.timestamp =
  25. #canal.instance.standby.gtid=
  26. # username/password
  27. canal.instance.dbUsername=canal
  28. canal.instance.dbPassword=canal
  29. canal.instance.connectionCharset = UTF-8
  30. # enable druid Decrypt database password
  31. canal.instance.enableDruid=false
  32. #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
  33. # table regex
  34. canal.instance.filter.regex=.*\\..*
  35. # table black regex
  36. canal.instance.filter.black.regex=
  37. # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  38. #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
  39. # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
  40. #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
  41. # mq config
  42. canal.mq.topic=gmall-canal
  43. # dynamic topic route by schema or table regex
  44. #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
  45. #canal.mq.partition=0
  46. # hash partition config
  47. canal.mq.partitionsNum=3
  48. canal.mq.partitionHash=.*\\..*
  49. #################################################

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table,多个配置之间使用逗号分隔

  • 例子1:test\.test 指定匹配的单表,发送到以 test\.test为名字的topic上
  • 例子2:.*\..* 匹配所有表,每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\.test1的表发送到对应的test1\.test1 topic上,其余的表发送到默认的canal.mq.topic值

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.*\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.*\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.*\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\.test:id,.*\..* , 针对test的表按照id散列,其余的表按照table散列

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注。

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  3. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

    启动、测试

    启动 Canal

    1. /canal/bin/startup.sh
    image.png

    测试

    开启kafka消费者,对数据库进行数据插入,获取数据
    1. {
    2. "data": [
    3. {
    4. "id": "1507294755656597508",
    5. "user_id": "2",
    6. "sku_id": "6",
    7. "spu_id": "6",
    8. "order_id": "3476",
    9. "appraise": "1204",
    10. "comment_txt": "评论内容:88216992174138761832315899875368944975284334465219",
    11. "create_time": "2021-03-25 17:54:08",
    12. "operate_time": null
    13. }
    14. ],
    15. "database": "gmall",
    16. "es": 1648202048000,
    17. "id": 43,
    18. "isDdl": false,
    19. "mysqlType": {
    20. "id": "bigint(20)",
    21. "user_id": "bigint(20)",
    22. "sku_id": "bigint(20)",
    23. "spu_id": "bigint(20)",
    24. "order_id": "bigint(20)",
    25. "appraise": "varchar(10)",
    26. "comment_txt": "varchar(2000)",
    27. "create_time": "datetime",
    28. "operate_time": "datetime"
    29. },
    30. "old": null,
    31. "pkNames": [
    32. "id"
    33. ],
    34. "sql": "",
    35. "sqlType": {
    36. "id": -5,
    37. "user_id": -5,
    38. "sku_id": -5,
    39. "spu_id": -5,
    40. "order_id": -5,
    41. "appraise": 12,
    42. "comment_txt": 12,
    43. "create_time": 93,
    44. "operate_time": 93
    45. },
    46. "table": "comment_info",
    47. "ts": 1648202054045,
    48. "type": "INSERT"
    49. }

参考文档