Installing and Using Canal on Linux

Canal installation and usage covers three parts:
· canal installation and configuration
· Kafka installation and configuration
· consuming Kafka messages from a client
canal's main purpose is to parse MySQL's incremental database log (binlog) and provide incremental data subscription and consumption. Use cases built on incremental log subscription and consumption include:
· database mirroring
· real-time database backup
· index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
· business cache refreshing
· incremental data processing with business logic
Note: canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.

I. Canal Installation and Configuration

Environment
· OS: CentOS Linux release 7.2.1511 (Core)
· MySQL version: MySQL 5.7 (installation omitted)
· canal version: canal-1.1.4

1. MySQL Configuration

(1) For a self-hosted MySQL, binlog writing must first be enabled and binlog-format set to ROW mode. Configure my.cnf as follows:

[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not collide with canal's slaveId

Note: For Alibaba Cloud RDS for MySQL, binlog is already enabled by default and the account has the binlog dump privilege out of the box, so no extra privileges or binlog settings are needed; you can skip this step.
(2) Grant the MySQL account that canal connects with the privileges of a MySQL slave; if the account already exists, you can grant directly:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
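As a quick sanity check (a minimal sketch; the root login and local host are assumptions to adapt), you can confirm that binlog is on in ROW mode and that the canal account can connect:
# requires a privileged account
mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
# REPLICATION CLIENT lets the canal user read the current binlog position
mysql -ucanal -pcanal -h127.0.0.1 -e "SHOW MASTER STATUS;"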

2. Installation, Configuration, and Startup

(1) Download canal: visit the releases page at https://github.com/alibaba/canal/releases and pick the package you need; taking version 1.1.4 as an example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
(2) Extract the archive

mkdir /usr/local/canal
tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal

After extraction, enter the /usr/local/canal directory to see the deployer's directory layout.
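For reference, the 1.1.4 deployer tarball unpacks to roughly the following top-level directories (exact contents may differ slightly between versions):
ls /usr/local/canal
bin  conf  lib  logs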
(3) Modify the configuration

vi conf/example/instance.properties

# mysql serverId
canal.instance.mysql.slaveId = 1234
# position info: change to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password: change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
canal.instance.connectionCharset is the Java encoding type corresponding to the database's character set, e.g. UTF-8, GBK, or ISO-8859-1.
If the machine has only one CPU, canal.instance.parser.parallel must be set to false; see the snippet below.
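This switch lives in the global conf/canal.properties rather than in the instance file (property name taken from the stock 1.1.4 configuration; verify against your copy):
# in conf/canal.properties
canal.instance.parser.parallel = false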
vi conf/canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.mq.servers = 172.16.0.12:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
#canal.mq.kafka.kerberos.enable = false
#canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
#canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
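Which Kafka topic the messages land on is configured per instance. In the stock 1.1.4 conf/example/instance.properties the MQ defaults look roughly like this (names from the 1.1.4 template; check your own copy):
canal.mq.topic=example
# dynamic routing by database/table, e.g.:
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0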
(4) Start

sh bin/startup.sh

(5) Check the server log

vi logs/canal/canal.log
2020-12-11 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-12-11 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.16.0.12:9092]
2020-12-11 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
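Each instance also writes its own log, which is the quickest place to see whether binlog parsing has started (path assumes the default example instance):
tail -f logs/example/example.log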

(6) Stop

sh bin/stop.sh

II. Kafka Installation and Configuration

1. Installing ZooKeeper

Environment
· OS: CentOS Linux release 7.2.1511 (Core)
· Java version: JDK 1.8 (installation omitted)
· ZooKeeper version: zookeeper-3.5.8
(1) Install zookeeper-3.5.8
Official download page: http://www.apache.org/dyn/closer.cgi/zookeeper

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar zxvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper

(2) Set environment variables
Edit /etc/profile and append the following to the end of the file:

# ZooKeeper Env
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

Run source /etc/profile to make the variables take effect.
(3) Rename the configuration file
When using ZooKeeper for the first time, rename zoo_sample.cfg under $ZOOKEEPER_HOME/conf to zoo.cfg:

mv $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

(4) Standalone mode: modify the configuration file
Create the directories /usr/local/zookeeper/data and /usr/local/zookeeper/logs (a one-line command for this follows the config below), then set the following in zoo.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
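The dataDir and dataLogDir directories referenced above can be created in one step:
mkdir -p /usr/local/zookeeper/data /usr/local/zookeeper/logs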

(5) Start the ZooKeeper service

# cd /usr/local/zookeeper/bin
# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

(6) Stop the ZooKeeper service
To stop the ZooKeeper service, run:

# cd /usr/local/zookeeper/bin
# ./zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

2. Installing Kafka

Environment
· OS: CentOS Linux release 7.2.1511 (Core)
· Java version: JDK 1.8 (installation omitted)
· Kafka version: kafka_2.11-2.4.1.tgz
Download mirror: https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
(1) Download the release archive, copy it to the target directory, and extract it

wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz
mkdir -p /usr/local/kafka
cp kafka_2.11-2.4.1.tgz /usr/local/kafka
cd /usr/local/kafka && tar -zxvf kafka_2.11-2.4.1.tgz

(2) Modify the configuration file
Edit /usr/local/kafka/kafka_2.11-2.4.1/config/server.properties and change the following parameters:

zookeeper.connect=localhost:2181
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://117.186.248.41:9092 # public IP for external clients (drop this comment in the real file)
# ...

(3) Start the server

bin/kafka-server-start.sh config/server.properties
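
The command above runs the broker in the foreground. To keep it running in the background you can use the -daemon flag built into the start script:
bin/kafka-server-start.sh -daemon config/server.properties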

To see which topics have been created, run:

bin/kafka-topics.sh --list --zookeeper localhost:2181
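
If the topic canal writes to does not exist yet (for example when auto-creation is disabled on the broker), you can create it by hand; a sketch assuming a single broker and the default topic name example:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic example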

Next, send some messages with a console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic

Then start a consumer to receive the messages:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
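
With MySQL, canal, ZooKeeper, and Kafka all running, the whole pipeline can be verified from the shell; a sketch assuming canal publishes to a topic named example and a privileged MySQL login (adjust names and credentials to your setup):
# terminal 1: watch canal's output topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic example
# terminal 2: make a change in MySQL; a JSON message should appear in terminal 1
mysql -uroot -p -e "CREATE DATABASE IF NOT EXISTS test; CREATE TABLE IF NOT EXISTS test.t1 (id INT PRIMARY KEY, name VARCHAR(32)); INSERT INTO test.t1 VALUES (1, 'hello');"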

III. Consuming Kafka Messages from a Client

package com.single.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author Javame
 * @date 2020/12/9 9:25
 */
public class MyConsumer {

    private static KafkaConsumer<String, String> consumer;
    private static Properties properties;

    // initialization
    static {
        properties = new Properties();
        // address of the broker to connect to
        properties.put("bootstrap.servers", "117.186.248.41:9092");
        // Kafka deserializers
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // consumer group
        properties.put("group.id", "g4");
    }

    // auto-commit offsets: the consumer manages offset commits by itself
    private static void generalConsumeMessageAutoCommit() {
        // enable auto-commit
        properties.put("enable.auto.commit", true);
        consumer = new KafkaConsumer<>(properties);
        // subscribe to the topic
        consumer.subscribe(Collections.singleton("cpay"));
        try {
            while (true) {
                boolean flag = true;
                // poll for messages with a 1000 ms timeout
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                // print every message received
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, value = %s",
                            record.topic(), record.partition(), record.key(), record.value()
                    ));
                    // a "done" message marks the end of the stream
                    if (record.value().equals("done")) {
                        flag = false;
                    }
                }
                if (!flag) {
                    break;
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        // consume with auto-committed offsets
        generalConsumeMessageAutoCommit();
    }
}
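For reference, when canal.serverMode = kafka with the default flat-message setting, each record value this consumer prints is a JSON document along the following lines (field set from canal 1.1.x's FlatMessage; the concrete values here are illustrative only):
{"data":[{"id":"1","name":"hello"}],"database":"test","es":1607698740000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(32)"},"old":null,"pkNames":["id"],"sql":"","table":"t1","ts":1607698740123,"type":"INSERT"}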




Appendix
Canal usage workflow:
1. Start MySQL (starts automatically on CentOS boot)
2. Start ZooKeeper: ./zkServer.sh start
3. Start Kafka: bin/kafka-server-start.sh config/server.properties
4. Restart canal: sh bin/stop.sh, then sh bin/startup.sh