References
Kafka reference: Apache Kafka SQL Connector
Kafka table options: Kafka | Apache Flink
Upsert Kafka SQL Connector options: upsert-kafka
kafka-clients download: kafka-clients
Install Docker
See the Install Docker article.
Install Flink
Install MySQL
Install Kafka
System environment
- Windows 10
- Docker Desktop
- MySQL 8.0.27
- Kafka 2.8.1
- Flink 1.15.2
Flink CDC lib dependencies
Download the dependency packages listed below (download link):
- flink-connector-kafka-1.15.2.jar
- kafka-clients-2.8.1.jar (do not skip this one; it must be downloaded, and its version has to match the installed Kafka version)
- flink-connector-jdbc
For flink-connector-jdbc, pick the release matching your Flink version; for Flink 1.15.2 that is flink-connector-jdbc-1.15.2.jar. Copy the downloaded connector jars into Flink's lib directory:
TaskManager container
```shell
docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
docker cp D:/data/flink/lib/flink-connector-kafka-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-kafka-1.15.2.jar
docker cp D:/data/flink/lib/kafka-clients-2.8.1.jar flink-docker-taskmanager-1:/opt/flink/lib/kafka-clients-2.8.1.jar
docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar
```
JobManager container
```shell
docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
docker cp D:/data/flink/lib/flink-connector-kafka-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-kafka-1.15.2.jar
docker cp D:/data/flink/lib/kafka-clients-2.8.1.jar flink-docker-jobmanager-1:/opt/flink/lib/kafka-clients-2.8.1.jar
docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar
```
Enter the TaskManager container and check that the jars were copied:
```shell
cd D:\data\flink\flink-docker
docker-compose exec taskmanager /bin/bash
cd ./lib/
ls -l
```
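Alternatively, the jars can be listed from the host in one step; a minimal sketch, using the container name from the copy commands above:
```shell
# List the TaskManager's lib directory without entering the container.
docker exec flink-docker-taskmanager-1 ls -l /opt/flink/lib
```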
MySQL-to-Kafka synchronization example
Prepare the MySQL source data
Create the es_test database and, inside it, the V_Blood_BOutItem table:
```sql
CREATE TABLE `V_Blood_BOutItem` (
  `id` int unsigned NOT NULL,
  `deptno` int NOT NULL,
  `deptname` varchar(65) DEFAULT NULL,
  `bloodno` varchar(20) DEFAULT NULL,
  `bloodname` varchar(65) DEFAULT NULL,
  `boutcount` float DEFAULT NULL,
  `bloodunitname` varchar(65) DEFAULT NULL,
  `bodate` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
```
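To have data to observe during the sync, seed the table with a few rows; the values below are hypothetical samples, not from the original setup:
```sql
-- Hypothetical test rows; all values are made up for illustration.
INSERT INTO `V_Blood_BOutItem`
  (`id`, `deptno`, `deptname`, `bloodno`, `bloodname`, `boutcount`, `bloodunitname`, `bodate`)
VALUES
  (1, 101, 'Surgery', 'RBC01', 'Red Blood Cells', 2, 'U', '2022-10-01'),
  (2, 102, 'ICU', 'PLS01', 'Plasma', 1.5, 'U', '2022-10-02');
```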
Prepare Kafka
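The steps below assume the sinkboutitem topic exists or is auto-created; if auto-creation is disabled, a sketch for pre-creating it, run inside one of the broker containers (the bootstrap address matches the DDL below; the partition and replication counts are assumptions):
```shell
# Pre-create the sink topic; adjust partitions/replication to your cluster.
kafka-topics.sh --create --topic sinkboutitem \
  --bootstrap-server kafka1:9091 \
  --partitions 3 --replication-factor 3
```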
Start the Flink cluster
```shell
cd D:\data\flink\flink-docker
# Enter the JobManager container
docker-compose exec jobmanager /bin/bash
./bin/start-cluster.sh
```
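To confirm the cluster is up, the Flink REST API can be queried from the host; a sketch assuming the web port 8081 is published, as the dashboard URL at the end of this article suggests:
```shell
# Returns TaskManager and slot counts when the cluster is healthy.
curl http://localhost:8081/overview
```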
Create the job in the Flink SQL CLI
The Flink cluster must already be running.
```shell
cd D:\data\flink\flink-docker
# Enter the JobManager container
docker-compose exec jobmanager /bin/bash
# Start the Flink SQL CLI
./bin/sql-client.sh
```
Enable checkpointing, with a checkpoint every 3 seconds:
```sql
SET 'execution.checkpointing.interval' = '3s';
```
Create the tables with Flink DDL
```sql
CREATE TABLE sourceboutitem (
  id INT,
  deptno INT,
  deptname STRING,
  bloodno STRING,
  bloodname STRING,
  boutcount FLOAT,
  bloodunitname STRING,
  bodate STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.3.40',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'es_test',
  'table-name' = 'V_Blood_BOutItem'
);

select * from sourceboutitem;
```
This continuously syncs the V_Blood_BOutItem table of the es_test database on the MySQL server at 192.168.3.40 into sourceboutitem.
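Note that mysql-cdc reads the MySQL binlog, so the binlog must be enabled in ROW format and the connecting account needs replication privileges; a sketch of the checks and grants (root normally has these already; the GRANT matters for a dedicated non-root account):
```sql
-- Binlog must be ON and in ROW format for mysql-cdc to capture changes.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
-- Privileges the CDC connector needs when using a dedicated account.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
FLUSH PRIVILEGES;
```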
```sql
CREATE TABLE sinkboutitem (
  id INT,
  deptno INT,
  deptname STRING,
  bloodno STRING,
  bloodname STRING,
  boutcount FLOAT,
  bloodunitname STRING,
  bodate STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'sinkboutitem',
  'properties.bootstrap.servers' = 'kafka1:9091,kafka2:9092,kafka3:9093',
  'properties.group.id' = 'sinkboutitem',
  'scan.startup.mode' = 'group-offsets',
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'key.fields' = 'id',
  'value.format' = 'debezium-json'
);
```
:::warning
The scan.startup.mode option determines the startup mode of the Kafka consumer. Valid values are:
- group-offsets: start from offsets committed by the specified consumer group in Zookeeper/Kafka.
- earliest-offset: start from the earliest available offset.
- latest-offset: start from the latest offset.
- timestamp: start from a user-supplied timestamp for each partition.
- specific-offsets: start from user-supplied offsets for each partition.
:::
Start the sync:
```sql
insert into sinkboutitem select * from sourceboutitem;
```
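To inspect what actually lands in the topic, attach a console consumer; a sketch run inside one of the broker containers, with the address taken from the bootstrap servers above:
```shell
# Prints the debezium-json change events written by the job above.
kafka-console-consumer.sh --bootstrap-server kafka1:9091 \
  --topic sinkboutitem --from-beginning
```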
The same topic can also be mapped with the upsert-kafka connector; the column list matches the source table so that the insert below works:
```sql
CREATE TABLE sinkboutitem3 (
  id INT,
  deptno INT,
  deptname STRING,
  bloodno STRING,
  bloodname STRING,
  boutcount FLOAT,
  bloodunitname STRING,
  bodate STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sinkboutitem',
  'properties.bootstrap.servers' = 'kafka1:9091,kafka2:9092,kafka3:9093',
  'properties.group.id' = 'sinkboutitem',
  'key.format' = 'json',
  'key.json.ignore-parse-errors' = 'true',
  'value.format' = 'json'
);
```
Check and verify:
```sql
select * from sinkboutitem;
```
The above maps Kafka to the Flink CDC table sinkboutitem3; start the sync:
```sql
insert into sinkboutitem3 select * from sourceboutitem;
```
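To watch updates and deletes propagate through the upsert-kafka pipeline, change rows on the MySQL side and re-run the select above; hypothetical statements against the sample rows inserted earlier:
```sql
-- Run on the MySQL source; the upsert-kafka sink emits an upsert for the
-- UPDATE and a tombstone (null-value record) for the DELETE.
UPDATE `V_Blood_BOutItem` SET `boutcount` = 3 WHERE `id` = 1;
DELETE FROM `V_Blood_BOutItem` WHERE `id` = 2;
```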
Note: if the sync above throws errors, check the MySQL connection settings, whether the port is blocked by a firewall, and whether the Flink data type mappings are correct. Open http://localhost:8081/ to watch the job. Since this was only a test, I cancelled the job after letting it run for a short while and checked the number of records synchronized to the EDS database.