网上都是基于Hadop的hdfs做的,我根据官网的说明直接在本机做的
- 这个玩意可以处理flinkcdc mysql同步时数据删除操作不同步的问题
- 在任务停止时,我对源数据进行了增删改。当任务重启启动时,我发现,删除的操作没有同步过来。
- fink cdc 默认只有两种模式
**initial**
**latest **
- 默认的initial时全量加增量模式,他第一次他只会同步原表中看得见的数据,然后在进行 binlog同步
FsStateBackend 实现chinkpoint恢复
flink-conf.yaml 全局配置
- 我用的这种用测试的
- 文件夹赋权:chown -R 用户名 文件夹路径
# 官网示例
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
# 我的配置
state.backend: hashmap
state.checkpoints.dir: file:///opt/flink/ck # 注意当前用在对这个目录是否有操作权限
代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
# 我在idea中测试的
env.getCheckpointConfig().setCheckpointStorage("ffile:///F://project//java//flink-cdc-demo-1//doc");
根据CheckPoint启动任务
利用 命令启动 ./bin/flink run -d -s file:///opt/flink/ck/b8247a3fa6201091fccf45752ff8a6a2/chk-31/_metadata -c cn.tannn.FlinkCDCWitchSQL /xx/flink-demo-1-1.0-SNAPSHOT.jar Flink SQL CLI 设置 checkpoint恢复 :SET ‘execution.savepoint.path’ = ‘/u01/soft/flink/flinkjob/flinksqljob/checkpoint/f39957c3386e1e943f50ac16d4e3f809/chk-59’;
- Flink SQL CLI 这种方式我没试过。来源