References

Demo: SQL Server CDC into Elasticsearch
ES reference: Elasticsearch SQL Connector

Install Docker

See the guide on installing Docker.

Install Flink

See the guide on installing Flink with Docker.

Install Elasticsearch

See the guide on installing Elasticsearch with Docker.

Environment setup

Download the connector jars matching your Flink version (e.g. flink-connector-jdbc-1.15.2.jar) and copy them into Flink's lib directory.
TaskManager container

  docker cp D:/data/flink/lib/flink-sql-connector-sqlserver-cdc-2.2.1.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-sqlserver-cdc-2.2.1.jar
  docker cp D:/data/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar
  docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-taskmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar

JobManager container

  docker cp D:/data/flink/lib/flink-sql-connector-sqlserver-cdc-2.2.1.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-sqlserver-cdc-2.2.1.jar
  docker cp D:/data/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-sql-connector-elasticsearch7-1.15.2.jar
  docker cp D:/data/flink/lib/flink-connector-jdbc-1.15.2.jar flink-docker-jobmanager-1:/opt/flink/lib/flink-connector-jdbc-1.15.2.jar

Go into the TaskManager container and check that the copies succeeded:

  cd D:\data\flink\flink-docker
  docker-compose exec taskmanager /bin/bash
  cd ./lib/
  ll

[screenshot: jar files listed under /opt/flink/lib]

Example: syncing SQL Server to ES

Preparing the SQL Server source data

Create the database es_test and, in it, the table V_Blood_BOutItem:
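If the database does not exist yet, it can be created first; a minimal sketch assuming default file settings:

```sql
-- Create the demo database with default settings and switch to it
CREATE DATABASE es_test;
GO
USE es_test;
GO
```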

  CREATE TABLE [dbo].[V_Blood_BOutItem](
      [id] [int] NOT NULL,
      [deptno] [int] NOT NULL,
      [deptname] [varchar](65) NULL,
      [bloodno] [varchar](20) NULL,
      [bloodname] [varchar](65) NULL,
      [boutcount] [float] NULL,
      [bloodunitname] [varchar](65) NULL,
      [bodate] [datetime] NULL,
      CONSTRAINT [PK_V_Blood_BOutItem] PRIMARY KEY CLUSTERED
      (
          [id] ASC
      ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
  ) ON [PRIMARY]
  GO
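So that the sync later has something to pick up, a few rows can be inserted; the values below are purely illustrative:

```sql
-- Illustrative sample rows for the demo table
INSERT INTO [dbo].[V_Blood_BOutItem]
    (id, deptno, deptname, bloodno, bloodname, boutcount, bloodunitname, bodate)
VALUES
    (1, 10, 'Surgery', 'B001', 'Red blood cells', 2.0, 'Unit', GETDATE()),
    (2, 20, 'ICU',     'B002', 'Plasma',          1.5, 'Unit', GETDATE());
```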

Enable CDC on the database

  -- Check whether CDC is enabled on the database
  SELECT is_cdc_enabled,
         CASE WHEN is_cdc_enabled = 0 THEN 'CDC disabled' ELSE 'CDC enabled' END AS [description]
  FROM sys.databases WHERE name = 'es_test'

  -- Enable CDC for the current database
  EXEC sys.sp_cdc_enable_db

  -- To turn it off again later:
  -- EXEC sys.sp_cdc_disable_db

Enable CDC on the table

  -- Check the database's data files
  SELECT name, physical_name FROM sys.master_files WHERE database_id = DB_ID('es_test');

  -- Add a filegroup named CDC1 to the database and give it a data file
  ALTER DATABASE es_test ADD FILEGROUP CDC1;
  ALTER DATABASE es_test
  ADD FILE
  (
      NAME = 'cdctest1_cdc',
      FILENAME = 'D:\DATA2019\cdctest1_cdc.ndf'
  )
  TO FILEGROUP CDC1;

  -- Enable table-level CDC
  IF EXISTS(SELECT 1 FROM sys.tables WHERE name = 'V_Blood_BOutItem' AND is_tracked_by_cdc = 0)
  BEGIN
      EXEC sys.sp_cdc_enable_table
          @source_schema = 'dbo',
          @source_name = 'V_Blood_BOutItem',
          @capture_instance = 'test_instance',
          @supports_net_changes = 1,
          @role_name = NULL,
          @index_name = NULL,
          @captured_column_list = NULL,
          @filegroup_name = 'CDC1'
  END;

  -- Verify that the table is now tracked by CDC
  SELECT name, is_tracked_by_cdc FROM sys.tables WHERE object_id = OBJECT_ID('V_Blood_BOutItem')

Once CDC is enabled successfully, the corresponding capture/cleanup jobs and query functions exist:
[screenshots: the CDC jobs and functions created in SQL Server]
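The same thing can be checked from T-SQL with SQL Server's built-in CDC helper procedures:

```sql
-- List the CDC capture and cleanup jobs for the current database
EXEC sys.sp_cdc_help_jobs;

-- Show the CDC configuration of the capture instance on the source table
EXEC sys.sp_cdc_help_change_data_capture
    @source_schema = 'dbo',
    @source_name = 'V_Blood_BOutItem';
```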

ES preparation

Create the index sinkboutitem in ES ahead of time.
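The index can be created through the REST API; a minimal sketch in which the mapping simply mirrors the sink table's columns (the field types here are assumptions, not taken from the original setup):

```
PUT /sinkboutitem
{
  "mappings": {
    "properties": {
      "id":            { "type": "integer" },
      "deptno":        { "type": "integer" },
      "deptname":      { "type": "keyword" },
      "bloodno":       { "type": "keyword" },
      "bloodname":     { "type": "keyword" },
      "boutcount":     { "type": "float" },
      "bloodunitname": { "type": "keyword" },
      "bodate":        { "type": "keyword" }
    }
  }
}
```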

Start the Flink cluster

  cd D:\data\flink\flink-docker

Enter the JobManager container and start the cluster:

  docker-compose exec jobmanager /bin/bash
  ./bin/start-cluster.sh

Create the job in the Flink SQL CLI

The Flink cluster must already be running.

  cd D:\data\flink\flink-docker

Enter the JobManager container:

  docker-compose exec jobmanager /bin/bash

Start the Flink SQL CLI:

  ./bin/sql-client.sh

Enable checkpointing, with a checkpoint every 3 seconds:

  SET execution.checkpointing.interval = 3s;

Create the tables with Flink DDL

  CREATE TABLE sourceboutitem (
      id INT NOT NULL,
      deptno INT,
      deptname STRING,
      bloodno STRING,  -- varchar(20) in SQL Server, so STRING on the Flink side
      bloodname STRING,
      boutcount FLOAT,
      bloodunitname STRING,
      bodate STRING,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'sqlserver-cdc',
      'hostname' = '192.168.3.40',
      'port' = '1433',
      'username' = 'sa',
      'password' = 'longfuchu',
      'database-name' = 'es_test',
      'schema-name' = 'dbo',
      'table-name' = 'V_Blood_BOutItem'
  );

  -- Preview the change stream
  select * from sourceboutitem;

The DDL above exposes the table V_Blood_BOutItem of the es_test database on the SQL Server at 192.168.3.40 as the Flink table sourceboutitem.

  CREATE TABLE sinkboutitem (
      id INT NOT NULL,
      deptno INT,
      deptname STRING,
      bloodno STRING,  -- keep the type consistent with the source table
      bloodname STRING,
      boutcount FLOAT,
      bloodunitname STRING,
      bodate STRING,
      PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://192.168.3.40:9200',
      'index' = 'sinkboutitem',
      'username' = 'longfc',
      'password' = 'lfc123456'
  );

The DDL above maps the Flink table sinkboutitem onto the ES index sinkboutitem.
Start the sync:

  insert into sinkboutitem select * from sourceboutitem;

Note: if the sync fails, check the SQL Server connection settings, whether the port is blocked by a firewall, and whether the Flink column types map correctly to the source types. Open http://localhost:8081/ to watch the job. Since this was only a test, I stopped the job after running it for a short while; the number of records synced to ES was:
[screenshot: record count synced to ES]
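Whether documents actually arrived can also be checked directly against ES; a quick sketch using the REST API (host and credentials are the ones from this demo):

```
GET /sinkboutitem/_count
```

or, with curl: `curl -u longfc:lfc123456 http://192.168.3.40:9200/sinkboutitem/_count`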