安装Docker
请参考安装docker
安装Flink
安装My sql
系统环境
- win 10
- docker Desktop
- mysql 8.0.27
-
Flink CDC的lib依赖
下载下面列出的依赖包,并将它们放到目录 flink容器的/lib/ 下:
下载地址 - flink-connector-jdbc
选择对应的版本下载,如flink-connector-jdbc-1.15.2.jar,将下载的数据库连接驱动包的jar放到 flink的lib目录下
任务管理器容器
docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-taskmanager-1
:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-taskmanager-1
:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar
作业管理器容器
docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-jobmanager-1
:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-jobmanager-1
:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar
到任务管理器容器查看是否拷贝成功
cd D:\data\flink\flink-docker
docker-compose exec taskmanager /bin/bash
cd ./lib/
ll
MySQL同步MySQL示例
Mysql源数据sql准备
CREATE DATABASE IF NOT EXISTS `flink_source` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci ;
CREATE TABLE `flink_test` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`t_name` varchar(100) NOT NULL COMMENT '名称',
`logo_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
Mysql目标数据sql准备
一模一样的同步
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci ;
CREATE TABLE `flink_test` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`t_name` varchar(100) NOT NULL COMMENT '名称',
`logo_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
不一样的同步
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci ;
CREATE TABLE `flink_test2` (
`name` varchar(100) NOT NULL COMMENT '名称',
`logo` varchar(200) DEFAULT NULL COMMENT '图片路径',
`UNIQUE KEY name (name`)`
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
启动 Flink 集群
cd D:\data\flink\flink-docker
进入到作业管理器容器
docker-compose exec jobmanager /bin/bash
./bin/start-cluster.sh
Flink SQL CLI创建job
需要先开启Flink 集群,首先,开启 checkpoint,每隔3秒做一次 checkpoint
cd D:\data\flink\flink-docker
进入到作业管理器容器
docker-compose exec jobmanager /bin/bash
使用下面的命令启动 Flink SQL CLI
./bin/sql-client.sh
开启 checkpoint,每隔3秒做一次 checkpoint
SET execution.checkpointing.interval = 3s;
使用 Flink DDL 创建表
CREATE TABLE sourceflinktest (
id BIGINT NOT NULL,
t_name STRING,
logo_url STRING,
primary key (id) not enforced
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.40',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'flink_source',
'table-name' = 'flink_test'
);
select * from sourceflinktest;
以上是将192.168.3.40服务器上的mysql的数据库flink_source的表flink_test同步到sourceflinktest上;
CREATE TABLE sinkflinktest (
id BIGINT NOT NULL,
t_name STRING,
logo_url STRING,
primary key (id) not enforced
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.3.40:3306/flink_sink?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
'username' = 'root',
'password' = '123456',
'table-name' = 'flink_test'
);
select * from sinkflinktest;
以上是将192.168.3.40服务器上的mysql的数据库flink_sink的表flink_test与flink cdc的sinkflinktest表映射;
开始同步
insert into sinkflinktest select * from sourceflinktest;
注意:以上操作,如果同步出现异常信息,请检查mysql连接配置及端口号是否被防火墙拦截,或者与Flink数据类型映射是否正确
示例二(百万数据的同步测试)
CREATE TABLE sourceboutitem (
id INT NOT NULL,
deptno INT NULL,
deptname STRING,
bloodno INT NULL,
bloodname STRING,
boutcount FLOAT,
bloodunitname STRING,
bodate STRING,
primary key (id) not enforced
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.3.40',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'es_test',
'table-name' = 'V_Blood_BOutItem'
);
select * from sourceboutitem;
CREATE TABLE sinkboutitem (
id INT NOT NULL,
deptno INT NULL,
deptname STRING,
bloodno INT NULL,
bloodname STRING,
boutcount FLOAT,
bloodunitname STRING,
bodate STRING,
primary key (id) not enforced
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.3.40:3306/flink_sink?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
'username' = 'root',
'password' = '123456',
'table-name' = 'V_Blood_BOutItem'
);
insert into sinkboutitem select * from sourceboutitem;