配置
canal.mq.flatMessage = true # 使用文本格式(JSON)进行传输,否则 Kafka 里扔进去的是二进制数据,虽然不影响,但是看起来不方便
2. 修改kafka集群的地址
```c
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092,192.168.33.151:9092
- 指定主题
example/instance.properties
# mq config
canal.mq.topic=canal
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
配置好之后重新启动canal
这里我们打开kafka的消费
bin/kafka-console-consumer.sh --bootstrap-server 192.168.xxx.xxx:9092 --topic canal --from-beginning
然后运行MySQL
insert into hhy_test.canal_test VALUES(1,'test',now())
消费
这个时候kafka出来的数据是json格式的,我们可以在java里进行处理
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
然后直接输入kafka的java控制消费端代码
import java.util.Collections;
import java.util.Properties;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
public class Consumer {
public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.xxx.xxx:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "canal");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
JSONObject obj = JSON.parseObject(value);
if ("INSERT".equalsIgnoreCase(obj.getString("type"))
&& "hhy_test".equalsIgnoreCase(obj.getString("database"))
&& "canal_test".equalsIgnoreCase(obj.getString("table"))) {
JSONArray dataArry = obj.getJSONArray("data");
if (dataArry != null && dataArry.size() > 0) {
for (int i = 0; i < dataArry.size(); i++) {
System.out.println(dataArry.getJSONObject(i).toJSONString());
}
}
}
}
}
}
}
消息体大小设置问题
消费端 Kafka 不进数据,Canal 日志报错 org.apache.kafka.common.errors.RecordTooLargeException,认为是 Kafka 消息体大小限制造成的,需要同时修改 Kafka 与 Canal 消息体的最大限制
- 修改 Kafka 配置,server.properties 中修改或添加配置项 message.max.bytes=100000000,producer.properties 中修改或添加配置项 max.request.size=100000000,consumer.properties 中修改或添加配置项 max.partition.fetch.bytes=100000000,重启 Kafka
- 修改 Canal 配置,canal.properties 修改 canal.mq.maxRequestSize 参数值为 90000000,重启 Canal
查看 Canal 日志是否报错 Could not find first log file name in binary log index file at… 如果报错则停止 Canal ,再删除实例配置下的 meta.dat 文件,再启动 Canal 即可
多分片
多线程并发消费,如果我们单纯地用多线程并发消费的话并不能保证消息的有序性,这种binlog日志同步是需要严格有序性的,否则会导致数据错乱。那有没有办法能够保证顺序的情况下并发消费呢?答案是有的,即将指定数据发送到指定分区当中,然后起多个消费者消费不同分区的数据即可,并且Canal提供写入指定分区的配置
在Canal配置文件中的mq config里面配置如下
# mq config
canal.mq.topic=house-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=demo\\.pricing_house_info:house_code
这里面主要配置了canal.mq.partitionsNum和canal.mq.partitionHash两个参数,他们的意思如下:
- canal.mq.partitionsNum:指定当前topic的分区数
- canal.mq.partitionHash:指定到分区的分区规则,可以细化到字段
这里我们指定topic有3个分区,并且使用pricing_house_info表中的house_code字段来进行划分,即让相同house_code的数据全部到一个分区当中
修改项目代码
原先项目中只有一个消费,现在再添加两个消费的方法,让三个消费者能够消费不同分区的数据,通过@TopicPartition注解指定topic和对应的分区,并且可以同时消费多个分区的数据,三个消费者的groupId一定要保持一致,因为Kafka指定在一个group里面一条partition的消息只能被一个消费者消费
@Component
public class MessageListener {
@KafkaListener(topicPartitions = {@TopicPartition(topic = "house-topic",partitions = {"0"})}, groupId = "house-consumer-group")
public void partition1(ConsumerRecord<?, ?> record){
receive(record);
}
@KafkaListener(topicPartitions = {@TopicPartition(topic = "house-topic",partitions = {"1"})}, groupId = "house-consumer-group")
public void partition2(ConsumerRecord<?, ?> record){
receive(record);
}
@KafkaListener(topicPartitions = {@TopicPartition(topic = "house-topic",partitions = {"2"})}, groupId = "house-consumer-group")
public void partition3(ConsumerRecord<?, ?> record){
receive(record);
}
private void receive(ConsumerRecord<?, ?> record){
final String value = (String) record.value();
FlatMessage flatMessage = JSONObject.parseObject(value, FlatMessage.class);
final String houseCode = flatMessage.getData().get(0).get("house_code");
System.out.println("分区:"+record.partition()+"\t接收到数据的code:"+houseCode+"\t操作类别:"+flatMessage.getType());
}
}
通过这样的方式我们可以确保相同house_code的数据到同一个分区被同一个消费者有序消费且只消费一次,这样即可达到目的
利用Canal将数据根据字段写入不同分区且消费者消费指定分区数据,增加了消费的吞吐量,并且保证了单个消费者的消息有序性以及单条记录(同一house_code的数据)的处理有序性,本方案是在单行数据基础上来进行分区匹配的,还可以在表和数据库的基础上进行分区匹配,修改Canal参数即可。
mq的一些配置
canal.mq.partitionHash 表达式说明:
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔
例子1:test\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
例子2:.\…:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
例子3:.\…:p k pkpk 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
例子4: 匹配规则啥都不写,则默认发到0这个partition上
例子5:.\… ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
例子6: test\.test:id,.\…* , 针对test的表按照id散列,其余的表按照table散列
注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)
mq顺序性问题
binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答
canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性 ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意