一般需求,我们的 Checkpoint 时间间隔可以设置为分钟级别(1 ~5 分钟)。对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次 Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之间至少暂停 4 或 8 分钟。同时,也需要考虑时效性的要求,需要在时效性和性能之间做一平衡,如果时效性要求高,结合 end- to-end 时长,设置秒级或毫秒级。如果 Checkpoint语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,
    可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
    RocksDB 相关参数在前面已说明,可以在 flink-conf.yaml 指定,也可以在 Job 的代码中调用 API 单独指定,

    1. // 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
    2. RocksDBStateBackend rocksDBStateBackend = new
    3. RocksDBStateBackend("hdfs://hadoop1:8020/flink/checkpoints", true);
    4. env.setStateBackend(rocksDBStateBackend);
    5. // 开启 Checkpoint,间隔为 3 分钟
    6. env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
    7. // 配置 Checkpoint
    8. CheckpointConfig checkpointConf = env.getCheckpointConfig();
    9. checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    10. // 最小间隔 4 分钟
    11. checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
    12. // 超时时间 10 分钟
    13. checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
    14. // 保存 checkpoint
    15. checkpointConf.enableExternalizedCheckpoints(
    16. CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);