SQL Client: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/
Environment Setup
Full configuration option reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
Setting the Job Name
Set via Java code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// set the job name
tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");
Set via the SQL Client:
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#execute-sql-files
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#define-a-custom-job-name
SET 'pipeline.name' = 'SqlJob';
Enabling Checkpoints
-- take a checkpoint every 3 seconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Session property has been set.
Setting Checkpoint Retention
Retention only matters once checkpointing is enabled:
SET 'execution.checkpointing.interval' = '3s';
Default behavior: once a Flink job is cancelled, its checkpoint data is deleted; checkpoints are retained only when the job fails.
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
-- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data when the job is cancelled, so you can later restore from a chosen checkpoint
-- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data when the job is cancelled; checkpoints survive only if the job fails
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
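The same two options can also be set in Java when the job is built with the Table API. A minimal sketch, assuming the tableEnv created in the snippet above; the keys are the same strings used in the SET statements:
import org.apache.flink.configuration.Configuration;

// Sketch: checkpoint options applied through the TableEnvironment configuration
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("execution.checkpointing.interval", "3s");
// keep checkpoint data on cancel (the default is DELETE_ON_CANCELLATION)
conf.setString("execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");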
Resuming from a Checkpoint (the next run starts from where the last one stopped)
- Checkpointing must be enabled.
- If simulating a job failure is inconvenient, set retention to keep checkpoint data on cancellation instead:
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
- state.checkpoints.dir must be configured.
-- Step 1: set the checkpoint interval
SET 'execution.checkpointing.interval' = '3s';
-- Step 2: set retention (the default is DELETE_ON_CANCELLATION; see the link above)
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
-- Step 3: restore the job from the retained checkpoint
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
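The restore path is an ordinary configuration key too, so it can be set from Java before the job is submitted. A sketch, assuming the same tableEnv; whether the key takes effect depends on how the job is deployed, so treat this as an assumption rather than the canonical API:
// Sketch: point the next submission at a retained checkpoint.
// chk-<n> directories live under <state.checkpoints.dir>/<job-id>/.
tableEnv.getConfig().getConfiguration()
        .setString("execution.savepoint.path", "/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37");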
Setting Parallelism
SET 'parallelism.default' = '1'; -- optional: Flink's parallelism (1 by default)
SET 'pipeline.max-parallelism' = '10'; -- optional: Flink's maximum parallelism
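These two map to configuration keys in the same way; a sketch using the tableEnv from above:
// Sketch: parallelism options set programmatically
Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setInteger("parallelism.default", 1);       // default operator parallelism
conf.setInteger("pipeline.max-parallelism", 10); // upper bound (number of key groups)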
Using SQL Files
Command
# sqlfile must be a directory Flink can read
./bin/sql-client.sh -f sqlfile/a.sql
- Flink docker-compose file
version: "2.2"
services:
  jobmanager:
    image: tannnn/flink:1.13.5-scala_2.12
    # name the container
    container_name: jobflink
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - D:/docker/flink/job/artifacts:/opt/flink/usrlib # your own job jars
      # - D:/docker/flink/lib:/opt/flink/lib # shared dependency jars; mapping lib out makes it easy to add new common dependencies at any time
      - D:/docker/flink/ck:/opt/flink/ck # expose the checkpoint directory outside the container
      - D:/docker/flink/target:/opt/flink/target # keep submitted jars across Flink restarts
      - D:/docker/flink/sqlfile:/opt/flink/sqlfile
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
        state.backend: hashmap # checkpoint state backend (no need to configure on the taskmanager)
        state.checkpoints.dir: file:///opt/flink/ck # checkpoint directory; make sure the current user has write access to it
        web.upload.dir: /opt/flink/target # keep submitted jars across Flink restarts (no need to configure on the taskmanager)
  taskmanager:
    image: tannnn/flink:1.13.5-scala_2.12
    # name the container
    container_name: task1flink
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - D:/docker/flink/job/artifacts:/opt/flink/usrlib
      # - D:/docker/flink/lib:/opt/flink/lib
      - D:/docker/flink/sqlfile:/opt/flink/sqlfile
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 16 # the default is only 1 in 1.13.5
        parallelism.default: 8
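Bring the cluster up with docker-compose up -d from the directory containing this file; the SQL Client can then be started inside the JobManager container, e.g. docker exec -it jobflink ./bin/sql-client.sh.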
Syncing One MySQL Database to Another
Preparation SQL
Source database:
CREATE DATABASE IF NOT EXISTS `flink_source` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci ;
CREATE TABLE `flink_test` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`t_name` varchar(100) NOT NULL COMMENT 'name',
`logo_url` varchar(200) DEFAULT NULL COMMENT 'image URL',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
Target database:
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci ;
CREATE TABLE `flink_test` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`t_name` varchar(100) NOT NULL COMMENT 'name',
`logo_url` varchar(200) DEFAULT NULL COMMENT 'image URL',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
SQL Client File
File format: txt, yaml, or sql all work (I tried each). Execute:
./bin/sql-client.sh -f sqlfile/a.sql
Comments, if needed, use the -- prefix (-- like this).
SET 'execution.checkpointing.interval' = '3s';
CREATE TABLE mysql_binlog (
id INT NOT NULL,
t_name STRING,
logo_url STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.0.99',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'flink_source',
'table-name' = 'flink_test'
);
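Note: the mysql-cdc connector does not ship with Flink. The flink-sql-connector-mysql-cdc jar (matching your Flink version) must be on the classpath, e.g. dropped into /opt/flink/lib, and the source MySQL server needs binlog enabled with binlog_format=ROW.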
CREATE TABLE test_cdc_sink (
id INT,
t_name STRING,
logo_url STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.0.51:3316/flink_sink?serverTimezone=Asia/Shanghai&useSSL=false',
'username' = 'root',
'password' = 'root',
'table-name' = 'flink_test'
);
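Note: the jdbc connector likewise needs the flink-connector-jdbc jar plus the MySQL driver (mysql-connector-java) in /opt/flink/lib. Because test_cdc_sink declares a primary key, the sink writes in upsert mode, so updates and deletes from the CDC source are applied rather than appended.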
-- Session configuration reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#initialize-session-using-sql-files
SET 'pipeline.name' = 'test_sql_file';
INSERT INTO test_cdc_sink SELECT * FROM mysql_binlog;
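Once submitted, the job appears in the Web UI under the name test_sql_file, and inserts, updates, and deletes on flink_source.flink_test are continuously replicated to flink_sink.flink_test.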