1、拉取
    CanalConnector.getWithoutAck(batchSize);

    1. @Component
    2. public class CannalClient implements InitializingBean {
    3. private final static int BATCH_SIZE = 1000;
    4. @Override
    5. public void afterPropertiesSet() throws Exception {
    6. // 创建链接
    7. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
    8. try {
    9. //打开连接
    10. connector.connect();
    11. //订阅数据库表,全部表
    12. connector.subscribe(".*\\..*");
    13. //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
    14. connector.rollback();
    15. while (true) {
    16. //尝试从master那边拉去数据batchSize条记录,有多少取多少
    17. Message message = connector.getWithoutAck(BATCH_SIZE);
    18. //获取批量ID
    19. long batchId = message.getId();
    20. //获取批量的数量
    21. int size = message.getEntries().size();
    22. //如果没有数据
    23. if (batchId == -1 || size == 0) {
    24. try {
    25. //线程休眠2秒
    26. Thread.sleep(2000);
    27. } catch (InterruptedException e) {
    28. e.printStackTrace();
    29. }
    30. } else {
    31. //如果有数据,处理数据
    32. printEntry(message.getEntries());
    33. }
    34. //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
    35. connector.ack(batchId);
    36. }
    37. } catch (Exception e) {
    38. e.printStackTrace();
    39. } finally {
    40. connector.disconnect();
    41. }
    42. }
    43. /**
    44. * 打印canal server解析binlog获得的实体类信息
    45. */
    46. private static void printEntry(List<Entry> entrys) {
    47. for (Entry entry : entrys) {
    48. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
    49. //开启/关闭事务的实体类型,跳过
    50. continue;
    51. }
    52. //RowChange对象,包含了一行数据变化的所有特征
    53. //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
    54. RowChange rowChage;
    55. try {
    56. rowChage = RowChange.parseFrom(entry.getStoreValue());
    57. } catch (Exception e) {
    58. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
    59. }
    60. //获取操作类型:insert/update/delete类型
    61. EventType eventType = rowChage.getEventType();
    62. //打印Header信息
    63. System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
    64. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
    65. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
    66. eventType));
    67. //判断是否是DDL语句
    68. if (rowChage.getIsDdl()) {
    69. System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
    70. }
    71. //获取RowChange对象里的每一行数据,打印出来
    72. for (RowData rowData : rowChage.getRowDatasList()) {
    73. //如果是删除语句
    74. if (eventType == EventType.DELETE) {
    75. printColumn(rowData.getBeforeColumnsList());
    76. //如果是新增语句
    77. } else if (eventType == EventType.INSERT) {
    78. printColumn(rowData.getAfterColumnsList());
    79. //如果是更新的语句
    80. } else {
    81. //变更前的数据
    82. System.out.println("------->; before");
    83. printColumn(rowData.getBeforeColumnsList());
    84. //变更后的数据
    85. System.out.println("------->; after");
    86. printColumn(rowData.getAfterColumnsList());
    87. }
    88. }
    89. }
    90. }
    91. private static void printColumn(List<Column> columns) {
    92. for (Column column : columns) {
    93. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
    94. }
    95. }
    96. }

2、监听

    
    /**
     * Listener-style example: each method is tagged with a listen-point
     * annotation describing which change events it is meant to handle.
     *
     * <p>NOTE(review): the annotations come from a canal integration library
     * that is not visible here; presumably it dispatches matching row changes
     * to these methods — confirm against the library in use.
     */
    @CanalEventListener
    public class MyEventListener {

        /**
         * Insert handler: prints every column of the row's after-image as
         * {@code name:value}.
         *
         * @param eventType type of the received event (unused here)
         * @param rowData   row data whose after-columns are printed
         */
        @InsertListenPoint
        public void onEvent(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
            for (CanalEntry.Column col : rowData.getAfterColumnsList()) {
                String line = col.getName() + ":" + col.getValue();
                System.out.println(line);
            }
        }

        /**
         * Update handler placeholder; intentionally empty.
         *
         * @param rowData row data of the updated row
         */
        @UpdateListenPoint
        public void onEvent1(CanalEntry.RowData rowData) {
            // Placeholder: add update handling here.
        }

        /**
         * Delete handler placeholder; intentionally empty.
         *
         * @param eventType type of the received event
         */
        @DeleteListenPoint
        public void onEvent3(CanalEntry.EventType eventType) {
            // Placeholder: add delete handling here.
        }

        /**
         * Placeholder handler restricted by its annotation to UPDATE events on
         * tables {@code t_user} and {@code test_table} of schema
         * {@code canal-test} for destination {@code example}.
         *
         * @param eventType type of the received event
         * @param rowData   row data of the affected row
         */
        @ListenPoint(destination = "example", schema = "canal-test", table = {"t_user", "test_table"}, eventType = CanalEntry.EventType.UPDATE)
        public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
            // Placeholder: add filtered update handling here.
        }
    }