Prepare the Environment
- canal.deployer-1.1.4.tar.gz
- mysql-5.7.16
Modify Configuration Files
Modify the MySQL Configuration
Enable the MySQL binlog
sudo vim /etc/my.cnf
server-id=1            # must not collide with Canal's slaveId
log-bin=mysql-bin      # binlog file name prefix; a new sequentially numbered file is created on each MySQL restart or when a file reaches the size threshold
binlog_format=row      # [statement|mixed|row]
binlog-do-db=mall      # database to capture; add one line per additional database, or use the ignore-db form instead
expire_logs_days=7     # keep binlog files for 7 days
max_binlog_size=500m   # maximum size of each binlog file
Restart MySQL for the configuration to take effect
sudo systemctl restart mysqld
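Before moving on, it is worth confirming that the binlog is actually on. A quick check from the shell (root credentials assumed):

mysql -uroot -p -e "show variables like 'log_bin'; show variables like 'binlog_format';"
# expected: log_bin = ON, binlog_format = ROW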
Create the canal user and grant privileges
set global validate_password_length=4;
set global validate_password_policy=0;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
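To confirm the new account can reach the server with the expected privileges (the host below matches the cluster used later; substitute your own):

mysql -ucanal -pcanal -h hadoop102 -e "show grants;"
# should list SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.*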
Modify the Canal Configuration
Modify canal.properties
vim /canal/conf/canal.properties
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size, default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire, default 360 hour (15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk, support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ #############
##################################################
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
######### Kafka Kerberos Info #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
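Most of the file above is stock 1.1.4 content; the keys actually changed for Kafka delivery boil down to the following (the broker list is specific to this cluster):

canal.serverMode = kafka
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092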
A canal server can host multiple instances: each directory under conf/ (such as example) is one instance, and every instance has its own configuration file. By default only the example instance exists. If you need several instances to handle data from different MySQL sources, copy the example directory as many times as needed, rename the copies (the names must match what the configuration refers to), and set canal.destinations=instance1,instance2 in canal.properties, as sketched below.
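For example, to add a second instance next to example (the name mall here is hypothetical):

cp -r /canal/conf/example /canal/conf/mall
# then in canal.properties:
# canal.destinations = example,mall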
Modify instance.properties
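The default instance's configuration lives under conf/example:

vim /canal/conf/example/instance.properties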
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=hadoop102:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=gmall-canal
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\\..*
#################################################
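Relative to the stock file, the values changed for this setup are the source address, the replication account created earlier, and the MQ routing keys:

canal.instance.master.address=hadoop102:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=gmall-canal
canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\\..*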
canal.mq.dynamicTopic expression notes
Since canal 1.1.3, the configuration format schema.table is supported, with multiple entries separated by commas:
- Example 1: test\.test matches the given single table and sends its data to the topic named after that table
- Example 2: .*\..* matches all tables; each table is sent to a topic named after the table
- Example 3: test matches a whole database; all of its tables are sent to a topic named after the database
- Example 4: test\.* matches tables by expression; each matched table is sent to a topic named after the table
- Example 5: test,test1\.test1 specifies multiple expressions: all tables in database test go to the test topic, table test1.test1 goes to its own test1.test1 topic, and all remaining tables go to the default canal.mq.topic
Set the matching rules to fit your own business needs; it is also advisable to enable automatic topic creation on the MQ side. A small sketch follows.
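For instance (the gmall database name is illustrative), the rule below routes every table in gmall to its own per-table topic while everything else falls back to canal.mq.topic; the Kafka brokers need auto.create.topics.enable=true for the per-table topics to appear on demand:

canal.mq.dynamicTopic=gmall\\..*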
canal.mq.partitionHash expression notes
Since canal 1.1.3, the configuration format schema.table:pk1^pk2 is supported, with multiple entries separated by commas:
- Example 1: test\.test:pk1^pk2 matches the given single table and hashes on pk1 + pk2
- Example 2: .*\..*:id regex match; every matched table hashes on its id column
- Example 3: .*\..*:$pk$ regex match; every matched table hashes on its primary key (looked up automatically)
- Example 4: with no rule written at all, everything defaults to partition 0
- Example 5: .*\..* a regex match with no pk specified hashes every matched table on its table name
  - per-table hashing: all data of one table lands in the same partition, while different tables are spread across partitions (a hot table can make its partition disproportionately large)
- Example 6: test\.test:id,.*\..* hashes the test.test table by id and every other table by table name
Note: set the matching rules to fit your business needs. Multiple rules are evaluated in order, and matching stops at the first rule that hits; a short sketch follows.
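Combining the examples above (table and column names are illustrative): gmall.order_info is hashed by its id column and every other table by table name, across the 3 partitions configured earlier; the specific rule must come first, since matching stops at the first hit:

canal.mq.partitionsNum=3
canal.mq.partitionHash=gmall\\.order_info:id,.*\\..*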
MQ Ordering
The binlog itself is ordered; how to preserve that order once the data is written to MQ is a question many people care about.
- kafka and RocketMQ, the MQs canal currently supports, are both built on local files and provide partition-level ordered messages, so binlog writes to MQ can carry ordering guarantees; how strong they are depends on the parameters you choose
- canal supports several routing modes for MQ data: single topic with a single partition, single topic with multiple partitions, multiple topics each with a single partition, and multiple topics with multiple partitions
- canal.mq.dynamicTopic mainly controls single topic versus multiple topics; tables matching its rules can be routed to a topic named after the table, a topic named after the database, or the default topic name
- canal.mq.partitionsNum and canal.mq.partitionHash mainly control whether multiple partitions are used and how the target partition is computed; matched data can be partitioned at the table level, the pk level, and so on
- canal's consumption ordering therefore depends mainly on the routing choices above, for example:
  - single topic, single partition: strictly preserves binlog order; the drawback is throughput, since a single partition writes at roughly 2–3k TPS
  - multiple topics, single partition: guarantees table-level ordering; all data of one table (or one database) is written to the single partition of one topic, so it stays ordered, though hot tables again hit the single-partition write limit
  - single or multiple topics with multiple partitions: if routing is by table, this again guarantees table-level ordering (with the same hot-table partition issue); if routing is by pk hash, only the binlog order of each individual pk is guaranteed. ** pk hashing is a business trade-off: it performs best, but if the business changes pk values or depends on ordering across multiple pks, processing can go wrong. When a pk changes, the pre-change and post-change values hash to different partitions, so consumers may see the events out of order; for instance, if a row's id changes from 10 to 11, events hashed on 10 and events hashed on 11 land in different partitions. Be careful with this.
Start and Test
Start Canal
/canal/bin/startup.sh
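If startup succeeds, the server and instance logs are the first place to check (standard deployer layout assumed):

tail -f /canal/logs/canal/canal.log        # server log
tail -f /canal/logs/example/example.log    # instance log; shows the binlog position being dumped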
Test
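To watch the topic configured above, a console consumer will do; this assumes a Kafka installation directory on one of the brokers (the path is illustrative):

/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic gmall-canal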
With the consumer running, insert a row into the source database; a flat-format (canal.mq.flatMessage = true) message like this arrives:

{
    "data": [{
        "id": "1507294755656597508",
        "user_id": "2",
        "sku_id": "6",
        "spu_id": "6",
        "order_id": "3476",
        "appraise": "1204",
        "comment_txt": "评论内容:88216992174138761832315899875368944975284334465219",
        "create_time": "2021-03-25 17:54:08",
        "operate_time": null
    }],
    "database": "gmall",
    "es": 1648202048000,
    "id": 43,
    "isDdl": false,
    "mysqlType": {
        "id": "bigint(20)",
        "user_id": "bigint(20)",
        "sku_id": "bigint(20)",
        "spu_id": "bigint(20)",
        "order_id": "bigint(20)",
        "appraise": "varchar(10)",
        "comment_txt": "varchar(2000)",
        "create_time": "datetime",
        "operate_time": "datetime"
    },
    "old": null,
    "pkNames": ["id"],
    "sql": "",
    "sqlType": {
        "id": -5,
        "user_id": -5,
        "sku_id": -5,
        "spu_id": -5,
        "order_id": -5,
        "appraise": 12,
        "comment_txt": 12,
        "create_time": 93,
        "operate_time": 93
    },
    "table": "comment_info",
    "ts": 1648202054045,
    "type": "INSERT"
}
References
- Real-time MySQL binlog data synchronization with canal
- The MySQL database at the target IP address must be initialized and configured; see Canal QuickStart
- a1. Multi-datasource configuration: https://www.2cto.com/database/201609/547661.html
- a2. Common exceptions: https://blog.csdn.net/my201110lc/article/details/77885720
