Checkpoints

Overview

Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution.

See Checkpointing for how to enable and configure checkpoints for your program.
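
A minimal sketch of what enabling checkpointing might look like in Java (the 10-second interval and exactly-once mode here are illustrative choices, not recommendations):

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // Trigger a checkpoint every 10 seconds with exactly-once guarantees.
  env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);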

Retained Checkpoints

Checkpoints are by default not retained and are only used to resume a job from failures. They are deleted when a program is cancelled. You can, however, configure periodic checkpoints to be retained. Depending on the configuration these retained checkpoints are not automatically cleaned up when the job fails or is canceled. This way, you will have a checkpoint around to resume from if your job fails.

  CheckpointConfig config = env.getCheckpointConfig();
  config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

The ExternalizedCheckpointCleanup mode configures what happens with checkpoints when you cancel the job:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: Retain the checkpoint when the job is cancelled. Note that you have to manually clean up the checkpoint state after cancellation in this case.

  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails.
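
For contrast, a minimal sketch of configuring the delete-on-cancellation mode instead, assuming the same env as above:

  CheckpointConfig config = env.getCheckpointConfig();
  // Checkpoints are removed on cancellation; only a failed job leaves one behind.
  config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);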

Directory Structure

Similarly to savepoints, a checkpoint consists of a meta data file and, depending on the state backend, some additional data files. The meta data file and data files are stored in the directory that is configured via state.checkpoints.dir in the configuration files, and can also be specified per job in the code.
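
As an illustration (not a normative layout), with state.checkpoints.dir set to hdfs:///checkpoints/, the files for one job end up roughly like this, where <job-id> and <n> are placeholders:

  hdfs:///checkpoints/<job-id>/chk-<n>/_metadata    <- meta data file
  hdfs:///checkpoints/<job-id>/chk-<n>/...          <- additional data files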

Configure globally via configuration files

  state.checkpoints.dir: hdfs:///checkpoints/

Configure per job when constructing the state backend

  env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

Difference to Savepoints

Checkpoints have a few differences from savepoints. They

  • use a state backend specific (low-level) data format and may be incremental.
  • do not support Flink specific features like rescaling.

Resuming from a retained checkpoint

A job may be resumed from a checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).

  $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
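
Assuming the illustrative directory layout above, a concrete invocation might look like the following, where <job-id>, <n>, and my-job.jar are placeholders:

  $ bin/flink run -s hdfs:///checkpoints/<job-id>/chk-<n>/_metadata my-job.jar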