Checkpoint 和 Savepoint 都是用来存储 Flink Job的状态。如果你要用 Flink来上生产,那么 Checkpoint 和 Savepoint是必不可少的。Checkpoint 和 Savepoint 的主要使用场景有以下两种:
- 在一个Flink Job生命周期内 (JobManager 没有退出),如果某个Task挂掉了,Flink会自动重新启动整个Job,重启Job所使用的状态就是Checkpoint数据 (这个过程也就是我们俗称的Fail over)。
- 如果一个 Flink Job 挂了(JobManager已经退出),这个时候你要重新启动作业的话(比如用 flink run 重新跑作业),也是可以从Checkpoint 或者 Savepoint 来恢复到原来的状态。
启用Checkpoint
默认情况下,你的Flink Job是没有checkpoint的。在Zeppelin里,你可以通过以下两种方式来启用Checkpoint
- Scala API 启用checkpoint
- %flink.conf 启用checkpoint
下面分别是两个例子启用Checkpoint,只是做最简单的配置,更多的配置选项请参考Flink官网文档,https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#checkpointing
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/checkpointing.html
%flink
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.runtime.state.filesystem.FsStateBackend
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
senv.enableCheckpointing(10 * 1000)
senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
val chkConfig = senv.getCheckpointConfig
chkConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
%flink.conf
pipeline.time-characteristic EventTime
execution.checkpointing.interval 10000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION
state.backend filesystem
state.checkpoints.dir file:///tmp/flink/checkpoints
请务必配置 execution.checkpointing.externalized-checkpoint-retention 为 RETAIN_ON_CANCELLATION,否则的话当你的Flink 集群退出以后,checkpoint数据也就丢失了,只有设置了 RETAIN_ON_CANCELLATION 才会保留checkpoint数据。
从Checkpoint恢复Job
最新Zeppelin支持以下两种从Checkpoint恢复Job的方式:
- 手动配置Checkpoint恢复Job
- 自动Checkpoint 恢复Job
手动配置Checkpoint恢复Job
如果你启用了Checkpoint,那么在Flink UI中你可以看到checkpoint的所有信息,如下图:
你可以选择从其中的任意一个 checkpoint 恢复Job (默认只保留一个,所以你只能选择最近的一个checkpoint,如果要保留多个,需要设置 state.checkpoints.num-retained )。可以通过如下方式来手动配置checkpoint 恢复 job。
你可以在%flink.conf 设置 execution.savepoint.path
也可以在pararaph里设置 execution.savepoint.path
在%flink.conf 里设置有一个限制,你的flink session cluster里只能有一个flink job,因为你只能在%flink.conf 设置一次 execution.savepoint.path ,而如果是在paragraph里设置,那么可以针对每个paragraph 设置不同的 execution.savepoint.path
自动Checkpoint恢复Job
上面的手动恢复方式相对来来说比较复杂,也容易出错。Zeppelin提供了一种自动恢复的方式,在启动checkpoint的前提下,Zeppelin会通过Flink rest api 定时拿到最新的checkpoint路径,并且存到 note文件里。下面截取了 note 文件里的一小段json代码 ,其中的 latest_checkpoint_path 就是当前这个job的最新checkpoint路径(这是现实细节,只是方便大家理解,大家不用太关心这个实现细节)。
"config": {
"template": "\u003ch1\u003eTotal {0} {1}\u003c/h1\u003e",
"jobName": "hello world",
"colWidth": 12.0,
"editorMode": "ace/mode/sql",
"type": "single",
"{1} \u003c/h1\u003e": "{1} \u003c/h1\u003e",
"enabled": true,
"latest_checkpoint_path": "file:/tmp/flink/checkpoints/6d4c697bc3b99b314d5342f2f6bed800/chk-4",
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
如果下次用户要从这个自动记录下来的 checkpoint 路径恢复作业的话,只需要在paragraph 中设置resumeFromLatestCheckpoint 为 true 就可以了,如下图:
从Savepoint 恢复 Job
Savepoint 是另外一种存放flink job状态的方法。Savepoint 只有在用户cancel job的时候才做,不像checkpoint会定期做一次checkpoint。
启用Savepoint也很方便,你只要在Paragraph里指定 savepointDir就可以,如下图
当你cancel 这个paragraph的时候,flink会做savepoint,然后将savepoint信息存放到note文件里,如下图里的savepoint_path(这是现实细节,只是方便大家理解,大家不用太关心这个实现细节):
"config": {
"template": "\u003ch1\u003eTotal {0} {1}\u003c/h1\u003e",
"jobName": "hello world",
"colWidth": 12.0,
"editorMode": "ace/mode/sql",
"savepoint_path": "file:/tmp/flink_savepoint/savepoint-eff863-4610b069bf39",
"type": "single",
"{1} \u003c/h1\u003e": "{1} \u003c/h1\u003e",
"enabled": true,
"editorSetting": {
"language": "sql",
"editOnDblClick": false,
"completionKey": "TAB",
"completionSupport": true
},
下次再启动job的时候,只要指定 resumeFromSavepoint 为 true 就会从note中存放的 savepoint_path 来恢复job。
如果 checkpoint,savepoint 数据丢失怎么办 ?
有时候你的checkpoint 或者 savepoint 数据丢失的话,那么你恢复job的时候就会出现下面这种错误
那种情况下你有2种处理办法:
- 如果你还有可用的checkpoint或者savepoint数据 (比如你做了备份),那么你可以直接在paragraph里指定 execution.savepoint.path 就可以。
- 如果你没有可用的checkpoint或者savepoint数据,那么只能重新开始job,这个时候你需要设置 resumeFromLatestCheckpoint 和 resumeFromSavepoint 都为 false。
钉钉群+公众号