SQL Client: https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/

Environment Setup

Full config option reference: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
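
Any option from that reference page can also be set per session in the SQL Client. A minimal sketch — the key and value below are purely illustrative, not something this setup requires:

```sql
-- illustrative only: give idle state a TTL of one hour for this session
SET 'table.exec.state.ttl' = '1h';
```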

Setting the Job Name

Setting it in Java code:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// set the job name
tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");
```

Setting it in the SQL Client:

```sql
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#execute-sql-files
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#define-a-custom-job-name
SET 'pipeline.name' = 'SqlJob';
```


Enabling Checkpoints

```
-- take a checkpoint every 3 seconds
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
```
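
Beyond the interval, a couple of related options can be tuned the same way. A small sketch with illustrative values (both keys are documented in the config reference linked above):

```sql
-- illustrative values; tune them to your job
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';  -- the default mode
SET 'execution.checkpointing.timeout' = '10min';      -- fail checkpoints that take longer than this
```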

Setting the Checkpoint Retention Mode

If checkpointing is enabled (e.g. via SET execution.checkpointing.interval = 3s;), the default behavior is: once the Flink job is cancelled, its checkpoint data is deleted; checkpoints are only retained when the job fails.

```sql
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
-- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept after the job is cancelled, so you can later restore from a checkpoint of your choice
-- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted once the job is cancelled; checkpoints survive only when the job fails
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
```

Resuming from a Checkpoint (start the next run from where the last one stopped)

Prerequisites:

1. Checkpointing must be enabled.
2. If simulating a job failure is inconvenient, configure retention so that checkpoints are kept on cancellation:
   SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
3. state.checkpoints.dir must be configured (see the Docker Compose example below).

Steps:

```sql
-- Step 1: set the checkpoint interval
SET execution.checkpointing.interval = 3s;
-- Step 2: the default is DELETE_ON_CANCELLATION
-- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
-- ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept after the job is cancelled, so you can later restore from a checkpoint of your choice
-- ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted once the job is cancelled; checkpoints survive only when the job fails
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
-- Step 3: restore the job from a saved checkpoint
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
```
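
One caveat: execution.savepoint.path stays set for the whole session, so any later job submitted from the same session would also try to restore from it. A small sketch of clearing it afterwards (RESET is a standard SQL Client statement):

```sql
-- after submitting the restored job, clear the path so later jobs start fresh
RESET 'execution.savepoint.path';
```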


Setting Parallelism

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sqlclient/#initialize-session-using-sql-files

```sql
SET 'parallelism.default' = '1';      -- optional: Flink's parallelism (1 by default)
SET 'pipeline.max-parallelism' = '10'; -- optional: Flink's maximum parallelism
```
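
To check what is currently configured, SET with no argument prints every property in the session:

```sql
-- list all session properties
SET;
```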

Using SQL Files

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#initialize-session-using-sql-files

Command


```bash
# sqlfile must be a directory that Flink can read
./bin/sql-client.sh -f sqlfile/a.sql
```
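Besides -f, the client also accepts an initialization file via -i, which is a convenient home for session-wide settings. A minimal sketch, assuming a file named sqlfile/init.sql (the file name is my own choice), loaded with ./bin/sql-client.sh -i sqlfile/init.sql -f sqlfile/a.sql:

```sql
-- sqlfile/init.sql: session-wide settings applied before the job file runs
SET 'pipeline.name' = 'SqlJob';
SET 'parallelism.default' = '1';
```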
Flink Docker Compose file:

```yaml
version: "2.2"
services:
  jobmanager:
    image: tannnn/flink:1.13.5-scala_2.12
    # give the container a name
    container_name: jobflink
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - D:/docker/flink/job/artifacts:/opt/flink/usrlib # your own job jars
      # - D:/docker/flink/lib:/opt/flink/lib # shared dependency jars; mount lib so new common dependencies can be added at any time
      - D:/docker/flink/ck:/opt/flink/ck # mount the checkpoint directory outside the container
      - D:/docker/flink/target:/opt/flink/target # keep submitted jars across Flink restarts
      - D:/docker/flink/sqlfile:/opt/flink/sqlfile
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
        state.backend: hashmap # checkpoint state backend (not needed on the taskmanager)
        state.checkpoints.dir: file:///opt/flink/ck # make sure the current user has write permission on this directory
        web.upload.dir: /opt/flink/target # keep submitted jars across Flink restarts (not needed on the taskmanager)
  taskmanager:
    image: tannnn/flink:1.13.5-scala_2.12
    # give the container a name
    container_name: task1flink
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - D:/docker/flink/job/artifacts:/opt/flink/usrlib
      # - D:/docker/flink/lib:/opt/flink/lib
      - D:/docker/flink/sqlfile:/opt/flink/sqlfile
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 16 # the 1.13.5 default is only 1
        parallelism.default: 8
```

Syncing a MySQL Database to Another MySQL Database

Preparation SQL

Source database

```sql
CREATE DATABASE IF NOT EXISTS `flink_source` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
CREATE TABLE `flink_test` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `t_name` varchar(100) NOT NULL COMMENT 'name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
```
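
To give the job something to sync, a couple of sample rows can be inserted on the source side; the values here are made up:

```sql
-- made-up sample data for testing the sync
INSERT INTO flink_source.flink_test (t_name, logo_url) VALUES
  ('demo-1', 'http://example.com/1.png'),
  ('demo-2', NULL);
```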

Target database

```sql
CREATE DATABASE IF NOT EXISTS `flink_sink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
CREATE TABLE `flink_test` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
  `t_name` varchar(100) NOT NULL COMMENT 'name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'image path',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=DYNAMIC COMMENT='test';
```

SQL Client File

File format: txt, yaml, and sql all work (I have tried each). Run it with: ./bin/sql-client.sh -f sqlfile/a.sql. Comments start with --, e.g. -- this is a comment.

```sql
SET execution.checkpointing.interval = 3s;

CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  t_name STRING,
  logo_url STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.0.99',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'flink_source',
  'table-name' = 'flink_test'
);

CREATE TABLE test_cdc_sink (
  id INT,
  t_name STRING,
  logo_url STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.0.51:3316/flink_sink?serverTimezone=Asia/Shanghai&useSSL=false',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'flink_test'
);

-- session options: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sqlclient/#initialize-session-using-sql-files
SET 'pipeline.name' = 'test_sql_file';

INSERT INTO test_cdc_sink SELECT * FROM mysql_binlog;
```
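
Once the job is running, the sync can be sanity-checked from the MySQL side (run against the sink database, not in the SQL Client):

```sql
-- row counts on both sides should converge once the CDC job has caught up
SELECT COUNT(*) FROM flink_sink.flink_test;
```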