Canal接收来的binlog,可以发送到MQ,也可以使用客户端进行TCP连接后从Canal中取出。本节就演示一下如何通过客户端方式取出binlog。
启动Canal
手动写一个Client
本节通过官方wiki来编写,大家可以下载github源码,到example目录中看相关最新Clinet使用方法。
总体步骤

导入canal依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency>
这里使用1.1.0的官方是这样用的,如果用1.1.5官方Demo好像有问题
编写Demo程序 ```java
import java.net.InetSocketAddress; import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message;
public class SimpleCanalClientExample {
public static void main(String args[]) {// 创建链接,指定Canal地址CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.2.5",11111), "example","", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();//订阅所有库所有表connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {// 获取指定数量的数据Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//若有数据变动emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);//打印消息项printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}/*** 打印消息项* @param entrys*/private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {//从entry的Stroe值中解析RowChange内容rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();//打印变化内容System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
} ```
- 启动程序并触发数据库变更
 
