https://blog.51cto.com/u_15023237/2558435#h0
Canal 的 dump 支持串行和并行模式两种模式,本篇重点梳理 dump 的核心流程,以便对 dump 过程有一个充分的了解,更好的理解 Canal 的实现原理与细节,下一篇中将重点关注Canal是如何引入并行模式来提高dump的性能,即并行编程相关的技巧。
一、dump流程
1、Canal中dump方法声明如下
串行处理
public void dump(String binlogfilename,Long binlogPosition,SinkFunction func) throws IOException;
并行处理模式
带有参数MultiStageCoprocessor为并行处理模式,底层使用了disruptor高性能并发框架
public void dump(String binlogfilename, Long binlogPosition, MultiStageCoprocessor coprocessor) throws IOException;
2、MysqlConnection 具体实现
public class MysqlConnection implements ErosaConnection {
......
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
updateSettings();
loadBinlogChecksum();
sendRegisterSlave();
sendBinlogDump(binlogfilename, binlogPosition);
DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
fetcher.start(connector.getChannel());
LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
LogContext context = new LogContext();
context.setFormatDescription(new FormatDescriptionLogEvent(4, binlogChecksum));
while (fetcher.fetch()) {
accumulateReceivedBytes(fetcher.limit());
LogEvent event = null;
event = decoder.decode(fetcher, context);
if (event == null) {
throw new CanalParseException("parse failed");
}
if (!func.sink(event)) {
break;
}
if (event.getSemival() == 1) {
sendSemiAck(context.getLogPosition().getFileName(), context.getLogPosition().getPosition());
}
}
}
......
}
Step1:updateSettings() 在发送dump之前先设置相关的参数。
Step2: loadBinlogChecksum() 从主库查询binlog checksum, 具体向主库发送select @@golalbal.binlog_checksum语句
Step3: sendBinlogDump() 向MySQL Master 注册从节点,告知客户端的host、port、用户名与密码、serverId,具体实现是发送命令CODE为 0x15
Step4:向 MySQL Master 发送 dump 请求,MySQL是基于请求与应答模式,发送请求命令后,就会向网络通道中写入响应请求。(在这里大家不妨先大概思考一下如何读取 dump 命令的返回值,这部分虽然涉及到网络相关的知识,我在这边会稍微简单提一下)。
Step5:构建 DirectLogFetcher对象,实现基于 socket 的日志拉取服务,并构建 LogDecoder 对象,用于解析 binlog 日志。
Step6:使用 while 循环反复拉取消息,通过通过 LogDecoder 对二进制流进行解析,提取一条完整的binlog事件,交给 SinkFunction 去处理,并且如果开启了半同步机制,则需要向master发送ACK。既然是while循环,该方法的退出条件还是值得我们关注的:
- fetch.fetch()方法返回 false
- SinkFunction 的 sink 方法 false,SinkFunction的详细处理流程将在下文介绍,这里先告知返回false的情况是 binlog 日志解析线程已停止运行。
二、核心流程解析
上面粗略的介绍了 dump 命令的几个核心关键步骤,要想详细掌握其实现细节,我们必须继续深入探讨如下几个问题:
- DirectLogFetcher 内部工作机制
- LogDecoder binlog 日志解析
-
1、DirectLogFetcher内部工作机制
2.1 DirectLogFetcher 类图
DirectLogFetcher的类继承体系如上图所示,我们来看一下其关键点: LogBuffer日志buffer,主要定义如下属性:
- byte[] buffer缓存区中数据容器。
- int origin当前buffer中的读指针
- int limit当前buffer的最大可读可写指针
- int position当前buffer的写指针。
- int semival是否需要发送ACK(用于半同步)。LogBuffer封装了字节相关的操作,不仅定义了上面的属性,也定义了字节读取相关众多API,其截图如下:
在这里插入图片描述
- LogFetcher binlog日志抓取抽象类,定义了如下关键属性与抽象方法。
- int DEFAULT_INITIAL_CAPACITYLogBuffer中的初始容量,默认为8K。
- float DEFAULT_GROWTH_FACTOR容量增长因子,默认为 2.0。
- int BIN_LOG_HEADER_SIZEbinlog日志条目 header 的长度,固定为4字节。
- float factor增长因子。
- public abstract boolean fetch()抓取binlog日志。
- public abstract void close()关闭 Fetch。
- DirectLogFetcher Canal LogFetcher模式实现类,其核心属性如下:
- SocketChannel channel网络通道,用于发送dump请求的网络通道。
boolean issemi = false是否开启半同步。
2.2 fetch流程详解
接下来我们重点剖析 DirectLogFetcher 的 fetch 方法,来探究其实现原理。在研究DirectLogFetcher的fetch方法之前,我们先重点跟踪一下其内部网络读写方法fetch0方法,该方法是具体与网络读写相关的实现。
DirectLogFetcher#fetch0
在详细介绍该方法之前先来介绍一下其参数的含义:int off从通道中读取到的内容放入到buffer中的起始位置
- int len期望从通道中读取的字节长度。该方法的实现关键点如下:
- 首先先确保接收缓存区有足够的剩余空间,如果空间不足,则进行扩容。
- 然后从通道中读取指定长度的字节。接下来我们来重点看一下DirectLogFetcher的fetch的实现流程。
DirectLogFetcher#fetch
Step1:尝试从网络通道中读取4个字节(即读取协议的头部),如果通道中还没有可读取内容,返回false,造成的效果是一次 dump 请求结束。DirectLogFetcher#fetch
Step2:从上文读到的4个字节分别读出该网络包的总长度以及当前包的序号,从这里可以看成MySQL协议头为4字节,前3个字节为网络包的总长度,第4个字节为包的序列号。再取出数据包的长度后,继续向通道中读取netlen个字节,即读取一个完整的数据包到buffer中。DirectLogFetcher#fetch
Step3:继续从数据包中读取一个字节,判断该包的状态码,是否是一个成功的响应,如果是错误的响应,会向外抛出一次,Canal 会记录dump命令执行错误的次数。DirectLogFetcher#fetch
Step4:如果一个包的长度为允许的最大包长度,则继续读取,这个主要是根据MySQL协议做的处理,即读取到一个数据包,然后返回true,表示拉取到一条日志,然后通过LogDecoder解码,然后传入到sink方法中,进行日志的后续处理。
DirectLogFetcher#fetch
Step5:这一步的目的,就是将buffer中的当前指针指向数据的开始位置。这样一次 fetch就结束了。从上面的流程来看,DirectLogFetcher#fetch 方法结束后,就将进入到LogDecoder中。经过一次DirectLogFetcher#fetch方法后,即取回一条binlog日志,即二进制流,接下来就根据binlog协议对其解析。本文暂不深入该方法,如果大家想深入数据库中间件方面,可以作为一个很好的示例,面向MySQL通信协议进行编程。
3、SinkFunction
通过 LogDecoder从中解析一个事件后,会调用SinkFunction的sink方法,如果该方法返回 false,一次dump请求将介绍,接下来我们看一下其sink方法。
AbstractEventParser#start
该方法的实现比较简单,这里不打算继续深入,我们重点来看一下 Canal.Entry 的结构:在这里插入图片描述
这个结构是基于Canal做架构设计,解决顺序消费、数据不丢失一个重要参考依据,没解析一条事务,最终放入到环形缓存区,环形缓存区尽量以一个事务提交到Sink组件,其代码如下: