依赖
<properties> <canal.version>1.1.4</canal.version></properties><!-- canal common --><dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.common</artifactId> <version>${canal.version}</version></dependency><!-- canal client --><dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>${canal.version}</version></dependency><!-- canal module --><dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>${canal.version}</version></dependency>
连接 CanalServer
@Configurationpublic class CanalClient implements DisposableBean { private static final Logger log = LoggerFactory.getLogger(CanalClient.class); private canalConnector; @Bean public CanalConnector getCanalConnector() { // 也可以用 CanalConnectors.newSingleConnector canalConnector = CanalConnectors.newClusterConnector( Lists.newArrayList(new InetSocketAddress("127.0.0.1", 11111)), // new InetSocketAddress("127.0.0.1", 11111), // 集群名称 "example", // canal server 的实例 "", // canal server 账号 "" // canal server 密码 ); // 1. 连接 canalConnector.connect(); // 2. 指定 filter, 格式 {database}.{table} // canalConnector.subscribe("{search}.{test}"); canalConnector.subscribe(); // 3. 回滚寻找上次终端的位置 canalConnector.rollback(); return canalConnector; } @Override public void destroy() throws Exception { log.info("close CanalClient"); }}
定时查询
@Componentpublic class CanalScheduling implements Runnable { private static final Logger log = LoggerFactory.getLogger(CanalScheduling.class); @Autowired private CanalConnector canalConnector; // 100ms 查询一次 @Override @Scheduled(fixedDelay = 100) public void run() { long batchId = -1; try { // 1. 批量获取 binlog 消息(需要手动 ack) int batchSize = 100; Message message = canalConnector.getWithoutAck(batchSize); // 2. 获得消息的id和内容 batchId = message.getId(); List<CanalEntry.Entry> entryList = message.getEntries(); // 2.1 如果存在消息且内容不会空 if (batchId != -1 && ! entryList.isEmpty()) { entryList.forEach(entry -> { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { // 解析处理 publishCanalEvent(entry); } }); // 3. 手动 ack 消费的 binlog 消息 canalConnector.ack(batchId); } } catch (Exception e) { log.warn(e.getLocalizedMessage(), e); // 回滚消息 if (batchId != -1) { canalConnector.rollback(batchId); } } } private void publishCanalEvent(CanalEntry.Entry entry) { // 1.1 获得该 entry 的类型: INSERT/UPDATE/DELETE CanalEntry.EventType eventType = entry.getHeader().getEventType(); // 1.2 获得该 entry 涉及的 数据库 String database = entry.getHeader().getSchemaName(); // 1.3 获得该 entry 涉及的 表 String table = entry.getHeader().getTableName(); CanalEntry.RowChange change = null; try { change = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); return; } // 1个binlog event事件可对应多条变更,比如批处理 change.getRowDatasList().forEach(rowData -> { List<CanalEntry.Column> columns = rowData.getAfterColumnsList(); // 获得主键所在列 String primaryKey = "id"; CanalEntry.Column idColumn = null; for (CanalEntry.Column column : columns) { if (column.getIsKey() && primaryKey.equals(column.getName())) { idColumn = column; } } // 映射列名 Map<String, Object> map = parseColumnsToMap(columns); log.info("eventType: {}, database: {}, table: {}, map:{}", eventType, database, table, map); if (idColumn != null ) { log.info("primaryKey: {}, primaryKeyValue: {}", idColumn.getName(), idColumn.getValue()); } }); } /** * 将行记录映射为 map * 列名: 列值 * * @param columns * @return */ Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) { Map<String, Object> map = new HashMap<>(columns.size()); columns.forEach(column -> { if (column == null) { return; } map.put(column.getName(), column.getValue()); }); return map; }}
CanalEntry.Entry 的结构
Entry Header logfileName [binlog文件名] logfileOffset [binlog position] executeTime [发生的变更] schemaName tableName eventType [insert/update/delete类型] entryType [事务头BEGIN/事务尾END/数据ROWDATA] storeValue [byte数据,可展开,对应的类型为RowChange] RowChange isDdl [是否是ddl变更操作,比如create table/drop table] sql [具体的ddl sql] rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理] beforeColumns [Column类型的数组] afterColumns [Column类型的数组] Column index sqlType [jdbc type] name [column name] isKey [是否为主键] updated [是否发生过变更] isNull [值是否为null] value [具体的内容,注意为文本]