Installing and Using Canal on Linux
Installing and using Canal covers the following three parts:
· Canal installation and configuration
· Kafka installation and configuration
· Consuming Kafka messages from a client
Canal's main purpose is to parse MySQL incremental binlogs and provide subscription and consumption of incremental data. Typical uses of log-based incremental subscription and consumption include:
· database mirroring
· real-time database backup
· index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
· business cache refresh
· incremental data processing with business logic
Note: Canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
I. Canal Installation and Configuration
Environment
· OS: CentOS Linux release 7.2.1511 (Core)
· MySQL version: MySQL 5.7 (installation omitted)
· Canal version: canal-1.1.4
1. MySQL Configuration
(1) For a self-hosted MySQL, binlog writing must be enabled first, with binlog-format set to ROW mode. Add the following to my.cnf:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # required for MySQL replication; must not clash with Canal's slaveId
Note: Aliyun RDS for MySQL has binlog enabled by default and its accounts already have the binlog dump privilege, so no extra privileges or binlog settings are needed; this step can be skipped.
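After restarting MySQL, you can verify that the settings took effect (assuming a root login):
mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
log_bin should be ON and binlog_format should be ROW.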
(2) Grant the account Canal uses to connect to MySQL the privileges required to act as a MySQL slave. If the account already exists, you can grant the privileges directly:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
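To confirm the account received the replication privileges:
mysql -uroot -p -e "SHOW GRANTS FOR 'canal'@'%';"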
2. Installation, Configuration, and Startup
(1) Download Canal from the releases page at https://github.com/alibaba/canal/releases and pick the package you need; version 1.1.4 is used as the example:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
(2) Extract the archive
mkdir /usr/local/canal
tar zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal
After extraction, enter the /usr/local/canal directory to inspect the layout.
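A quick listing should show the standard deployer layout:
ls /usr/local/canal
The expected entries are bin, conf, lib, and logs.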
(3) Edit the configuration
vi conf/example/instance.properties
# mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to match your own database
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database credentials
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .*\\..*
canal.instance.connectionCharset maps the database's character set to a Java charset name, e.g. UTF-8, GBK, or ISO-8859-1.
If the system has only one CPU, canal.instance.parser.parallel must be set to false.
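For reference, canal.instance.filter.regex takes comma-separated Perl-style regular expressions. Common patterns from the canal documentation:
# all tables in all schemas (the default above): .*\\..*
# all tables in the test schema: test\\..*
# two specific tables only: test.t1,test.t2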
vi conf/canal.properties
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.mq.servers = 172.16.0.12:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
#canal.mq.kafka.kerberos.enable = false
#canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
#canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
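Note that the Kafka topic an instance publishes to is configured per instance, not in canal.properties. For 1.1.4 this typically means a line like the following in conf/example/instance.properties (the topic name here is an assumption for a default single-instance setup):
canal.mq.topic = example
#canal.mq.dynamicTopic = ...   # optional: route tables to dynamic topics, see the canal MQ docs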
(4) Start
sh bin/startup.sh
(5) Check the server log
vi logs/canal/canal.log
2020-12-11 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-12-11 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.16.0.12:9092]
2020-12-11 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
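Each instance also writes its own log; to confirm that binlog parsing has started for the default example instance:
vi logs/example/example.log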
(6) Stop
sh bin/stop.sh
II. Kafka Installation and Configuration
1. Install ZooKeeper
Environment
· OS: CentOS Linux release 7.2.1511 (Core)
· Java version: JDK 1.8 (installation omitted)
· ZooKeeper version: zookeeper-3.5.8
(1) Install zookeeper-3.5.8
Official download page: http://www.apache.org/dyn/closer.cgi/zookeeper
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar zxvf apache-zookeeper-3.5.8-bin.tar.gz
mv apache-zookeeper-3.5.8-bin /usr/local/zookeeper
(2) Configure environment variables
Edit /etc/profile and append the following to the end of the file:
# ZooKeeper Env
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
Run the following command to make the variables take effect: source /etc/profile
(3) Rename the configuration file
When using ZooKeeper for the first time, rename zoo_sample.cfg under $ZOOKEEPER_HOME/conf to zoo.cfg:
mv $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
(4) Standalone mode: edit the configuration file
Create the directories /usr/local/zookeeper/data and /usr/local/zookeeper/logs:
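mkdir -p /usr/local/zookeeper/data /usr/local/zookeeper/logs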
Then edit zoo.cfg with the following settings:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
(5) Start the ZooKeeper service
# cd /usr/local/zookeeper/bin
# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Check the service status:
# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: standalone
(6) Stop the ZooKeeper service
To stop the ZooKeeper service, use:
# cd /usr/local/zookeeper/bin
# ./zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
2. Install Kafka
Environment
· OS: CentOS Linux release 7.2.1511 (Core)
· Java version: JDK 1.8 (installation omitted)
· Kafka version: kafka_2.11-2.4.1
Official download page: https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
(1) Download the archive, copy it to a fixed directory, and extract it there:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz
mkdir -p /usr/local/kafka
cp kafka_2.11-2.4.1.tgz /usr/local/kafka
cd /usr/local/kafka
tar -zxvf kafka_2.11-2.4.1.tgz
(2) Edit the configuration file:
vim /usr/local/kafka/kafka_2.11-2.4.1/config/server.properties
Modify the following parameters:
zookeeper.connect=localhost:2181
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://117.186.248.41:9092 # external IP clients connect to (remove this comment in the actual file)
# ...
Here listeners is the interface the broker binds to, while advertised.listeners is the address handed out to clients; the canal.mq.servers value set earlier (172.16.0.12:9092) must likewise be reachable from the Canal host.
(3) Start the server
bin/kafka-server-start.sh config/server.properties
To list the topics that have been created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
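If testTopic does not exist yet, it can be created explicitly (Kafka 2.4 still accepts the --zookeeper flag; brokers also auto-create topics by default):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic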
Next, send messages with the console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
Then start a console consumer to receive the messages:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning
III. Consuming Kafka Messages from a Client
The Java consumer below subscribes to the topic Canal publishes to (cpay in this setup), prints every record, and exits once it sees a message whose value is "done":
package com.single.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
 * @author Javame
 * @date 2020/12/9 9:25
 */
public class MyConsumer {
    private static KafkaConsumer<String, String> consumer;
    private static Properties properties;

    // Initialization
    static {
        properties = new Properties();
        // Address of the broker to connect to
        properties.put("bootstrap.servers", "117.186.248.41:9092");
        // Kafka deserializers
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        properties.put("group.id", "g4");
    }

    // Auto-commit offsets: the consumer manages offset commits by itself
    private static void generalConsumeMessageAutoCommit() {
        // Enable auto-commit
        properties.put("enable.auto.commit", true);
        consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Collections.singleton("cpay"));
        try {
            while (true) {
                boolean flag = true;
                // Poll for records with a 1000 ms timeout
                ConsumerRecords<String, String> records = consumer.poll(1000);
                // Print every record
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format(
                            "topic = %s, partition = %s, key = %s, value = %s",
                            record.topic(), record.partition(), record.key(), record.value()
                    ));
                    // A record with value "done" marks the end of the stream
                    if (record.value().equals("done")) {
                        flag = false;
                    }
                }
                if (!flag) {
                    break;
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        // Consume with auto-committed offsets
        generalConsumeMessageAutoCommit();
    }
}
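To smoke-test the consumer without Canal in the loop, publish a few messages to the same topic from the Kafka directory, ending with done (broker address and topic name as assumed above):
bin/kafka-console-producer.sh --broker-list 117.186.248.41:9092 --topic cpay
With Canal running in kafka serverMode, row changes instead arrive on the instance's configured topic as JSON messages.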
Appendix
Canal runtime checklist:
1. Start MySQL (set to start on boot on CentOS)
2. Start ZooKeeper: ./zkServer.sh start
3. Start Kafka: bin/kafka-server-start.sh config/server.properties
4. Start Canal: sh bin/startup.sh (to restart a running instance, run sh bin/stop.sh first)
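Put together, a minimal start-everything sketch using the install paths assumed in this guide:
#!/bin/bash
# Start ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start
# Start Kafka in the background
cd /usr/local/kafka/kafka_2.11-2.4.1
nohup bin/kafka-server-start.sh config/server.properties > kafka.out 2>&1 &
# Start Canal
cd /usr/local/canal
sh bin/startup.sh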