Savepoint State

You can think of a savepoint as holding, for each stateful operator, a map of Operator ID -> State ID -> State:

  Operator ID | State
  ------------+------------------------
  source-id   | State of StatefulSource
  mapper-id   | State of StatefulMapper

In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.


  1. State before stopping

[Screenshot: job state before stopping]

  2. Stop the job (trigger a savepoint first)
  • bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

[Screenshot: triggering the savepoint]

  3. Cancel the job
  • bin/flink cancel -s [:targetDirectory] :jobId

[Screenshots: cancelling the job]

  4. Restart the job (I launch it on YARN)
  • bin/flink run -s :savepointPath [:runArgs]

[Screenshot: restarting the job]

  5. Check the state after the restart

[Screenshot: state after the restart]

  6. Test passed
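
The steps above can be sketched as a small shell script. This is only a rough sketch, not a definitive procedure: the three commands are the ones listed in steps 2 to 4, but `FLINK_BIN`, `DRY_RUN`, and the function names are my own illustrative assumptions. By default it only prints each command so you can check it before running anything.

```shell
#!/bin/sh
# Sketch of the savepoint -> cancel -> restore cycle from the steps above.
# FLINK_BIN, DRY_RUN and the function names are illustrative assumptions.

FLINK_BIN="${FLINK_BIN:-bin/flink}"
DRY_RUN="${DRY_RUN:-1}"   # 1 = only print each command, 0 = also execute it

# Print the command line; execute it only when DRY_RUN=0.
run() {
  printf '%s\n' "$*"
  if [ "$DRY_RUN" = "0" ]; then
    "$@"
  fi
}

# Step 2: trigger a savepoint for a job running on YARN.
trigger_savepoint() {            # args: jobId targetDirectory yarnAppId
  run "$FLINK_BIN" savepoint "$1" "$2" -yid "$3"
}

# Step 3: cancel the job, writing a savepoint to targetDirectory.
cancel_with_savepoint() {        # args: targetDirectory jobId
  run "$FLINK_BIN" cancel -s "$1" "$2"
}

# Step 4: resubmit the job from a savepoint; extra args are passed through.
restore_from_savepoint() {       # args: savepointPath [runArgs...]
  savepoint_path="$1"
  shift
  run "$FLINK_BIN" run -s "$savepoint_path" "$@"
}
```

Source the script and call the functions with your own job ID, target directory, and YARN application ID. Set `DRY_RUN=0` once the printed commands look right.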

Problems encountered

java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph
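  • The job can only be submitted successfully from the node where FlinkYarnSessionCli (the client) is running.
  • The fix is to run the jps command to check whether the current node is the FlinkYarnSessionCli node, then submit the job from that node.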
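
The jps check can be sketched like this. The function names are hypothetical, and I am assuming the usual `jps` output of one `<pid> <ClassName>` pair per line. The helper takes the `jps` output as an argument so it can be tried without a running cluster.

```shell
#!/bin/sh
# Succeeds (exit 0) if the given jps output lists a FlinkYarnSessionCli process.
# Assumes each jps line looks like "<pid> <ClassName>".
has_yarn_session_cli() {
  printf '%s\n' "$1" | awk '$2 == "FlinkYarnSessionCli" { found = 1 } END { exit !found }'
}

# Convenience wrapper for the current node (needs a JDK, since jps ships with it).
check_this_node() {
  if has_yarn_session_cli "$(jps)"; then
    echo "FlinkYarnSessionCli is running here; submit the job from this node."
  else
    echo "No FlinkYarnSessionCli on this node; try another node."
  fi
}
```

Run `check_this_node` on each candidate machine until you find the one hosting the session client.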

Or could it be one of the following?

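  1. The JobGraph may be too large and the JobManager may have run out of memory; increase the JobManager memory.
  2. The Flink web frontend may not have finished starting within the timeout; raise the web.timeout parameter (and the related Akka timeouts) in the Flink configuration file:

```
web.timeout: 1000000
akka.client.timeout: 600s
akka.ask.timeout: 600s
```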

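I suddenly got curious about what is inside the files generated when a savepoint is triggered.
They are under the path /flink/flink-checkpoints/savepoint-d319a9-7d9a107adc05:
[Screenshot: contents of the savepoint directory]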

Task state file
[Screenshot: task state file]
Meta state file (_metadata)
[Screenshot: meta state file]
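
To tell the two kinds of files apart from the command line, something like the following might help. It is a sketch under assumptions: the function name is hypothetical, and I am assuming Flink's usual layout where the metadata file is named `_metadata` and every other file in the savepoint directory holds operator/task state.

```shell
#!/bin/sh
# Read file paths from stdin and label each one.
# Assumption: "_metadata" is the savepoint metadata; anything else is state data.
classify_savepoint_files() {
  while IFS= read -r name; do
    [ -n "$name" ] || continue          # skip blank lines
    case "${name##*/}" in
      _metadata) printf '%s\tmetadata\n' "$name" ;;
      *)         printf '%s\ttask state\n' "$name" ;;
    esac
  done
}

# Possible usage, assuming the savepoint is on HDFS and your Hadoop version
# supports "-ls -C" (print paths only):
#   hdfs dfs -ls -C /flink/flink-checkpoints/savepoint-d319a9-7d9a107adc05 | classify_savepoint_files
```

If the savepoint is on a local filesystem instead, piping `ls -1` on the directory into the function should work the same way.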