SQL Client: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/
Environment Setup
Full list of config options: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
Setting the Job Name
How to set it in Java code:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// set the job name
tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#execute-sql-files
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#define-a-custom-job-name
SET 'pipeline.name' = 'SqlJob';
Enabling Checkpoints
-- make Flink SQL checkpoint every 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
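The interval is the only option needed to turn checkpointing on from the SQL Client. A few related options can be tuned per session the same way; a minimal sketch (the values below are illustrative, not recommendations):

-- optional checkpoint tuning, set per session
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';   -- delivery guarantee (EXACTLY_ONCE is the default)
SET 'execution.checkpointing.timeout' = '10min';       -- abort checkpoints that take longer than this
SET 'execution.checkpointing.min-pause' = '500ms';     -- minimum pause between two checkpoints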
Setting the Checkpoint Retention Mode
If checkpointing is enabled, configure retention as follows:

SET execution.checkpointing.interval = 3s;

Default behavior: once a Flink job is cancelled, its checkpoint data is deleted; checkpoints are only retained when the job fails.

-- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
-- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept when the job is cancelled, so the job can later be restored from a chosen checkpoint
-- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted when the job is cancelled; checkpoints are only kept when the job fails
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
Resuming from a Checkpoint (the next run starts from where the last run stopped)
- Checkpointing must be enabled
- If a job failure is hard to simulate, configure retention to keep checkpoint data on cancellation:
  SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
- state.checkpoints.dir must be configured
-- Step 1: set the checkpoint interval
SET execution.checkpointing.interval = 3s;
-- Step 2: retention defaults to DELETE_ON_CANCELLATION
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
-- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept when the job is cancelled, so the job can later be restored from a chosen checkpoint
-- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted when the job is cancelled; checkpoints are only kept when the job fails
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
-- Step 3: restore the job from the retained checkpoint
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
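Note that execution.savepoint.path stays set for the whole session, so any later job submitted from the same client would also try to restore from it. A sketch of clearing it after the restore, assuming the 1.14 client's RESET statement (the chk-37 path is the one from the example above):

-- restore once, then clear the option so later jobs in this session start fresh
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
-- ... submit the job that should be restored ...
RESET 'execution.savepoint.path';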

Parallelism Settings
SET 'parallelism.default' = '1';       -- optional: Flink's parallelism (1 by default)
SET 'pipeline.max-parallelism' = '10'; -- optional: Flink's maximum parallelism
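To double-check what the session is currently using, the client can list its session properties; a quick sketch:

-- SET with no argument prints all current session properties,
-- including parallelism.default and pipeline.max-parallelism
SET;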
Using SQL Files
Command
# sqlfile is a directory that Flink can read
./bin/sql-client.sh -f sqlfile/a.sql
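Besides -f for executing a statement file, the 1.14 client also accepts -i for an initialization file that only prepares the session (see the SQL Client page linked at the top). A minimal sketch of such an init file, with illustrative values:

-- init.sql: session setup only, no INSERT jobs
SET 'pipeline.name' = 'my_session';
SET 'parallelism.default' = '2';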
- Flink docker-compose file:
version: "2.2"
services:
  jobmanager:
    image: tannnn/flink:1.13.5-scala_2.12
    # give the container a name
    container_name: jobflink
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - D:/docker/flink/job/artifacts:/opt/flink/usrlib # job jars: upload the job packages you wrote yourself
      # - D:/docker/flink/lib:/opt/flink/lib # shared dependency jars; mapping lib out makes it easy to add new common dependencies at any time
      - D:/docker/flink/ck:/opt/flink/ck # mount the checkpoint directory out of the container
      - D:/docker/flink/target:/opt/flink/target # keep submitted jars from being lost when Flink restarts
      - D:/docker/flink/sqlfile:/opt/flink/sqlfile
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
        state.backend: hashmap # checkpoint FsStateBackend (not needed on the taskmanager)
        state.checkpoints.dir: file:///opt/flink/ck # checkpoint FsStateBackend; make sure the current user has write permission on this directory
        web.upload.dir: /opt/flink/target # keep submitted jars from being lost when Flink restarts (not needed on the taskmanager)
  taskmanager:
    image: tannnn/flink:1.13.5-scala_2.12
    # give the container a name
    container_name: task1flink
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - D:/docker/flink/job/artifacts:/opt/flink/usrlib
      # - D:/docker/flink/lib:/opt/flink/lib
      - D:/docker/flink/sqlfile:/opt/flink/sqlfile
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 16 # 1.13.5 defaults to 1
        parallelism.default: 8
 
Syncing a MySQL Database to Another MySQL Database
Preparatory SQL
Source data
CREATE DATABASE IF NOT EXISTS `flink_source` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

CREATE TABLE `flink_test` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `t_name` varchar(100) NOT NULL COMMENT 'name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
Target data
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

CREATE TABLE `flink_test` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `t_name` varchar(100) NOT NULL COMMENT 'name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
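Once the sync job below is running, a quick sanity check is to compare row counts on the two sides (plain MySQL queries, run against each instance):

-- run on the source and the sink MySQL respectively
SELECT COUNT(*) FROM flink_source.flink_test;
SELECT COUNT(*) FROM flink_sink.flink_test;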
SQL Client File
File format: txt, yaml, or sql (I have tried all of them and they work). To execute:
./bin/sql-client.sh -f sqlfile/a.sql

If you need comments in the file, use: -- this is a comment
SET execution.checkpointing.interval = 3s;

CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  t_name STRING,
  logo_url STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.0.99',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'flink_source',
  'table-name' = 'flink_test'
);

CREATE TABLE test_cdc_sink (
  id INT,
  t_name STRING,
  logo_url STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.0.51:3316/flink_sink?serverTimezone=Asia/Shanghai&useSSL=false',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'flink_test'
);

-- session options reference:
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#initialize-session-using-sql-files
SET 'pipeline.name' = 'test_sql_file';

INSERT INTO test_cdc_sink SELECT * FROM mysql_binlog;
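To watch the pipeline work end to end, insert a row into the source database and check that it shows up in the sink (run this against the source MySQL, not the Flink client; the values are illustrative):

-- new rows, updates, and deletes in flink_source should be mirrored into flink_sink
INSERT INTO flink_source.flink_test (t_name, logo_url) VALUES ('demo', 'http://example.com/logo.png');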

