使用canal client-adapter完成mysql到es数据同步教程

本文为我在学习canal的client-adapter的过程中所记录下来的一些知识点与操作步骤,虽然canal官方也有对应的文档但是我在看官方的文档时感觉官方的wiki有的操作步骤写的不是很清楚,特此记录与君共勉!

官方clientAdapter文档:https://github.com/alibaba/canal/wiki/ClientAdapter
canal的client-adapter是为了让用户能快速的运行canal而开发的一个模块,通过它可以快速地完成从mysql到其他数据源的数据同步功能,本文将演示adapter中elasticsearch部分功能

环境说明

canal 版本

为了体验新功能的特性,这里采用源码的方式进行项目的运行。选择当前master分支的代码进行本地运行(当前时间为2019年8月31号

mysql版本

mysql采用的是5.7.19版本,运行于docker内。
其运行脚本为:

  1. docker pull mysql:5.7.19
  2. docker run -p 3306:3306 -v $PWD/mysql:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=root --name mysql5719 -d mysql:5.7.19
  3. # docker中将进入mysql
  4. # 查询docker所有容器 docker container ls --all
  5. # 查询docker当前运行容器 docker ps
  6. # 刚刚启动的时候设置了容器名称mysql5719,-d代表后台运行,使用命令可以进入mysql的docker
  7. # 默认账号密码为root
  8. docker exec -it mysql5719 bash
  9. # 退出命令
  10. exit

canal环境安装

开启mysql的bin_log

运行时需要注意确保mysql开启了bin_log,因为canal的原理是基于bin_log来实现的。
验证方法为:

  1. mysql> show variables like 'binlog_format';
  2. +---------------+-------+
  3. | Variable_name | Value |
  4. +---------------+-------+
  5. | binlog_format | ROW |
  6. +---------------+-------+
  7. mysql> show variables like 'log_bin';
  8. +---------------+-------+
  9. | Variable_name | Value |
  10. +---------------+-------+
  11. | log_bin | ON |
  12. +---------------+-------+

如上上面的语句返回的为ROW和ON,则说明是开启了bin_log的

如果没有开启可以参考canal的wiki,地址为:https://github.com/alibaba/canal/wiki/AdminGuide

  1. [mysqld]
  2. log-bin=mysql-bin #添加这一行就ok
  3. binlog-format=ROW #选择row模式
  4. server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

在docker中可通过进入容器的bash进行配置:

  1. docker exec -it mysql5719 bash
  2. echo '[mysqld]' >> /etc/mysql/conf.d/mysql.cnf
  3. echo 'log-bin=mysql-bin' >> /etc/mysql/conf.d/mysql.cnf
  4. echo 'binlog-format=ROW' >> /etc/mysql/conf.d/mysql.cnf
  5. echo 'server-id=123454' >> /etc/mysql/conf.d/mysql.cnf

es安装

es选择6.8.1版本,之所以选择6.8.1版本是因为这是目前还算比较靠前的es版本,同时也是许多虚拟云支持的es版本
在这里测试也采用docker来进行安装
脚本为:

  1. docker pull docker.elastic.co/elasticsearch/elasticsearch:6.8.1
  2. docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:6.8.1

kibana安装

为方便看到es的数据,这里再安装一个kibana
安装时同样安装在docker内,并让其link到es的容器内。脚本为:

  1. docker pull docker.elastic.co/kibana/kibana:6.8.1
  2. docker run --link 41f6e52d3c8b:elasticsearch -p 5601:5601 docker.elastic.co/kibana/kibana:6.8.1
  3. # 查询docker所有容器可以看到容器id
  4. docker container ls --all

其中上面的41f6e52d3c8b为elasticsearch的容器id

canal server安装与运行

canal的运行可以直接参考官方的WIKI,其地址为:https://github.com/alibaba/canal/wiki/Docker-QuickStart
下载好canal server

  1. docker pull canal/canal-server

再下载canal的运行脚本

  1. sudo wget https://github.com/alibaba/canal/blob/master/docker/run.sh
  2. # 国内无法下载该文件,window中下载下来存为sh脚本
  3. # 执行sh ./xxx.sh出现:“Syntax error: “(” unexpected”的解决方法,选no
  4. sudo dpkg-reconfigure dash
  5. # "line2: $'\r': 未找到命令" , win下的脚本放在liux下执行有问题
  6. sudo apt-get install dos2unix
  7. dos2unix **.sh

然后根据条件进行启动即可,如我这里的:

  1. run.sh -e canal.instance.master.address=127.0.0.1:3306 \
  2. -e canal.destinations=example \
  3. -e canal.instance.dbUsername=root \
  4. -e canal.instance.dbPassword=root \
  5. -e canal.instance.connectionCharset=UTF-8 \
  6. -e canal.instance.tsdb.enable=true \
  7. -e canal.instance.gtidon=false \
  8. -e canal.instance.filter.regex=.*\\\..*

canal.instance.filter.regex参数代表mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠()
常见例子:

mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠() 常见例子:

  1. 所有表:. or .\…*
  2. canal schema下所有表: canal\…*
  3. canal下的以canal打头的表:canal.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal\…*,mysql.test1,mysql.test2 (逗号分隔)

环境配置完毕后的验证

  • mysql检查3306端口,检查bin_log是否开启
  • es检查9200,9300端口是否开启
  • kibana检查5601端口是否开启
  • canal server检查11111和11112端口是否开启,1.1.4版本的会开启11110端口

如果上面的都没有问题的话,则环境部分OK了

mysql中创建测试库和测试表及数据

为了演示的方便,这里先将mysql的数据创建好,这里简单创建一下:

  1. 建库

    1. CREATE SCHEMA `test` ;
  2. 建表导数据 ``` SET FOREIGN_KEY_CHECKS=0;


— Table structure for stu_info


DROP TABLE IF EXISTS stu_info; CREATE TABLE stu_info ( id int(11) NOT NULL AUTO_INCREMENT, name varchar(45) DEFAULT NULL, age int(11) DEFAULT NULL, update_time datetime DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;


— Records of stu_info


INSERT INTO stu_info VALUES (‘1’, ‘aaa’, ‘11’, ‘2019-08-31 03:23:11’); INSERT INTO stu_info VALUES (‘2’, ‘bbb’, ‘12’, ‘2019-08-31 04:33:22’); INSERT INTO stu_info VALUES (‘3’, ‘ccc’, ‘13’, ‘2019-08-31 05:43:33’);

  1. <a name="3c984ecd"></a>
  2. #### es索引验证及创建目标索引
  3. 1. 测试可用性<br />访问kibana,输入查询语句列出目前系统中的所有索引

GET /_cat/indices

  1. ![](https://cdn.nlark.com/yuque/0/2020/png/306561/1596855463587-24051219-ee24-4fd4-a0f6-e3f91f517ad5.png#align=left&display=inline&height=262&margin=%5Bobject%20Object%5D&originHeight=262&originWidth=1753&size=0&status=done&style=none&width=1753)
  2. 2. 创建索引
  3. 因为es连接采用的是rest客户端,需要先将索引创建好,否则会报错

put mytest_user { “mappings”: { “doc”: { “properties”: { “name”: { “type”: “text” }, “age”: { “type”: “long” }, “update_time”: { “type”: “date” } } } } }

  1. <a name="75deba97"></a>
  2. ## canal adapter在idea中测试运行
  3. 这里为了研究canal adapter方便决定在idea中进行运行<br />首选确保idea中已经导入了canal的源码(在1.1.4的下载源码包)
  4. <a name="556f1be8"></a>
  5. ### canal maven install
  6. 将canal的源码导入到idea中后,找到manven模块中有root的那个模块,然后点击install进行安装<br />也就是下图中的这个:<br />![](https://cdn.nlark.com/yuque/0/2020/png/306561/1596855463621-6e2df1f0-401f-4b55-86ae-dcf53a8a9244.png#align=left&display=inline&height=647&margin=%5Bobject%20Object%5D&originHeight=647&originWidth=1865&size=0&status=done&style=none&width=1865)<br />待安装完成后,会在对应的项目的target目录下产生相应的运行包,如果不想在开发工具中运行的话,直接拷贝对应的包即可
  7. <a name="dddbb060"></a>
  8. ### canal adapter运行
  9. <a name="796b7043"></a>
  10. #### launcher的application.yml配置
  11. 找到canal adapter的模块,修改application.yml配置文件<br />主要修改点为:
  12. 1. 指定canal server的地址,并采用tcp方式进行连接
  13. 1. 配置mysql数据源的连接信息
  14. 1. 开启esAdapter并配置es的连接信息
  15. 我这里是这样配置的:

server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null

canal.conf: mode: tcp canalServerHost: 192.168.1.66:11111 batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://192.168.1.66:3306/test?useUnicode=true username: root password: 123456 canalAdapters:

  • instance: example # canal instance Name or mq topic name groups:
    • groupId: g1 outerAdapters:
      • name: logger
      • name: es hosts: 192.168.1.66:9300 # 127.0.0.1:9200 for rest mode properties: mode: transport # or rest cluster.name: docker-cluster ```

其中192.168.1.66是运行docker虚拟机的ip地址

es adapter配置

在launcher项目中的配置文件下创建es目录并加入所需要同步的配置文件即可
如我这里的mytest_user.yml

  1. dataSourceKey: defaultDS
  2. destination: example
  3. groupId: g1
  4. esMapping:
  5. _index: mytest_user
  6. _type: doc
  7. _id: _id
  8. upsert: true
  9. # pk: id
  10. sql: "SELECT a.id as _id,a.name,a.age,a.update_time from stu_info a"
  11. # objFields:
  12. # _labels: array:;
  13. etlCondition: "where a.update_time>={}"
  14. commitBatch: 3000

Cannal在docker下安装 - 图1

launcher adapter运行

配置完毕后,直接运行launcher这个springBoot项目即可,也就是运行CanalAdapterApplication这个类就可以了
Cannal在docker下安装 - 图2
当看日志提示启动成功后就代表启动成功了!

canal测试

当canal adapter启动完毕后就可以进行测试了
canal除了能实现自动增量同步数据的功能外还具有etl的功能
其同步的实现细节会根据数据量的大小自动采用多线程进行同步,也是采用的游标的方式进行查询的,在提高了性能的同时也确保了不容易发生oom,详见博文:canal源码解析之esAdapter etl功能

canal全表同步(etl功能,手动触发)

launcher项目是一个spring boot项目,在其中的rest包下有一个controller类,里面提供了一些接口,其中一个用于全量同步数据的接口

  1. /**
  2. * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
  3. *
  4. * @param type 类型 hbase, es
  5. * @param task 任务名对应配置文件名 mytest_person2.yml
  6. * @param params etl where条件参数, 为空全部导入
  7. */
  8. @PostMapping("/etl/{type}/{task}")
  9. public EtlResult etl(@PathVariable String type, @PathVariable String task,
  10. @RequestParam(name = "params", required = false) String params) {
  11. return etl(type, null, task, params);
  12. }

类上也有注释,我们按照注释的内容发送一个http请求即可

curl http://192.168.1.100:8081/etl/es/mytest_user.yml -X POST

其中192.168.1.100为我本地电脑的ip,mytest_user.yml为es目录下的配置文件
运行后就可以让mytest_user.yml配置的数据表的所有数据全同步到es中了.
Cannal在docker下安装 - 图3
然后再在kibana中看一下es中的数据:

get mytest_user/_search

  1. {
  2. "took" : 19,
  3. "timed_out" : false,
  4. "_shards" : {
  5. "total" : 5,
  6. "successful" : 5,
  7. "skipped" : 0,
  8. "failed" : 0
  9. },
  10. "hits" : {
  11. "total" : 3,
  12. "max_score" : 1.0,
  13. "hits" : [
  14. {
  15. "_index" : "mytest_user",
  16. "_type" : "doc",
  17. "_id" : "2",
  18. "_score" : 1.0,
  19. "_source" : {
  20. "name" : "bbb",
  21. "age" : 12,
  22. "update_time" : "2019-08-31T05:43:50+08:00"
  23. }
  24. },
  25. {
  26. "_index" : "mytest_user",
  27. "_type" : "doc",
  28. "_id" : "1",
  29. "_score" : 1.0,
  30. "_source" : {
  31. "name" : "aaa",
  32. "age" : 13,
  33. "update_time" : "2019-08-30T02:43:41+08:00"
  34. }
  35. },
  36. {
  37. "_index" : "mytest_user",
  38. "_type" : "doc",
  39. "_id" : "3",
  40. "_score" : 1.0,
  41. "_source" : {
  42. "name" : "ccc",
  43. "age" : 11,
  44. "update_time" : "2019-08-30T05:39:38+08:00"
  45. }
  46. }
  47. ]
  48. }
  49. }

canal条件同步(etl功能,手动触发)

如果想执行从某一个时刻的数据同步,在上面的测试URL后加上对应的参数就可以了,其参数由es的配置项决定的,也就是上面配置文件中的etlCondition,比如我这里写的where a.update_time>={},而{}之内的就是条件

  1. dataSourceKey: defaultDS
  2. destination: example
  3. groupId: g1
  4. esMapping:
  5. _index: mytest_user
  6. _type: doc
  7. _id: _id
  8. upsert: true
  9. # pk: id
  10. sql: "SELECT a.id as _id,a.name,a.age,a.update_time from stu_info a"
  11. # objFields:
  12. # _labels: array:;
  13. etlCondition: "where a.update_time>={}"
  14. commitBatch: 3000

那么如果只想同步stu_info表中update_time为2019-08-31 00:00:00之后的数据,那么这里执行的请求为:

curl http://192.168.1.11:8081/etl/es/mytest_user.yml -X POST -d “params=2019-08-31 00:00:00”

响应为:

  1. {"succeeded":true,"resultMessage":"导入ES 数据:3 条"}root@ubuntu:~# curl http://192.168.1.11:8081/etl/es/mytest_user.yml -X POST -d "params=2019-08-31 00:00:00"
  2. {"succeeded":true,"resultMessage":"导入ES 数据:1 条"}root@ubuntu:~#

这个controller的源码如下,想要操作其他功能,也可以自行看源码进行了解:

  1. /**
  2. * 适配器操作Rest
  3. *
  4. * @author rewerma @ 2018-10-20
  5. * @version 1.0.0
  6. */
  7. @RestController
  8. public class CommonRest {
  9. private static Logger logger = LoggerFactory.getLogger(CommonRest.class);
  10. private static final String ETL_LOCK_ZK_NODE = "/sync-etl/";
  11. private ExtensionLoader<OuterAdapter> loader;
  12. @Resource
  13. private SyncSwitch syncSwitch;
  14. @Resource
  15. private EtlLock etlLock;
  16. @Resource
  17. private AdapterCanalConfig adapterCanalConfig;
  18. @PostConstruct
  19. public void init() {
  20. loader = ExtensionLoader.getExtensionLoader(OuterAdapter.class);
  21. }
  22. /**
  23. * ETL curl http://127.0.0.1:8081/etl/rdb/oracle1/mytest_user.yml -X POST
  24. *
  25. * @param type 类型 hbase, es
  26. * @param key adapter key
  27. * @param task 任务名对应配置文件名 mytest_user.yml
  28. * @param params etl where条件参数, 为空全部导入
  29. */
  30. @PostMapping("/etl/{type}/{key}/{task}")
  31. public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
  32. @RequestParam(name = "params", required = false) String params) {
  33. OuterAdapter adapter = loader.getExtension(type, key);
  34. String destination = adapter.getDestination(task);
  35. String lockKey = destination == null ? task : destination;
  36. boolean locked = etlLock.tryLock(ETL_LOCK_ZK_NODE + type + "-" + lockKey);
  37. if (!locked) {
  38. EtlResult result = new EtlResult();
  39. result.setSucceeded(false);
  40. result.setErrorMessage(task + " 有其他进程正在导入中, 请稍后再试");
  41. return result;
  42. }
  43. try {
  44. boolean oriSwitchStatus;
  45. if (destination != null) {
  46. oriSwitchStatus = syncSwitch.status(destination);
  47. if (oriSwitchStatus) {
  48. syncSwitch.off(destination);
  49. }
  50. } else {
  51. // task可能为destination,直接锁task
  52. oriSwitchStatus = syncSwitch.status(task);
  53. if (oriSwitchStatus) {
  54. syncSwitch.off(task);
  55. }
  56. }
  57. try {
  58. List<String> paramArray = null;
  59. if (params != null) {
  60. paramArray = Arrays.asList(params.trim().split(";"));
  61. }
  62. return adapter.etl(task, paramArray);
  63. } finally {
  64. if (destination != null && oriSwitchStatus) {
  65. syncSwitch.on(destination);
  66. } else if (destination == null && oriSwitchStatus) {
  67. syncSwitch.on(task);
  68. }
  69. }
  70. } finally {
  71. etlLock.unlock(ETL_LOCK_ZK_NODE + type + "-" + lockKey);
  72. }
  73. }
  74. /**
  75. * ETL curl http://127.0.0.1:8081/etl/hbase/mytest_person2.yml -X POST
  76. *
  77. * @param type 类型 hbase, es
  78. * @param task 任务名对应配置文件名 mytest_person2.yml
  79. * @param params etl where条件参数, 为空全部导入
  80. */
  81. @PostMapping("/etl/{type}/{task}")
  82. public EtlResult etl(@PathVariable String type, @PathVariable String task,
  83. @RequestParam(name = "params", required = false) String params) {
  84. return etl(type, null, task, params);
  85. }
  86. /**
  87. * 统计总数 curl http://127.0.0.1:8081/count/rdb/oracle1/mytest_user.yml
  88. *
  89. * @param type 类型 hbase, es
  90. * @param key adapter key
  91. * @param task 任务名对应配置文件名 mytest_person2.yml
  92. * @return
  93. */
  94. @GetMapping("/count/{type}/{key}/{task}")
  95. public Map<String, Object> count(@PathVariable String type, @PathVariable String key, @PathVariable String task) {
  96. OuterAdapter adapter = loader.getExtension(type, key);
  97. return adapter.count(task);
  98. }
  99. /**
  100. * 统计总数 curl http://127.0.0.1:8081/count/hbase/mytest_person2.yml
  101. *
  102. * @param type 类型 hbase, es
  103. * @param task 任务名对应配置文件名 mytest_person2.yml
  104. * @return
  105. */
  106. @GetMapping("/count/{type}/{task}")
  107. public Map<String, Object> count(@PathVariable String type, @PathVariable String task) {
  108. return count(type, null, task);
  109. }
  110. /**
  111. * 返回所有实例 curl http://127.0.0.1:8081/destinations
  112. */
  113. @GetMapping("/destinations")
  114. public List<Map<String, String>> destinations() {
  115. List<Map<String, String>> result = new ArrayList<>();
  116. Set<String> destinations = adapterCanalConfig.DESTINATIONS;
  117. for (String destination : destinations) {
  118. Map<String, String> resMap = new LinkedHashMap<>();
  119. boolean status = syncSwitch.status(destination);
  120. String resStatus;
  121. if (status) {
  122. resStatus = "on";
  123. } else {
  124. resStatus = "off";
  125. }
  126. resMap.put("destination", destination);
  127. resMap.put("status", resStatus);
  128. result.add(resMap);
  129. }
  130. return result;
  131. }
  132. /**
  133. * 实例同步开关 curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
  134. *
  135. * @param destination 实例名称
  136. * @param status 开关状态: off on
  137. * @return
  138. */
  139. @PutMapping("/syncSwitch/{destination}/{status}")
  140. public Result etl(@PathVariable String destination, @PathVariable String status) {
  141. if (status.equals("on")) {
  142. syncSwitch.on(destination);
  143. logger.info("#Destination: {} sync on", destination);
  144. return Result.createSuccess("实例: " + destination + " 开启同步成功");
  145. } else if (status.equals("off")) {
  146. syncSwitch.off(destination);
  147. logger.info("#Destination: {} sync off", destination);
  148. return Result.createSuccess("实例: " + destination + " 关闭同步成功");
  149. } else {
  150. Result result = new Result();
  151. result.setCode(50000);
  152. result.setMessage("实例: " + destination + " 操作失败");
  153. return result;
  154. }
  155. }
  156. /**
  157. * 获取实例开关状态 curl http://127.0.0.1:8081/syncSwitch/example
  158. *
  159. * @param destination 实例名称
  160. * @return
  161. */
  162. @GetMapping("/syncSwitch/{destination}")
  163. public Map<String, String> etl(@PathVariable String destination) {
  164. boolean status = syncSwitch.status(destination);
  165. String resStatus;
  166. if (status) {
  167. resStatus = "on";
  168. } else {
  169. resStatus = "off";
  170. }
  171. Map<String, String> res = new LinkedHashMap<>();
  172. res.put("stauts", resStatus);
  173. return res;
  174. }
  175. }

增量同步测试(自动触发)

canal增量同步是通过监听mysql的bin log进行实现了,那么当数据表里的内容有变化时canal client就会从canal server处获取到监听的内容
这里我做几个测试来对数据进行验证

  1. 新增记录

    INSERT INTO test.stu_info (id, name, age, update_time) VALUES (‘4’, ‘ddd’, ‘13’, ‘2019-08-31 11:28:11’);

结果:数据插入后,es立即同步过去了

  1. 删除记录

    DELETE from stu_info where id=4

结果:数据执行了删除后es中id为4的数据也立即进行了删除

  1. 修改记录

    update stu_info set age =23 where id=3

结果:es中id为3的数据也立即进行了变更

  1. 修改表结构
    新增一列后
    结果:es数据无变化
  2. 修改表结构
    删除一列后
    结果:es数据无变化

canal admin监控(本地启动)

canal为了管理和监控的方便也提供了ui界面模块,其模块为canal-admin

导入canal-admin配置库

找到canal-admin-server模块资源目录下的canal_manager.sql,将其导入到mysql中

  1. CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
  2. USE `canal_manager`;
  3. SET NAMES utf8mb4;
  4. SET FOREIGN_KEY_CHECKS = 0;
  5. -- ----------------------------
  6. -- Table structure for canal_adapter_config
  7. -- ----------------------------
  8. DROP TABLE IF EXISTS `canal_adapter_config`;
  9. CREATE TABLE `canal_adapter_config` (
  10. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  11. `category` varchar(45) NOT NULL,
  12. `name` varchar(45) NOT NULL,
  13. `status` varchar(45) DEFAULT NULL,
  14. `content` text NOT NULL,
  15. `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  16. PRIMARY KEY (`id`)
  17. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  18. -- ----------------------------
  19. -- Table structure for canal_cluster
  20. -- ----------------------------
  21. DROP TABLE IF EXISTS `canal_cluster`;
  22. CREATE TABLE `canal_cluster` (
  23. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  24. `name` varchar(63) NOT NULL,
  25. `zk_hosts` varchar(255) NOT NULL,
  26. `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  27. PRIMARY KEY (`id`)
  28. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  29. -- ----------------------------
  30. -- Table structure for canal_config
  31. -- ----------------------------
  32. DROP TABLE IF EXISTS `canal_config`;
  33. CREATE TABLE `canal_config` (
  34. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  35. `cluster_id` bigint(20) DEFAULT NULL,
  36. `server_id` bigint(20) DEFAULT NULL,
  37. `name` varchar(45) NOT NULL,
  38. `status` varchar(45) DEFAULT NULL,
  39. `content` text NOT NULL,
  40. `content_md5` varchar(128) NOT NULL,
  41. `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  42. PRIMARY KEY (`id`),
  43. UNIQUE KEY `sid_UNIQUE` (`server_id`)
  44. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  45. -- ----------------------------
  46. -- Table structure for canal_instance_config
  47. -- ----------------------------
  48. DROP TABLE IF EXISTS `canal_instance_config`;
  49. CREATE TABLE `canal_instance_config` (
  50. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  51. `cluster_id` bigint(20) DEFAULT NULL,
  52. `server_id` bigint(20) DEFAULT NULL,
  53. `name` varchar(45) NOT NULL,
  54. `status` varchar(45) DEFAULT NULL,
  55. `content` text NOT NULL,
  56. `content_md5` varchar(128) DEFAULT NULL,
  57. `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  58. PRIMARY KEY (`id`),
  59. UNIQUE KEY `name_UNIQUE` (`name`)
  60. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  61. -- ----------------------------
  62. -- Table structure for canal_node_server
  63. -- ----------------------------
  64. DROP TABLE IF EXISTS `canal_node_server`;
  65. CREATE TABLE `canal_node_server` (
  66. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  67. `cluster_id` bigint(20) DEFAULT NULL,
  68. `name` varchar(63) NOT NULL,
  69. `ip` varchar(63) NOT NULL,
  70. `admin_port` int(11) DEFAULT NULL,
  71. `tcp_port` int(11) DEFAULT NULL,
  72. `metric_port` int(11) DEFAULT NULL,
  73. `status` varchar(45) DEFAULT NULL,
  74. `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  75. PRIMARY KEY (`id`)
  76. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  77. -- ----------------------------
  78. -- Table structure for canal_user
  79. -- ----------------------------
  80. DROP TABLE IF EXISTS `canal_user`;
  81. CREATE TABLE `canal_user` (
  82. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  83. `username` varchar(31) NOT NULL,
  84. `password` varchar(128) NOT NULL,
  85. `name` varchar(31) NOT NULL,
  86. `roles` varchar(31) NOT NULL,
  87. `introduction` varchar(255) DEFAULT NULL,
  88. `avatar` varchar(255) DEFAULT NULL,
  89. `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  90. PRIMARY KEY (`id`)
  91. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  92. SET FOREIGN_KEY_CHECKS = 1;
  93. -- ----------------------------
  94. -- Records of canal_user
  95. -- ----------------------------
  96. BEGIN;
  97. INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
  98. COMMIT;
  99. SET FOREIGN_KEY_CHECKS = 1;

上面的脚本将会在mysql中建立一个名为canal_manager的数据库,并创建好canal库的基本数据表

canal-admin-server运行

修改canal-admin-server项目的application.yml,主要修改配置库的数据源,我这里的配置如下:

  1. server:
  2. port: 8089
  3. spring:
  4. jackson:
  5. date-format: yyyy-MM-dd HH:mm:ss
  6. time-zone: GMT+8
  7. spring.datasource:
  8. url: jdbc:mysql://192.168.1.61:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  9. username: root
  10. password: 123456
  11. driver-class-name: com.mysql.jdbc.Driver
  12. hikari:
  13. maximum-pool-size: 10
  14. minimum-idle: 1
  15. canal:
  16. adminUser: admin
  17. adminPasswd: admin

然后启动canal-admin-server的项目即可,即运行CanalAdminApplication,如果运行成功,则会开启8089端口遍可以进行访问了,如我这里的:
http://localhost:8089
Cannal在docker下安装 - 图4

默认账号为:admin 密码为:123456
登录后再手动添加之前的server节点,遍可对此canal server进行一监控和管理了

Cannal在docker下安装 - 图5

通过canal client-adapter中esAdapter的实践了解了canal的特性,通过canal可以快速完成mysql到其他数据源的数据同步,并支持全量和增量同步的功能,同是canal也提供了canal-admin-server作为ui界面可供用户对canal进行管理和监控,本文中为了方便部署的是单节点的canal,对于canal的高可用其官方也有对应的支持.

cannal admin监控(docker) [有问题]

  1. # 下载镜像
  2. docker pull canal/canal-admin:v1.1.4
  3. # wget https://github.com/alibaba/canal/blob/master/docker/run_admin.sh 官网文件,被墙
  4. # 类似canal server的安装,window上传后转换为linx中的文件,会出现没有最新版本的提示
  5. # 修改脚本中最后的cmd中启动镜像命令的canal/canal-admin指定为canal/canal-admin:v1.1.4
  6. sh run_admin.sh -e server.port=8089 \
  7. -e canal.adminUser=admin \
  8. -e canal.adminPasswd=admin
  9. # 查看容器
  10. docker ps
  11. # 访问地址:http://IP地址:8089/,账号admin,初始密码123456(登陆密码并不是admin,连接密码才是admin)

Cannal在docker下安装 - 图6

canal-admin的核心模型主要有:

  1. instance,对应canal-server里的instance,一个最小的订阅mysql的队列
  2. server,对应canal-server,一个server里可以包含多个instance
  3. 集群,对应一组canal-server,组合在一起面向高可用HA的运维

我们用docker安装启动canal-admin后,就会在本地创建一个数据库,ip是容器所在服务器本地ip,端口3306,这里我的是192.168.45.129:3306,账号密码都是canal