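The binlog that Canal receives can be forwarded to an MQ, or a client can open a TCP connection to Canal and pull it directly. This section demonstrates how to pull the binlog with a client.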

Start Canal

Follow the QuickStart on the official site to get Canal running.

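Write a client by hand

This section follows the official wiki. You can also download the source from GitHub and look in the example directory for the latest client usage.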

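Overall steps

  • Create a maven-quickstart project
  • Add the canal dependency
  • Write the demo program
  • Start the program and trigger a database change

    Details

  • Create a maven-quickstart project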


  • Add the canal dependency

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>

    Version 1.1.0 is used here because that is what the official example uses. With 1.1.5 the official demo seems to have problems.

  • Write the demo program

```java
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection, pointing at the Canal server address
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.2.5", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Subscribe to all tables in all databases
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignored in this demo
                    }
                } else {
                    // Data has changed
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    // Print the entries
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // Acknowledge the batch
                // connector.rollback(batchId); // On processing failure, roll back the batch
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the entries of a message.
     *
     * @param entries the entries to print
     */
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange = null;
            try {
                // Parse the RowChange from the entry's store value
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Print where the change came from and what kind of change it is
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
```
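The demo leaves `connector.rollback(batchId)` commented out and always acknowledges. As a rough sketch of how the two calls could be paired, the following acknowledges a batch only when the handler succeeds and rolls it back otherwise. `BatchSource`, `RecordingSource`, and `processOnce` are hypothetical names made up for this illustration and are not Canal types; unlike the demo, the sketch also skips the acknowledgement on an empty poll, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class AckOrRollbackSketch {

    // Hypothetical stand-in for the connector calls used in the demo; not a Canal type.
    interface BatchSource {
        long fetch();                 // batch id, or -1 when there is nothing to read
        void ack(long batchId);       // confirm the batch was handled
        void rollback(long batchId);  // ask for the batch to be delivered again
    }

    // In-memory source that records which calls were made, for demonstration only.
    static class RecordingSource implements BatchSource {
        final List<String> calls = new ArrayList<>();
        private final long nextId;

        RecordingSource(long nextId) { this.nextId = nextId; }

        public long fetch() { return nextId; }
        public void ack(long batchId) { calls.add("ack:" + batchId); }
        public void rollback(long batchId) { calls.add("rollback:" + batchId); }
    }

    // Returns true only when a batch was fetched, handled, and acknowledged.
    static boolean processOnce(BatchSource source, LongConsumer handler) {
        long batchId = source.fetch();
        if (batchId == -1) {
            return false; // empty poll: this sketch neither acks nor rolls back
        }
        try {
            handler.accept(batchId);
            source.ack(batchId);
            return true;
        } catch (RuntimeException e) {
            source.rollback(batchId);
            return false;
        }
    }

    public static void main(String[] args) {
        RecordingSource ok = new RecordingSource(7);
        processOnce(ok, id -> { });
        System.out.println(ok.calls); // prints [ack:7]

        RecordingSource failing = new RecordingSource(8);
        processOnce(failing, id -> { throw new IllegalStateException("handler failed"); });
        System.out.println(failing.calls); // prints [rollback:8]
    }
}
```

In the real client the handler would be something like `printEntry`, and the two recording methods would be the connector's `ack` and `rollback`.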

  • Start the program and trigger a database change
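Before triggering a change, it can help to check which tables the subscription filter from the demo would report. The sketch below approximates the filter with `java.util.regex`, assuming it is a comma-separated list of regular expressions matched against the full `schema.table` name. Canal's own matcher may behave differently in edge cases, and the schema and table names here (`shop`, `crm`, `orders`) are made up for illustration.

```java
import java.util.regex.Pattern;

class SubscribeFilterSketch {

    // Approximation only: each comma-separated pattern must match the whole
    // "schema.table" name. This is not Canal's own matching code.
    static boolean matches(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String all = ".*\\..*";          // the filter used in the demo
        String oneSchema = "shop\\..*";  // hypothetical: only tables in schema "shop"

        System.out.println(matches(all, "shop", "orders"));       // prints true
        System.out.println(matches(oneSchema, "shop", "orders")); // prints true
        System.out.println(matches(oneSchema, "crm", "orders"));  // prints false
    }
}
```

With the demo's `.*\\..*` filter, a change to any table in any schema should show up in the client's output.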