canal 的EventStore基于本地内存存储实现。数据的存储、读取和ack采用类似Disruptor的RingBuffer的实现思路:
RingBuffer定义了3个cursor:
- Put : Sink模块进行数据存储的最后一次写入位置
- Get : 数据订阅获取的最后一次提取位置
- Ack : 数据消费成功的最后一次消费位置
canal客户端在数据消费支持并行消费、批量消费和异步消费,增大消费处理能力。
面示例代码演示了一个从canal server集群中循环拉取消息的过程:首先设置canal server的zk地址和destination(对应一个canal server)以及订阅的库表(可以通过filter可以对destination进一步过滤筛选)等信息,然后通过调用getWithoutAck方法批量读取,如果读取并且处理没有异常抛出,就可以通过ack确认进行下一批次读取。
(Canal~ES)执行时,速率太慢,导致数据积压
MQ的消费是按照顺序消费的,为提高消费速率,且保证顺序消费,可以先把数据同步到MQ中,然后执行消费逻辑