译者:flink.sojb.cn

概览

保存点是外部存储的自包含检查点,可用于停止和恢复或更新Flink程序。他们使用Flink的检查点机制来创建流程序状态的(非增量)SNAPSHOT,并将检查点数据和元数据写入外部文件系统。

此页面介绍了触发,恢复和处理保存点所涉及的所有步骤。有关Flink如何处理状态和故障的更多详细信息,请查看Streaming Programs页面中的State

注意:为了允许程序和Flink版本之间的升级,请务必查看以下有关为算子分配ID的部分。

分配算子ID

这是强烈建议您调整自己的方案,作为为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该uid(String)方法手动指定算子ID 。这些ID用于确定每个 算子的状态。

  1. DataStream<String> stream = env.
  2. // Stateful source (e.g. Kafka) with ID
  3. .addSource(new StatefulSource())
  4. .uid("source-id") // ID for the source operator
  5. .shuffle()
  6. // Stateful mapper with ID
  7. .map(new StatefulMapper())
  8. .uid("mapper-id") // ID for the mapper
  9. // Stateless printing sink
  10. .print(); // Auto-generated ID

如果您未手动指定ID,则会自动生成这些ID。只要这些ID不变,您就可以从保存点自动恢复。生成的ID取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些ID。

保存点状态

您可以将保存点视为Operator ID -&gt; State包含每个有状态 算子的映射:

  1. Operator ID | State
  2. ------------+------------------------
  3. source-id | State of StatefulSource
  4. mapper-id | State of StatefulMapper

在上面的例子中,打印接收器是无状态的,因此不是保存点状态的一部分。默认情况下,我们尝试将保存点的每个条目映射回新程序。

算子操作

您可以使用命令行客户端,以触发保存点取消作业用的保存点从保存点恢复处理保存点

使用Flink> = 1.2.0,也可以使用webui 从保存点恢复

触发保存点

触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制此目录的位置(请参阅:targetDirectory参数)。

注意:目标目录必须是JobManager(s)和TaskManager(例如分布式文件系统上的位置)可访问的位置。

例如,使用FsStateBackendRocksDBStateBackend

  1. # Savepoint target directory
  2. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  3. /savepoints/
  4. # Savepoint directory
  5. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  6. /savepoints/savepoint-:shortjobid-:savepointid/
  7. # Savepoint file contains the checkpoint meta data
  8. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  9. /savepoints/savepoint-:shortjobid-:savepointid/_metadata
  10. # Savepoint state
  11. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  12. /savepoints/savepoint-:shortjobid-:savepointid/...

注意: 虽然看起来好像可以移动保存点,但由于_metadata文件中的绝对路径,目前无法进行保存。请按照FLINK-5778了解取消此限制的进度。

请注意,如果使用MemoryStateBackend,则元数据保存点状态将存储在_metadata文件中。由于它是自包含的,您可以移动文件并从任何位置恢复。

触发保存点

  1. $ bin/flink savepoint :jobId [:targetDirectory]

这将触发具有ID的作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

使用YARN触发保存点

  1. $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

这将触发具有ID :jobId和YARN应用程序ID 的作业的保存点:yarnAppId,并返回创建的保存点的路径。

使用Savepoint取消作业

  1. $ bin/flink cancel -s [:targetDirectory] :jobId

这将以原子方式触发具有ID的作业的保存点:jobid并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由JobManager和TaskManager访问。

从Savepoints恢复

  1. $ bin/flink run -s :savepointPath [:runArgs]

这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。

允许未恢复状态

默认情况下,resume 算子操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了 算子,则可以通过--allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态:

  1. $ bin/flink run -s :savepointPath -n [:runArgs]

处理保存点

  1. $ bin/flink savepoint -d :savepointPath

这将处理存储的保存点:savepointPath

请注意,也可以通过常规文件系统 算子操作手动删除保存点,而不会影响其他保存点或检查点(请记住,每个保存点都是自包含的)。直到Flink 1.2,这是一个更乏味的任务,使用上面的savepoint命令执行。

配置

您可以通过state.savepoints.dirKeys配置默认保存点目标目录。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。

  1. # Default savepoint target directory
  2. > 译者:[flink.sojb.cn](https://flink.sojb.cn/)
  3. state.savepoints.dir: hdfs:///flink/savepoints

如果既未配置缺省值也未指定自定义目标目录,则触发保存点将失败。

注意:目标目录必须是JobManager(s)和TaskManager(例如分布式文件系统上的位置)可访问的位置。

常问问题

我应该为我的工作中的所有算子分配ID吗?

根据经验,是的。严格地说,仅通过该uid方法将ID分配给作业中的有状态 算子就足够了。保存点仅包含这些 算子的状态,无状态 算子不是保存点的一部分。

在实践中,建议将其分配给所有 算子,因为Flink的一些内置 算子(如Window 算子)也是有状态的,并且不清楚哪些内置 算子实际上是有状态的,哪些不是。如果您完全确定算子是无状态的,则可以跳过该uid方法。

如果我在工作中添加一个需要状态的新算子,会发生什么?

当您向作业添加新算子时,它将在没有任何状态的情况下进行初始化。保存点包含每个有状态 算子的状态。无状态 算子根本不属于保存点。新 算子的行为类似于无状态 算子。

如果我删除一个有我工作状态的算子,会发生什么?

默认情况下,保存点还原将尝试将所有状态匹配回还原的作业。如果从包含已删除算子状态的保存点还原,则会因此失败。

您可以通过使用run命令设置--allowNonRestoredState(short :)来允许非恢复状态-n

  1. $ bin/flink run -s :savepointPath -n [:runArgs]

如果我在工作中重新排序有状态 算子会发生什么?

如果您为这些 算子分配了ID,它们将照常恢复。

如果您没有分配ID,则重新排序后,有状态 算子的自动生成ID很可能会更改。这将导致您无法从以前的保存点恢复。

如果我添加,删除或重新排序在我的工作中没有状态的 算子会发生什么?

如果为有状态 算子分配了ID,则无状态 算子不会影响保存点还原。

如果您没有分配ID,则重新排序后,有状态 算子的自动生成ID很可能会更改。这将导致您无法从以前的保存点恢复。

当我在恢复时更改程序的并行性时会发生什么?

如果使用Flink> = 1.2.0触发保存点并且不使用弃用状态API Checkpointed,则只需从保存点恢复程序并指定新的并行度即可。

如果从Flink <1.2.0触发的保存点恢复或使用现已弃用的API,则首先必须将作业和保存点迁移到Flink> = 1.2.0,然后才能更改并行度。请参阅升级作业和Flink版本指南