依赖
<properties>
<canal.version>1.1.4</canal.version>
</properties>
<!-- canal common utilities (shared base classes) -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.common</artifactId>
<version>${canal.version}</version>
</dependency>
<!-- canal client API (CanalConnector / CanalConnectors) -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<!-- canal protocol (CanalEntry / Message protobuf types) -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>
连接 CanalServer
@Configuration
public class CanalClient implements DisposableBean {
private static final Logger log = LoggerFactory.getLogger(CanalClient.class);
private canalConnector;
@Bean
public CanalConnector getCanalConnector() {
// 也可以用 CanalConnectors.newSingleConnector
canalConnector = CanalConnectors.newClusterConnector(
Lists.newArrayList(new InetSocketAddress("127.0.0.1", 11111)),
// new InetSocketAddress("127.0.0.1", 11111),
// 集群名称
"example", // canal server 的实例
"", // canal server 账号
"" // canal server 密码
);
// 1. 连接
canalConnector.connect();
// 2. 指定 filter, 格式 {database}.{table}
// canalConnector.subscribe("{search}.{test}");
canalConnector.subscribe();
// 3. 回滚寻找上次终端的位置
canalConnector.rollback();
return canalConnector;
}
@Override
public void destroy() throws Exception {
log.info("close CanalClient");
}
}
定时查询
@Component
public class CanalScheduling implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(CanalScheduling.class);

    @Autowired
    private CanalConnector canalConnector;

    /**
     * Polls the canal server every 100 ms (fixedDelay: measured from the end
     * of the previous run) for a batch of binlog entries, processes the
     * row-data entries and acks the batch; on any failure the batch is
     * rolled back so it will be redelivered.
     */
    @Override
    @Scheduled(fixedDelay = 100)
    public void run() {
        long batchId = -1;
        try {
            // 1. fetch up to 100 entries WITHOUT auto-ack (manual ack below)
            int batchSize = 100;
            Message message = canalConnector.getWithoutAck(batchSize);
            // 2. batch id (-1 means "no data") and its entries
            batchId = message.getId();
            List<CanalEntry.Entry> entryList = message.getEntries();
            if (batchId != -1) {
                // 2.1 only ROWDATA entries carry changes; skip the
                // transaction BEGIN/END marker entries
                entryList.forEach(entry -> {
                    if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                        publishCanalEvent(entry);
                    }
                });
                // 3. ack even when entryList is empty — bug fix: the original
                // skipped the ack for a valid-but-empty batch, so the cursor
                // never advanced and the same batch was re-fetched forever
                canalConnector.ack(batchId);
            }
        } catch (Exception e) {
            log.warn(e.getLocalizedMessage(), e);
            // roll back the un-acked batch so it is redelivered next poll
            if (batchId != -1) {
                canalConnector.rollback(batchId);
            }
        }
    }

    /**
     * Parses one ROWDATA entry and logs the change type, location and the
     * after-image columns; also logs the "id" primary-key column if present.
     *
     * @param entry a ROWDATA entry whose storeValue is a protobuf RowChange
     */
    private void publishCanalEvent(CanalEntry.Entry entry) {
        // 1.1 change type: INSERT/UPDATE/DELETE
        CanalEntry.EventType eventType = entry.getHeader().getEventType();
        // 1.2 database the entry belongs to
        String database = entry.getHeader().getSchemaName();
        // 1.3 table the entry belongs to
        String table = entry.getHeader().getTableName();
        CanalEntry.RowChange change;
        try {
            change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            // Bug fix: was e.printStackTrace() — log with context and the
            // cause instead so the failure is visible in the application log.
            log.error("failed to parse RowChange, database: {}, table: {}", database, table, e);
            return;
        }
        // one binlog event may carry several row changes (e.g. batch DML)
        change.getRowDatasList().forEach(rowData -> {
            List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
            // locate the primary-key column (assumes the PK column is named
            // "id" — TODO confirm against the actual schema)
            String primaryKey = "id";
            CanalEntry.Column idColumn = null;
            for (CanalEntry.Column column : columns) {
                if (column.getIsKey() && primaryKey.equals(column.getName())) {
                    idColumn = column;
                    break; // at most one match; stop scanning
                }
            }
            // map column names to values
            Map<String, Object> map = parseColumnsToMap(columns);
            log.info("eventType: {}, database: {}, table: {}, map:{}", eventType, database, table, map);
            if (idColumn != null) {
                log.info("primaryKey: {}, primaryKeyValue: {}", idColumn.getName(), idColumn.getValue());
            }
        });
    }

    /**
     * Maps a row's columns to {column name -> column value}.
     *
     * @param columns the column list of one row change (null entries skipped)
     * @return a mutable map of column name to its (textual) value
     */
    Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
        Map<String, Object> map = new HashMap<>(columns.size());
        for (CanalEntry.Column column : columns) {
            if (column != null) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }
}
CanalEntry.Entry
的结构
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [变更发生的时间]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为文本]