Checkpoint 和 Savepoint 都是用来存储 Flink Job的状态。如果你要用 Flink来上生产,那么 Checkpoint 和 Savepoint是必不可少的。Checkpoint 和 Savepoint 的主要使用场景有以下两种:

  • 在一个Flink Job生命周期内 (JobManager 没有退出),如果某个Task挂掉了,Flink会自动重新启动整个Job,重启Job所使用的状态就是Checkpoint数据 (这个过程也就是我们俗称的Fail over)。
  • 如果一个 Flink Job 挂了(JobManager已经退出),这个时候你要重新启动作业的话(比如用 flink run 重新跑作业),也是可以从Checkpoint 或者 Savepoint 来恢复到原来的状态。

启用Checkpoint

默认情况下,你的Flink Job是没有checkpoint的。在Zeppelin里,你可以通过以下两种方式来启用Checkpoint

  • Scala API 启用checkpoint
  • %flink.conf 启用checkpoint

下面分别是两个例子启用Checkpoint,只是做最简单的配置,更多的配置选项请参考Flink官网文档,https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/checkpointing.html

  1. %flink
  2. import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
  3. import org.apache.flink.streaming.api.TimeCharacteristic
  4. import org.apache.flink.runtime.state.filesystem.FsStateBackend
  5. senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  6. senv.enableCheckpointing(10 * 1000)
  7. senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
  8. val chkConfig = senv.getCheckpointConfig
  9. chkConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  1. %flink.conf
  2. pipeline.time-characteristic EventTime
  3. execution.checkpointing.interval 10000
  4. execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
  5. state.backend filesystem
  6. state.checkpoints.dir file:///tmp/flink/checkpoints

请务必配置 execution.checkpointing.externalized-checkpoint-retention 为 RETAIN_ON_CANCELLATION,否则的话当你的Flink 集群退出以后,checkpoint数据也就丢失了,只有设置了 RETAIN_ON_CANCELLATION 才会保留checkpoint数据。

从Checkpoint恢复Job

最新Zeppelin支持以下两种从Checkpoint恢复Job的方式:

  • 手动配置Checkpoint恢复Job
  • 自动Checkpoint 恢复Job

手动配置Checkpoint恢复Job

如果你启用了Checkpoint,那么在Flink UI中你可以看到checkpoint的所有信息,如下图:
image.png
你可以选择从其中的任意一个 checkpoint 恢复Job (默认只保留一个,所以你只能选择最近的一个checkpoint,如果要保留多个,需要设置 state.checkpoints.num-retained )。可以通过如下方式来手动配置checkpoint 恢复 job。
你可以在%flink.conf 设置 execution.savepoint.path
image.png
也可以在pararaph里设置 execution.savepoint.path
image.png
在%flink.conf 里设置有一个限制,你的flink session cluster里只能有一个flink job,因为你只能在%flink.conf 设置一次 execution.savepoint.path ,而如果是在paragraph里设置,那么可以针对每个paragraph 设置不同的 execution.savepoint.path

自动Checkpoint恢复Job

上面的手动恢复方式相对来来说比较复杂,也容易出错。Zeppelin提供了一种自动恢复的方式,在启动checkpoint的前提下,Zeppelin会通过Flink rest api 定时拿到最新的checkpoint路径,并且存到 note文件里。下面截取了 note 文件里的一小段json代码 ,其中的 latest_checkpoint_path 就是当前这个job的最新checkpoint路径(这是现实细节,只是方便大家理解,大家不用太关心这个实现细节)。

  1. "config": {
  2. "template": "\u003ch1\u003eTotal {0} {1}\u003c/h1\u003e",
  3. "jobName": "hello world",
  4. "colWidth": 12.0,
  5. "editorMode": "ace/mode/sql",
  6. "type": "single",
  7. "{1} \u003c/h1\u003e": "{1} \u003c/h1\u003e",
  8. "enabled": true,
  9. "latest_checkpoint_path": "file:/tmp/flink/checkpoints/6d4c697bc3b99b314d5342f2f6bed800/chk-4",
  10. "editorSetting": {
  11. "language": "sql",
  12. "editOnDblClick": false,
  13. "completionKey": "TAB",
  14. "completionSupport": true
  15. },

如果下次用户要从这个自动记录下来的 checkpoint 路径恢复作业的话,只需要在paragraph 中设置resumeFromLatestCheckpoint 为 true 就可以了,如下图:
image.png

从Savepoint 恢复 Job

Savepoint 是另外一种存放flink job状态的方法。Savepoint 只有在用户cancel job的时候才做,不像checkpoint会定期做一次checkpoint。
启用Savepoint也很方便,你只要在Paragraph里指定 savepointDir就可以,如下图image.png
当你cancel 这个paragraph的时候,flink会做savepoint,然后将savepoint信息存放到note文件里,如下图里的savepoint_path(这是现实细节,只是方便大家理解,大家不用太关心这个实现细节):

  1. "config": {
  2. "template": "\u003ch1\u003eTotal {0} {1}\u003c/h1\u003e",
  3. "jobName": "hello world",
  4. "colWidth": 12.0,
  5. "editorMode": "ace/mode/sql",
  6. "savepoint_path": "file:/tmp/flink_savepoint/savepoint-eff863-4610b069bf39",
  7. "type": "single",
  8. "{1} \u003c/h1\u003e": "{1} \u003c/h1\u003e",
  9. "enabled": true,
  10. "editorSetting": {
  11. "language": "sql",
  12. "editOnDblClick": false,
  13. "completionKey": "TAB",
  14. "completionSupport": true
  15. },

下次再启动job的时候,只要指定 resumeFromSavepoint 为 true 就会从note中存放的 savepoint_path 来恢复job。

如果 checkpoint,savepoint 数据丢失怎么办 ?

有时候你的checkpoint 或者 savepoint 数据丢失的话,那么你恢复job的时候就会出现下面这种错误
image.png
那种情况下你有2种处理办法:

  • 如果你还有可用的checkpoint或者savepoint数据 (比如你做了备份),那么你可以直接在paragraph里指定 execution.savepoint.path 就可以。
  • 如果你没有可用的checkpoint或者savepoint数据,那么只能重新开始job,这个时候你需要设置 resumeFromLatestCheckpoint 和 resumeFromSavepoint 都为 false。

钉钉群+公众号

Flink on Zeppelin 3群钉钉.JPG image.png