Getting Started with Flink CDC

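Installing Docker

See the guide on installing Docker.

Installing Flink

See the guide on installing Flink with Docker.

Installing MySQL

See the guide on installing MySQL with Docker.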

System environment

Download the connector JARs that match your Flink version, for example flink-connector-jdbc-1.15.2.jar, and copy the downloaded JARs into Flink's lib directory in each container.

TaskManager container

    docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
    docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar

JobManager container

    docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
    docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar
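
Since the same two JARs go into both containers, the four copy commands can be generated rather than typed by hand. The following is only a sketch: the paths, JAR names, and container names are the ones used above, while the helper name docker_cp_commands is made up for illustration. It prints the commands without running them.

```python
from itertools import product

# Values taken from the commands above; adjust them to your own setup.
HOST_LIB_DIR = "D:/data/flink/lib"
CONTAINER_LIB_DIR = "/opt/flink/lib"
JARS = [
    "flink-sql-connector-mysql-cdc-2.2.1.jar",
    "flink-connector-jdbc-1.15.2.jar",
]
CONTAINERS = ["flink-docker-taskmanager-1", "flink-docker-jobmanager-1"]


def docker_cp_commands(containers=CONTAINERS, jars=JARS):
    """Return one `docker cp` argument list per (container, jar) pair."""
    return [
        ["docker", "cp",
         f"{HOST_LIB_DIR}/{jar}",
         f"{container}:{CONTAINER_LIB_DIR}/{jar}"]
        for container, jar in product(containers, jars)
    ]


if __name__ == "__main__":
    for cmd in docker_cp_commands():
        # Print only; pass cmd to subprocess.run(cmd, check=True) to copy.
        print(" ".join(cmd))
```

Returning argument lists instead of one long string keeps the commands safe to hand to subprocess.run without shell quoting.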

Check inside the TaskManager container that the JARs were copied

    cd D:\data\flink\flink-docker
    docker-compose exec taskmanager /bin/bash
    cd ./lib/
    ls -l


Example: syncing MySQL to MySQL

Preparing the MySQL source database

    CREATE DATABASE IF NOT EXISTS `flink_source` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
    USE `flink_source`;
    CREATE TABLE `flink_test` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
      `t_name` varchar(100) NOT NULL COMMENT 'name',
      `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path',
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';

Preparing the MySQL target database

Sync with an identical schema

    CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
    USE `flink_sink`;
    CREATE TABLE `flink_test` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
      `t_name` varchar(100) NOT NULL COMMENT 'name',
      `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path',
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';

Sync with a different schema

    CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
    USE `flink_sink`;
    CREATE TABLE `flink_test2` (
      `name` varchar(100) NOT NULL COMMENT 'name',
      `logo` varchar(200) DEFAULT NULL COMMENT 'image path',
      UNIQUE KEY `name` (`name`)
    ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';

Starting the Flink cluster

    cd D:\data\flink\flink-docker

Enter the JobManager container and start the cluster

    docker-compose exec jobmanager /bin/bash
    ./bin/start-cluster.sh

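Creating a job with the Flink SQL CLI

The Flink cluster must already be running. Open the SQL CLI from the JobManager container, then enable checkpointing before creating any tables.

    cd D:\data\flink\flink-docker

Enter the JobManager container

    docker-compose exec jobmanager /bin/bash

Start the Flink SQL CLI with the following command

    ./bin/sql-client.sh

Enable checkpointing, with one checkpoint every 3 seconds

    SET execution.checkpointing.interval = 3s;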

Creating tables with Flink DDL

    CREATE TABLE sourceflinktest (
      id BIGINT NOT NULL,
      t_name STRING,
      logo_url STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '192.168.3.40',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = 'flink_source',
      'table-name' = 'flink_test'
    );

    SELECT * FROM sourceflinktest;

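The statements above map the table flink_test in the database flink_source on the MySQL server at 192.168.3.40 to the Flink table sourceflinktest, so changes to the MySQL table are captured through it.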

    CREATE TABLE sinkflinktest (
      id BIGINT NOT NULL,
      t_name STRING,
      logo_url STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'driver' = 'com.mysql.cj.jdbc.Driver',
      'url' = 'jdbc:mysql://192.168.3.40:3306/flink_sink?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
      'username' = 'root',
      'password' = '123456',
      'table-name' = 'flink_test'
    );

    SELECT * FROM sinkflinktest;

The statements above map the table flink_test in the database flink_sink on the MySQL server at 192.168.3.40 to the Flink table sinkflinktest.
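
The column list in each Flink DDL statement has to line up with the MySQL table it maps. As a rough sketch of that correspondence, the snippet below renders Flink column declarations from MySQL column definitions. The type table is a deliberately small, illustrative subset covering only the types used in this tutorial, and the helper names flink_column and flink_columns are hypothetical; check the connector documentation for any other type.

```python
# Small subset of MySQL -> Flink SQL type correspondences, limited to the
# types that appear in this tutorial. Illustrative only.
MYSQL_TO_FLINK = {
    "bigint": "BIGINT",
    "int": "INT",
    "float": "FLOAT",
    "varchar": "STRING",
}


def flink_column(name, mysql_type, nullable=True):
    """Render one Flink DDL column line from a MySQL column definition."""
    base = mysql_type.lower().split("(")[0].strip()  # "varchar(100)" -> "varchar"
    if base not in MYSQL_TO_FLINK:
        raise ValueError(f"no mapping listed for MySQL type {mysql_type!r}")
    suffix = "" if nullable else " NOT NULL"
    return f"{name} {MYSQL_TO_FLINK[base]}{suffix}"


def flink_columns(columns, primary_key):
    """Render the column section of a Flink CREATE TABLE statement."""
    lines = [flink_column(*col) for col in columns]
    lines.append(f"PRIMARY KEY ({primary_key}) NOT ENFORCED")
    return ",\n".join(lines)


if __name__ == "__main__":
    print(flink_columns(
        [("id", "bigint", False),
         ("t_name", "varchar(100)"),
         ("logo_url", "varchar(200)")],
        primary_key="id",
    ))
```

Raising on an unlisted type, rather than guessing, makes a missing mapping visible before the job is submitted.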

Start the synchronization

    INSERT INTO sinkflinktest SELECT * FROM sourceflinktest;

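Note: if the synchronization fails with an exception, check the MySQL connection settings, check whether a firewall is blocking the MySQL port, and check that the MySQL column types map correctly to the Flink data types.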
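
To rule out the firewall case quickly, a plain TCP connection test against the MySQL host and port is usually enough. This is a minimal sketch using only the Python standard library; the function name port_is_open is made up here, and the host and port are the ones from the DDL above. It only shows that the port accepts connections, not that the MySQL credentials are valid.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Host and port as used in the table definitions above.
    print(port_is_open("192.168.3.40", 3306))
```

Run it from a machine on the same network as the Flink containers; a result from your workstation says little about what the containers themselves can reach.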

Example 2 (sync test with a million rows)

    CREATE TABLE sourceboutitem (
      id INT NOT NULL,
      deptno INT NULL,
      deptname STRING,
      bloodno INT NULL,
      bloodname STRING,
      boutcount FLOAT,
      bloodunitname STRING,
      bodate STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '192.168.3.40',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = 'es_test',
      'table-name' = 'V_Blood_BOutItem'
    );

    SELECT * FROM sourceboutitem;

    CREATE TABLE sinkboutitem (
      id INT NOT NULL,
      deptno INT NULL,
      deptname STRING,
      bloodno INT NULL,
      bloodname STRING,
      boutcount FLOAT,
      bloodunitname STRING,
      bodate STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'driver' = 'com.mysql.cj.jdbc.Driver',
      'url' = 'jdbc:mysql://192.168.3.40:3306/flink_sink?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
      'username' = 'root',
      'password' = '123456',
      'table-name' = 'V_Blood_BOutItem'
    );

    INSERT INTO sinkboutitem SELECT * FROM sourceboutitem;

Open http://localhost:8081/ to view the running job in the Flink web UI.
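
The same port also serves Flink's REST API, so job state can be checked from a script as well as in the browser. The sketch below reads the /jobs/overview endpoint and lists each job's name and state. The helper names are hypothetical, and the base URL assumes the port mapping used above.

```python
import json
from urllib.request import urlopen


def fetch_jobs_overview(base_url="http://localhost:8081"):
    """Fetch the job overview document from the Flink REST API."""
    with urlopen(f"{base_url}/jobs/overview", timeout=5) as resp:
        return json.load(resp)


def summarize_jobs(overview):
    """Return (name, state) pairs from a /jobs/overview response."""
    return [(job["name"], job["state"]) for job in overview.get("jobs", [])]


if __name__ == "__main__":
    try:
        for name, state in summarize_jobs(fetch_jobs_overview()):
            print(f"{state:10} {name}")
    except OSError as exc:
        # URLError is a subclass of OSError; raised when the cluster is unreachable.
        print(f"could not reach the Flink REST API: {exc}")
```

A synchronization job submitted with INSERT INTO should appear here in the RUNNING state for as long as the sync is active.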