1. 在之前的文章中,我们已经学习了如何使用Canal-ServerCanal-Admin。并配置了相关server配置、Instance配置。<br />那么现在的binlog数据已经顺利的可以取出来了。之后就是将这些binlog加工处理输出到目的地了。我们可以通过canal-clienttcp)直接取binlog日志然后处理,或者取消息队列中的binlog日志进行处理。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1609516/1634356938004-e7315643-2e4b-48e5-8705-e5e0e05a322a.png#clientId=ucf59f3c1-685b-4&from=paste&height=653&id=u699370d6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=653&originWidth=1250&originalType=binary&ratio=1&size=462471&status=done&style=shadow&taskId=u09409207-70e6-4ca6-bc86-23c3877ba34&width=1250)<br />这样的话需要程序员额外写代码去处理binlog。于是Canal提供了一个叫CanalAdpter的组件。他的作用就是替代图中canal-client和consumer的位置。完成从binlog->目的地的数据输送和处理逻辑。<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1609516/1634357415111-9bedc42f-43c3-46d2-a5c5-956b37523dc7.png#clientId=ucf59f3c1-685b-4&from=paste&height=655&id=uc1e44fa4&margin=%5Bobject%20Object%5D&name=image.png&originHeight=655&originWidth=1256&originalType=binary&ratio=1&size=483285&status=done&style=none&taskId=u554ad5cd-b54e-4a7b-a571-14c021eef5d&width=1256)

1.同步ES

  1. 需求

数据表:student和class。要求每新增一条学生记录同步到对应的ES中(学生和班级信息)

  1. 环境

elasticsearch7.7.1
canal-admin-1.1.5
canal-server-1.1.5
canal-adapter-1.1.5
mysql5.7.0

1.1数据库搭建

  1. create database demo;
  2. -- Table structure for class
  3. -- ----------------------------
  4. DROP TABLE IF EXISTS `class`;
  5. CREATE TABLE `class` (
  6. `id` int(11) NOT NULL,
  7. `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  8. PRIMARY KEY (`id`) USING BTREE
  9. ) ENGINE = InnoDB
  10. -- Table structure for student
  11. -- ----------------------------
  12. DROP TABLE IF EXISTS `student`;
  13. CREATE TABLE `student` (
  14. `id` int(11) NOT NULL,
  15. `name` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  16. `birthday` date NULL DEFAULT NULL,
  17. `class_id` int(11) NULL DEFAULT NULL,
  18. PRIMARY KEY (`id`) USING BTREE
  19. ) ENGINE = InnoDB

1.2启动并配置canal-server

  1. 启动canal-server

canal-server的启动在前面的canal-admin文章中写过。这里就不啰嗦了

  1. 配置

配置这边使用tcp模式

1.3canal-adapter操作步骤

  1. 下载并解压压缩包
  2. 拉取canal源码,修改后重新打包ex7相关jar

https://blog.csdn.net/qq_42569136/article/details/116059493

  1. 修改服务配置(官方wiki上的配置有点老) ```yaml server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null

canal.conf: mode: tcp #tcp kafka rocketMQ rabbitMQ flatMessage: true zookeeperHosts: syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: consumerProperties:

  1. # canal tcp consumer
  2. canal.tcp.server.host: 1921.68.2.5:11111 #如果是集群就一定不配,但是1.1.5好像用集群不行
  3. canal.tcp.zookeeper.hosts:
  4. canal.tcp.batch.size: 500
  5. canal.tcp.username:
  6. canal.tcp.password:

srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true username: root password: 123456 canalAdapters:

  • instance: test # instance文件名或topic名称 canal instance Name or mq topic name groups:
    • groupId: g1 outerAdapters:
      • name: logger #输出到日志中
      • name: es7 #输入到es7中 hosts: 192.168.2.1:9300 # 注意一定是这个地址 http://127.0.0.1:9200 for rest mode properties: mode: transport # or rest

        security.auth: test:123456 # only used for rest mode

        cluster.name: elasticsearch
  1. 4. 修改es同步配置
  2. ```yaml
  3. dataSourceKey: defaultDS
  4. destination: test
  5. groupId: g1
  6. esMapping:
  7. _index: mytest_student
  8. _id: _id
  9. # upsert: true
  10. # pk: id
  11. sql: "select a.id as _id,a.id,a.name as stuName,a.birthday as stuBirthday,a.class_id as stuClassId,b.name as stuClassName from student as a left join class as b on a.class_id = b.id"
  12. # objFields:
  13. # _labels: array:;
  14. etlCondition: "where a.c_time>={}"
  15. commitBatch: 3000
  1. 启动es并创建索引

    1. PUT /mytest_student
    2. {
    3. "settings" : {
    4. "number_of_shards" : 1
    5. },
    6. "mappings" : {
    7. "properties" : {
    8. "id":{ "type" : "long" },
    9. "stuName": { "type" : "text" },
    10. "stuBirthday" : { "type" : "date" },
    11. "stuClassId": { "type" : "long" },
    12. "stuClassName" : { "type" : "text" }
    13. }
    14. }
    15. }
  2. 结果演示

image.png