References

Demo: SqlServer CDC to Elasticsearch
ES reference: Elasticsearch SQL Connector

Install Docker

See: Install Docker

Install Flink

See: Install Flink with Docker

Install MySQL

See: Install MySQL with Docker

Install ElasticSearch

See: Install ElasticSearch with Docker

System environment

Download the connector JARs that match your Flink version (for example flink-connector-jdbc-1.15.2.jar) and copy them into Flink's lib directory.

TaskManager container

  docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
  docker cp D:/data/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar
  docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar

JobManager container

  docker cp D:/data/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar
  docker cp D:/data/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar
  docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar

Check inside the TaskManager container that the JARs were copied successfully

  cd D:\data\flink\flink-docker
  docker-compose exec taskmanager /bin/bash
  cd ./lib/
  ll

(screenshot: 1665625694682.png)

MySQL to ES sync example

MySQL source data preparation

Create the database es_test and the table V_Blood_BOutItem:

  CREATE TABLE `V_Blood_BOutItem` (
    `id` int unsigned NOT NULL,
    `deptno` int NOT NULL,
    `deptname` varchar(65) DEFAULT NULL,
    `bloodno` varchar(20) DEFAULT NULL,
    `bloodname` varchar(65) DEFAULT NULL,
    `boutcount` float DEFAULT NULL,
    `bloodunitname` varchar(65) DEFAULT NULL,
    `bodate` varchar(20) DEFAULT NULL,
    PRIMARY KEY (`id`)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
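
To have something to sync, you can seed the table with a couple of rows. The values below are hypothetical test data, not taken from the original setup:

  -- Hypothetical test rows so the CDC source has data to capture
  INSERT INTO `V_Blood_BOutItem`
    (`id`, `deptno`, `deptname`, `bloodno`, `bloodname`, `boutcount`, `bloodunitname`, `bodate`)
  VALUES
    (1, 101, 'Surgery',   'B001', 'Red Blood Cells', 2.0, 'U', '2022-10-01'),
    (2, 102, 'Emergency', 'B002', 'Plasma',          1.5, 'U', '2022-10-02');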

ES preparation

No manual index creation is needed here; with its default settings, Elasticsearch creates the target index automatically when the Flink sink writes the first documents.

Start the Flink cluster

  cd D:\data\flink\flink-docker

Enter the JobManager container

  docker-compose exec jobmanager /bin/bash
  ./bin/start-cluster.sh

Create the job in the Flink SQL CLI

The Flink cluster must already be running. Then start the SQL CLI and enable checkpointing, with a checkpoint every 3 seconds.

  cd D:\data\flink\flink-docker

Enter the JobManager container

  docker-compose exec jobmanager /bin/bash

Start the Flink SQL CLI with the following command

  ./bin/sql-client.sh

Enable checkpointing, with a checkpoint every 3 seconds

  SET execution.checkpointing.interval = 3s;
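
To double-check that the option took effect, the SQL client can print the current session configuration:

  -- Running SET with no arguments lists all options set in this session
  SET;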

Create the tables with Flink DDL. Note that bloodno is declared as STRING so that it matches the varchar(20) column in MySQL.

  CREATE TABLE sourceboutitem (
    id INT NOT NULL,
    deptno INT NULL,
    deptname STRING,
    bloodno STRING,
    bloodname STRING,
    boutcount FLOAT,
    bloodunitname STRING,
    bodate STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.3.40',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'es_test',
    'table-name' = 'V_Blood_BOutItem'
  );

  select * from sourceboutitem;

The statements above expose the V_Blood_BOutItem table of the es_test database on the MySQL server at 192.168.3.40 as the Flink table sourceboutitem.

  CREATE TABLE sinkboutitem (
    id INT NOT NULL,
    deptno INT NULL,
    deptname STRING,
    bloodno STRING,
    bloodname STRING,
    boutcount FLOAT,
    bloodunitname STRING,
    bodate STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://192.168.3.40:9200',
    'index' = 'sinkboutitem2',
    'username' = 'longfc',
    'password' = 'lfc123456'
  );

The statements above map the Flink table sinkboutitem to the Elasticsearch index sinkboutitem2.
Start the sync

  insert into sinkboutitem select * from sourceboutitem;
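
Once the INSERT job is running, any further change in MySQL should appear in the index within a few seconds. A quick way to see this, using a hypothetical row id, is to modify a record on the MySQL side:

  -- Run this in MySQL, not in the Flink SQL CLI; id 1 is a hypothetical test row
  UPDATE `V_Blood_BOutItem` SET `boutcount` = 3.0 WHERE `id` = 1;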

Note: if the sync produces errors, check the MySQL connection settings, whether the port is blocked by a firewall, and whether the MySQL columns are mapped to the correct Flink data types.
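
Since the mysql-cdc connector reads the MySQL binlog, it is also worth confirming on the MySQL side that binary logging is enabled and uses ROW format, another common cause of sync failures. For example:

  -- Run on the MySQL server; log_bin should be ON and binlog_format should be ROW
  SHOW VARIABLES LIKE 'log_bin';
  SHOW VARIABLES LIKE 'binlog_format';
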
Open http://localhost:8081/ to check the running job.
Since this was only a test, I stopped the job after letting it run for a short while; the number of records synced to ES at that point was:
(screenshot: 1665656608362.png)