References

Kafka reference: Apache Kafka SQL Connector
Kafka table configuration: Kafka | Apache Flink
Upsert Kafka SQL connector configuration: upsert-kafka
kafka-clients driver download: kafka-clients

Install Docker

See Install Docker.

Install Flink

See Install Flink with Docker.

Install MySQL

See Install MySQL with Docker.

Install Kafka

See Install Kafka with Docker.

System environment

Choose and download the connector versions that match your setup: for example, if your Flink version is 1.15.2, download flink-connector-jdbc-1.15.2.jar. Then copy the downloaded database connector jars into Flink's lib directory (a consolidated loop version is sketched after the two command lists below):
TaskManager container

  docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
  docker cp D:/data/flink/lib/flink-connector-kafka-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-kafka-1.15.2.jar
  docker cp D:/data/flink/lib/kafka-clients-2.8.1.jar flink-docker-taskmanager-1:/opt/flink/lib/kafka-clients-2.8.1.jar
  docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar

JobManager container

  docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
  docker cp D:/data/flink/lib/flink-connector-kafka-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-kafka-1.15.2.jar
  docker cp D:/data/flink/lib/kafka-clients-2.8.1.jar flink-docker-jobmanager-1:/opt/flink/lib/kafka-clients-2.8.1.jar
  docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar
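
Instead of copying the jars one by one, the eight commands above can be collapsed into a loop. A minimal sketch, assuming a bash-style shell on Windows (e.g. Git Bash) and the container names used above:

  # Copy every connector jar into both Flink containers in one pass
  for c in flink-docker-taskmanager-1 flink-docker-jobmanager-1; do
    for j in D:/data/flink/lib/*.jar; do
      docker cp "$j" "$c":/opt/flink/lib/
    done
  done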

Enter the TaskManager container to check that the copy succeeded:

  cd D:\data\flink\flink-docker
  docker-compose exec taskmanager /bin/bash
  cd ./lib/
  ll

(Screenshot: the copied connector jars listed under /opt/flink/lib)

Example: syncing MySQL to Kafka

Preparing the MySQL source data

Create the database es_test and the V_Blood_BOutItem table in it:

  CREATE TABLE `V_Blood_BOutItem` (
    `id` int unsigned NOT NULL,
    `deptno` int NOT NULL,
    `deptname` varchar(65) DEFAULT NULL,
    `bloodno` varchar(20) DEFAULT NULL,
    `bloodname` varchar(65) DEFAULT NULL,
    `boutcount` float DEFAULT NULL,
    `bloodunitname` varchar(65) DEFAULT NULL,
    `bodate` varchar(20) DEFAULT NULL,
    PRIMARY KEY (`id`)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
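
The sync steps later need some rows to move. A few illustrative inserts (all values below are made up):

  -- Seed the source table with sample rows (hypothetical values)
  INSERT INTO `V_Blood_BOutItem`
    (`id`, `deptno`, `deptname`, `bloodno`, `bloodname`, `boutcount`, `bloodunitname`, `bodate`)
  VALUES
    (1, 101, 'Surgery', 'B001', 'Red Blood Cells', 2.0, 'U', '2022-10-01'),
    (2, 102, 'ICU', 'B002', 'Plasma', 1.5, 'U', '2022-10-02');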

Preparing Kafka
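
The sink topic should exist before a job writes to it, unless the brokers auto-create topics. A minimal sketch for creating one, assuming the first broker container is named kafka1 and its image ships kafka-topics.sh (both are assumptions; the broker address reuses the bootstrap servers configured in the DDL below):

  # Create the sink topic ahead of time (container name and script are assumptions)
  docker exec kafka1 kafka-topics.sh --create \
    --bootstrap-server kafka1:9091 \
    --partitions 3 --replication-factor 3 \
    --topic sinkboutitem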

Start the Flink cluster

  cd D:\data\flink\flink-docker

Enter the JobManager container:

  docker-compose exec jobmanager /bin/bash
  ./bin/start-cluster.sh

Creating the job in the Flink SQL CLI

The Flink cluster started above must be running before you continue.

  cd D:\data\flink\flink-docker

Enter the JobManager container:

  docker-compose exec jobmanager /bin/bash

Start the Flink SQL CLI with:

  ./bin/sql-client.sh

Enable checkpointing, taking a checkpoint every 3 seconds:

  SET execution.checkpointing.interval = 3s;

Create the source table with Flink DDL:

  CREATE TABLE sourceboutitem (
    id INT,
    deptno INT,
    deptname STRING,
    bloodno STRING,
    bloodname STRING,
    boutcount FLOAT,
    bloodunitname STRING,
    bodate STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.3.40',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'es_test',
    'table-name' = 'V_Blood_BOutItem'
  );

  select * from sourceboutitem;

The DDL above maps the table V_Blood_BOutItem in the es_test database on the MySQL server at 192.168.3.40 to the Flink table sourceboutitem. Next, map a Kafka topic with the kafka connector:

  CREATE TABLE sinkboutitem (
    id INT,
    deptno INT,
    deptname STRING,
    bloodno STRING,
    bloodname STRING,
    boutcount FLOAT,
    bloodunitname STRING,
    bodate STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'sinkboutitem',
    'properties.bootstrap.servers' = 'kafka1:9091,kafka2:9092,kafka3:9093',
    'properties.group.id' = 'sinkboutitem',
    'scan.startup.mode' = 'group-offsets',
    'key.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    'key.fields' = 'id',
    'value.format' = 'debezium-json'
  );
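
With 'value.format' = 'debezium-json', each change event is written to the topic as a Debezium-style JSON envelope. Roughly what the insert of the first sample row would look like (illustrative only; the fields follow the schema above):

  {
    "before": null,
    "after": {"id": 1, "deptno": 101, "deptname": "Surgery", "bloodno": "B001",
              "bloodname": "Red Blood Cells", "boutcount": 2.0,
              "bloodunitname": "U", "bodate": "2022-10-01"},
    "op": "c"
  }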

:::warning
The scan.startup.mode option determines the startup mode of the Kafka consumer. Valid values are:

  • group-offsets: start from the committed offsets of a specific consumer group in Zookeeper/Kafka.
  • earliest-offset: start from the earliest offset possible.
  • latest-offset: start from the latest offset.
  • timestamp: start from a user-supplied timestamp for each partition.
  • specific-offsets: start from user-supplied offsets for each partition.
:::
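
As an illustration, replaying from fixed offsets instead of the committed group offsets only changes the scan options in the DDL above; a sketch (the offsets are arbitrary placeholders):

  -- replaces the 'scan.startup.mode' line in the WITH clause above
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'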

Start the sync:

  insert into sinkboutitem select * from sourceboutitem;

Create an upsert-kafka sink table as well:

  CREATE TABLE sinkboutitem3 (
    id INT,
    deptno INT,
    deptname STRING,
    bloodno STRING,
    bloodname STRING,
    boutcount FLOAT,
    bloodunitname STRING,
    bodate STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'sinkboutitem3',
    'properties.bootstrap.servers' = 'kafka1:9091,kafka2:9092,kafka3:9093',
    'properties.group.id' = 'sinkboutitem',
    'key.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    'value.format' = 'json'
  );

Check and verify

  select * from sinkboutitem;

The upsert-kafka DDL above maps a Kafka topic to the Flink table sinkboutitem3. Start the sync:

  insert into sinkboutitem3 select * from sourceboutitem;
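
To see what actually landed in Kafka, you can tail the sink topic from a broker container. A sketch, assuming the container is named kafka1 and its image ships kafka-console-consumer.sh:

  # Print keys and values of everything written to the upsert topic so far
  docker exec kafka1 kafka-console-consumer.sh \
    --bootstrap-server kafka1:9091 \
    --topic sinkboutitem3 \
    --from-beginning \
    --property print.key=true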

Note: if the sync raises errors in any of the steps above, check the MySQL connection settings, whether the port is blocked by a firewall, and whether the column types map correctly to Flink data types. Open http://localhost:8081/ to watch the job. Since this was only a test, I stopped the job after letting it run for a short while; the number of records synced to the EDS database was:
(Screenshot: record count for the completed sync job)