依赖

  1. <properties>
  2. <canal.version>1.1.4</canal.version>
  3. </properties>
  4. <!-- canal common -->
  5. <dependency>
  6. <groupId>com.alibaba.otter</groupId>
  7. <artifactId>canal.common</artifactId>
  8. <version>${canal.version}</version>
  9. </dependency>
  10. <!-- canal client -->
  11. <dependency>
  12. <groupId>com.alibaba.otter</groupId>
  13. <artifactId>canal.client</artifactId>
  14. <version>${canal.version}</version>
  15. </dependency>
  16. <!-- canal module -->
  17. <dependency>
  18. <groupId>com.alibaba.otter</groupId>
  19. <artifactId>canal.protocol</artifactId>
  20. <version>${canal.version}</version>
  21. </dependency>

连接 CanalServer

  1. @Configuration
  2. public class CanalClient implements DisposableBean {
  3. private static final Logger log = LoggerFactory.getLogger(CanalClient.class);
  4. private canalConnector;
  5. @Bean
  6. public CanalConnector getCanalConnector() {
  7. // 也可以用 CanalConnectors.newSingleConnector
  8. canalConnector = CanalConnectors.newClusterConnector(
  9. Lists.newArrayList(new InetSocketAddress("127.0.0.1", 11111)),
  10. // new InetSocketAddress("127.0.0.1", 11111),
  11. // 集群名称
  12. "example", // canal server 的实例
  13. "", // canal server 账号
  14. "" // canal server 密码
  15. );
  16. // 1. 连接
  17. canalConnector.connect();
  18. // 2. 指定 filter, 格式 {database}.{table}
  19. // canalConnector.subscribe("{search}.{test}");
  20. canalConnector.subscribe();
  21. // 3. 回滚寻找上次终端的位置
  22. canalConnector.rollback();
  23. return canalConnector;
  24. }
  25. @Override
  26. public void destroy() throws Exception {
  27. log.info("close CanalClient");
  28. }
  29. }

定时查询

  1. @Component
  2. public class CanalScheduling implements Runnable {
  3. private static final Logger log = LoggerFactory.getLogger(CanalScheduling.class);
  4. @Autowired
  5. private CanalConnector canalConnector;
  6. // 100ms 查询一次
  7. @Override
  8. @Scheduled(fixedDelay = 100)
  9. public void run() {
  10. long batchId = -1;
  11. try {
  12. // 1. 批量获取 binlog 消息(需要手动 ack)
  13. int batchSize = 100;
  14. Message message = canalConnector.getWithoutAck(batchSize);
  15. // 2. 获得消息的id和内容
  16. batchId = message.getId();
  17. List<CanalEntry.Entry> entryList = message.getEntries();
  18. // 2.1 如果存在消息且内容不会空
  19. if (batchId != -1 && ! entryList.isEmpty()) {
  20. entryList.forEach(entry -> {
  21. if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
  22. // 解析处理
  23. publishCanalEvent(entry);
  24. }
  25. });
  26. // 3. 手动 ack 消费的 binlog 消息
  27. canalConnector.ack(batchId);
  28. }
  29. } catch (Exception e) {
  30. log.warn(e.getLocalizedMessage(), e);
  31. // 回滚消息
  32. if (batchId != -1) {
  33. canalConnector.rollback(batchId);
  34. }
  35. }
  36. }
  37. private void publishCanalEvent(CanalEntry.Entry entry) {
  38. // 1.1 获得该 entry 的类型: INSERT/UPDATE/DELETE
  39. CanalEntry.EventType eventType = entry.getHeader().getEventType();
  40. // 1.2 获得该 entry 涉及的 数据库
  41. String database = entry.getHeader().getSchemaName();
  42. // 1.3 获得该 entry 涉及的 表
  43. String table = entry.getHeader().getTableName();
  44. CanalEntry.RowChange change = null;
  45. try {
  46. change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  47. } catch (InvalidProtocolBufferException e) {
  48. e.printStackTrace();
  49. return;
  50. }
  51. // 1个binlog event事件可对应多条变更,比如批处理
  52. change.getRowDatasList().forEach(rowData -> {
  53. List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
  54. // 获得主键所在列
  55. String primaryKey = "id";
  56. CanalEntry.Column idColumn = null;
  57. for (CanalEntry.Column column : columns) {
  58. if (column.getIsKey() && primaryKey.equals(column.getName())) {
  59. idColumn = column;
  60. }
  61. }
  62. // 映射列名
  63. Map<String, Object> map = parseColumnsToMap(columns);
  64. log.info("eventType: {}, database: {}, table: {}, map:{}", eventType, database, table, map);
  65. if (idColumn != null ) {
  66. log.info("primaryKey: {}, primaryKeyValue: {}", idColumn.getName(), idColumn.getValue());
  67. }
  68. });
  69. }
  70. /**
  71. * 将行记录映射为 map
  72. * 列名: 列值
  73. *
  74. * @param columns
  75. * @return
  76. */
  77. Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {
  78. Map<String, Object> map = new HashMap<>(columns.size());
  79. columns.forEach(column -> {
  80. if (column == null) {
  81. return;
  82. }
  83. map.put(column.getName(), column.getValue());
  84. });
  85. return map;
  86. }
  87. }

CanalEntry.Entry 的结构

  1. Entry
  2. Header
  3. logfileName [binlog文件名]
  4. logfileOffset [binlog position]
  5. executeTime [变更发生的时间戳]
  6. schemaName
  7. tableName
  8. eventType [insert/update/delete类型]
  9. entryType [事务头BEGIN/事务尾END/数据ROWDATA]
  10. storeValue [byte数据,可展开,对应的类型为RowChange]
  11. RowChange
  12. isDdl [是否是ddl变更操作,比如create table/drop table]
  13. sql [具体的ddl sql]
  14. rowDatas [具体insert/update/delete的变更数据,可为多条,1binlog event事件可对应多条变更,比如批处理]
  15. beforeColumns [Column类型的数组]
  16. afterColumns [Column类型的数组]
  17. Column
  18. index
  19. sqlType [jdbc type]
  20. name [column name]
  21. isKey [是否为主键]
  22. updated [是否发生过变更]
  23. isNull [值是否为null]
  24. value [具体的内容,注意为文本]