canal.properties configuration
canal.id = 1
canal.ip =
canal.port= 11111
canal.metrics.pull.port = 11112
# location of the ZooKeeper cluster
canal.zkServers = xxxxxx:2181
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# use RocketMQ
canal.serverMode = rocketmq
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory batch mode
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## settings for canal's instance detection
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# maximum supported transaction size; these don't matter much
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
# network connection settings
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = true
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = true
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = true
# only the ROW binlog format is supported
canal.instance.binlog.format = ROW
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable=false
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval=24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire=360
# rds oss binlog account
canal.instance.rds.accesskey =
canal.instance.rds.secretkey =
# the specific instances
canal.destinations = account,balance,distribution,inventory,member,product,order
# conf root dir
canal.conf.dir = ../conf
# auto scan is enabled as well
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
# the configuration mode is spring
canal.instance.global.mode = spring
canal.instance.global.lazy = false
# use the default-instance.xml file
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
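Summary: read as a whole, the key points of canal.properties are understandable. The main ones are:
- Only the ROW binlog format is supported
- The configured instances are account, balance, ..., order
- A ZooKeeper cluster is specified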
The other settings did not leave much of an impression, probably because I am not yet very familiar with them.
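The key points above can be checked mechanically. The following is a minimal sketch, not part of Canal: `parse_properties` and `is_power_of_two` are hypothetical helpers, and the parser only handles plain `key = value` lines (no line continuations or other Java properties features). The sample text is a subset of the canal.properties shown above.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def is_power_of_two(n):
    """True when n is a positive power of two, as the buffer sizes require."""
    return n > 0 and n & (n - 1) == 0


# A subset of the canal.properties file discussed in this post.
SAMPLE = """
canal.zkServers = xxxxxx:2181
canal.serverMode = rocketmq
canal.instance.memory.buffer.size = 16384
canal.instance.parser.parallelBufferSize = 256
canal.instance.binlog.format = ROW
canal.destinations = account,balance,distribution,inventory,member,product,order
"""

props = parse_properties(SAMPLE)
destinations = props["canal.destinations"].split(",")

print("binlog format:", props["canal.instance.binlog.format"])
print("zk servers:", props["canal.zkServers"])
print("instances:", destinations)
for key in ("canal.instance.memory.buffer.size",
            "canal.instance.parser.parallelBufferSize"):
    print(key, "is power of two:", is_power_of_two(int(props[key])))
```

Both buffer sizes in this file (16384 and 256) satisfy the power-of-two requirement stated in the config comments.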
Walking through account/instance.properties
# specify the slaveId
canal.instance.mysql.slaveId=1101
# turn gtidon off
canal.instance.gtidon=false
# the database to listen to
canal.instance.master.address=10.11.7.24:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=T9UKK3ZL
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal-rfaDv5X3Vk92
canal.instance.connectionCharset = utf8
canal.instance.defaultDatabaseName = account_db
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# tables to sync
canal.instance.filter.regex = account_db.sys_tenant,account_db.sys_user,account_db.sys_user_tenant
# table black regex
canal.instance.filter.black.regex=
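Summary: instance.properties is comparatively easy to read and has few settings. Apart from a few items (what exactly is gtid? what is tsdb?), everything is straightforward.
- It specifies the tables to listen to: sys_tenant, sys_user, sys_user_tenant
- It specifies the database connection address, username, and password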
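To see how a comma-separated `canal.instance.filter.regex` value selects tables, here is a rough sketch using Python's `re` module. It assumes each comma-separated entry is a regular expression matched against the full `schema.table` name; `build_filter` is a hypothetical helper that only mimics the idea and is not Canal's own filter implementation.

```python
import re

# The value of canal.instance.filter.regex from account/instance.properties.
FILTER_REGEX = ("account_db.sys_tenant,"
                "account_db.sys_user,"
                "account_db.sys_user_tenant")


def build_filter(filter_regex):
    """Return a predicate that accepts a 'schema.table' name when any
    comma-separated pattern matches the whole name."""
    patterns = [re.compile(p) for p in filter_regex.split(",") if p]

    def accepts(full_name):
        return any(p.fullmatch(full_name) for p in patterns)

    return accepts


accepts = build_filter(FILTER_REGEX)

print(accepts("account_db.sys_user"))         # True
print(accepts("account_db.sys_user_tenant"))  # True
print(accepts("account_db.sys_role"))         # False

# In a regular expression an unescaped "." matches any character,
# so under this sketch the pattern is looser than a literal table name.
print(accepts("account_dbxsys_user"))         # True
```

If an exact match on the separator is wanted, escaping the dot in each pattern is the usual regular-expression fix; whether that matters in practice depends on the table names that actually exist.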
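What about the MQ configuration?
Canal is already configured to run in RocketMQ mode. As of canal-1.1.5 the MQ settings can be placed in instance.properties, but 锥智 uses canal-1.1.1 here, so they have to go in mq.yml, a YAML file that also ships with canal.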
servers: xxxxxx:9876
retries: 0
batchSize: 16384
lingerMs: 1
bufferMemory: 33554432
# Canal batch size, default 50K; because of Kafka's maximum message size limit, do not exceed 1M (keep it under 900K)
canalBatchSize: 50
# Timeout for Canal get, in milliseconds; 0 means no timeout
canalGetTimeout: 100
flatMessage: true
canalDestinations:
  - canalDestination: product
    topic: product-sync-canal
    partition: 0
  - canalDestination: order
    topic: order-sync-canal
    partition: 0
  - canalDestination: member
    topic: member-sync-canal
    partition: 0
  - canalDestination: balance
    topic: balance-sync-canal
    partition: 0
  - canalDestination: inventory
    topic: inventory-sync-canal
    partition: 0
  - canalDestination: distribution
    topic: distribution-sync-canal
    partition: 0
  - canalDestination: account
    topic: account-sync-canal
    partition: 0
    # # number of partitions for the topic
    # partitionsNum: 3
    # partitionHash:
    # # schema.table: unique primary key
    #   mytest.person: id
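Since every instance listed in `canal.destinations` needs a matching entry in mq.yml, a quick cross-check can catch a missing topic. The sketch below hard-codes the values from the two files as Python literals to stay free of third-party YAML libraries; `topic_map` and `missing_topics` are hypothetical helper names.

```python
# canal.destinations from canal.properties.
CANAL_DESTINATIONS = "account,balance,distribution,inventory,member,product,order"

# canalDestinations from mq.yml, written out as plain Python data.
MQ_DESTINATIONS = [
    {"canalDestination": "product", "topic": "product-sync-canal", "partition": 0},
    {"canalDestination": "order", "topic": "order-sync-canal", "partition": 0},
    {"canalDestination": "member", "topic": "member-sync-canal", "partition": 0},
    {"canalDestination": "balance", "topic": "balance-sync-canal", "partition": 0},
    {"canalDestination": "inventory", "topic": "inventory-sync-canal", "partition": 0},
    {"canalDestination": "distribution", "topic": "distribution-sync-canal", "partition": 0},
    {"canalDestination": "account", "topic": "account-sync-canal", "partition": 0},
]


def topic_map(entries):
    """Map each canal destination to the topic it publishes to."""
    return {e["canalDestination"]: e["topic"] for e in entries}


def missing_topics(destinations, entries):
    """Return the destinations that have no topic entry in mq.yml."""
    mapped = topic_map(entries)
    return [d for d in destinations.split(",") if d not in mapped]


for dest, topic in sorted(topic_map(MQ_DESTINATIONS).items()):
    print(f"{dest} -> {topic}")
print("missing:", missing_topics(CANAL_DESTINATIONS, MQ_DESTINATIONS))
```

In this configuration all seven instances have a topic, each named `<destination>-sync-canal` and pinned to partition 0.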
Other
The remaining instance configurations are all similar, so I won't go through them one by one.
