网上都是基于Hadop的hdfs做的,我根据官网的说明直接在本机做的

  • 这个玩意可以处理flinkcdc mysql同步时数据删除操作不同步的问题
    • 在任务停止时,我对源数据进行了增删改。当任务重启启动时,我发现,删除的操作没有同步过来。
    • fink cdc 默认只有两种模式 **initial** **latest **
      • 默认的initial时全量加增量模式,他第一次他只会同步原表中看得见的数据,然后在进行 binlog同步

FsStateBackend 实现chinkpoint恢复

官网使用案例

flink-conf.yaml 全局配置

  • 我用的这种用测试的
  • 文件夹赋权:chown -R 用户名 文件夹路径
  1. # 官网示例
  2. state.backend: hashmap
  3. state.checkpoints.dir: file:///checkpoint-dir/
  4. # 我的配置
  5. state.backend: hashmap
  6. state.checkpoints.dir: file:///opt/flink/ck # 注意当前用在对这个目录是否有操作权限

代码配置

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStateBackend(new HashMapStateBackend());
  3. # 我在idea中测试的
  4. env.getCheckpointConfig().setCheckpointStorage("ffile:///F://project//java//flink-cdc-demo-1//doc");

根据CheckPoint启动任务

利用 命令启动 ./bin/flink run -d -s file:///opt/flink/ck/b8247a3fa6201091fccf45752ff8a6a2/chk-31/_metadata -c cn.tannn.FlinkCDCWitchSQL /xx/flink-demo-1-1.0-SNAPSHOT.jar Flink SQL CLI 设置 checkpoint恢复 :SET ‘execution.savepoint.path’ = ‘/u01/soft/flink/flinkjob/flinksqljob/checkpoint/f39957c3386e1e943f50ac16d4e3f809/chk-59’;

  • Flink SQL CLI 这种方式我没试过。来源

image.png
image.png