Checkpointing 检验指示

弗林克中的每个函数和运算符都可以是状态(有关详细信息,请参阅与状态一起工作。状态函数存储数据跨单个元素/事件的处理,使状态成为任何类型的更精细操作的关键构建块。

为了使状态容错,Flink需要到检查点状态。检查点允许Flink恢复流中的状态和位置,以赋予应用程序与无故障执行相同的语义。

关于流式容错的文档详细介绍了flink的流容错机制背后的技术。

Prerequisites 先决条件

Flink的检查点机制与流和状态的持久存储交互。一般而言,它要求:

  • a persistent (或 durable )数据源,可以重放特定时间量的记录。这样的源的示例是持久消息队列(例如,ApacheKafka、RabbitMQ、AmazonKinesis、Googlepubsub)或文件系统(例如,HDFS、S3、GFS、NFS、CEH等)。)。
  • 用于状态的持久存储,通常是分布式文件系统(例如,HDFS、S3、GFS、NFS、CEH等)。)

Enabling and Configuring Checkpointing 启用和配置检查点

默认情况下,禁用检查点。要启用检查点,请在StreamExecutionEnvironment上调用enableCheckpointing(n),其中n是以毫秒为单位的检查点间隔。

检查点操作的其他参数包括:

  • exactly-once vs. at-least-once:您可以选择将模式传递到 enableCheckpointing(n)方法以在两个保证级别之间进行选择。对于大多数应用而言,精确-一次是优选的。至少-一次对于某些超低延迟(持续几毫秒)应用而言可能是相关的。

  • checkpoint timeout: 进程检查点终止的时间,如果该检查点到那时还没有完成的话.

  • minimum time between checkpoints: 为了确保流应用程序在检查点之间取得一定的进展,可以定义在检查点之间传递所需的时间。例如,如果将此值设置为 5000,则下一个检查点将在上一个检查点完成后5秒钟内启动,而不管检查点持续时间和检查点间隔如何。请注意,这意味着检查点间隔永远不会小于此参数。

    通过定义“检查点之间的时间”通常比定义检查点间隔更容易配置应用程序,因为“检查点之间的时间”不容易受到检查点有时比平均时间长的影响(例如,如果目标存储系统暂时慢的话)。

    注意,这个值还意味着并发检查点的数目是 one

  • number of concurrent checkpoints: 默认情况下,当另一个检查点仍在进行中时,系统不会触发另一个检查点。这可以确保拓扑不会花费太多时间在检查点上,并且不会在处理流方面取得进展。它可以允许多个重叠检查点,这对于具有一定处理延迟(例如,函数调用需要一定时间响应的外部服务)但仍然希望执行非常频繁的检查点(100毫秒)来重新处理故障时很少处理的管道来说是很有趣的。

    当定义检查点之间的最小时间时,无法使用此选项。

  • externalized checkpoints: 您可以在外部配置定期检查点。外部化检查点将其元数据写入永久存储,并在作业失败时自动清除。这样,如果您的作业失败,您将有一个检查点以恢复。外部化检查点的部署说明中有更多详细信息。

  • fail/continue task on checkpoint errors: 这将确定如果任务的检查点过程执行中发生错误,任务是否会失败。这是默认行为。或者,当此操作被禁用时,任务将简单地将检查点拒绝给检查点协调器并继续运行。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. // start a checkpoint every 1000 ms
  3. env.enableCheckpointing(1000);
  4. // advanced options:
  5. // set mode to exactly-once (this is the default)
  6. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  7. // make sure 500 ms of progress happen between checkpoints
  8. env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  9. // checkpoints have to complete within one minute, or are discarded
  10. env.getCheckpointConfig().setCheckpointTimeout(60000);
  11. // allow only one checkpoint to be in progress at the same time
  12. env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  13. // enable externalized checkpoints which are retained after job cancellation
  14. env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. // start a checkpoint every 1000 ms env.enableCheckpointing(1000)
  3. // advanced options:
  4. // set mode to exactly-once (this is the default) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  5. // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  6. // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig.setCheckpointTimeout(60000)
  7. // prevent the tasks from failing if an error happens in their checkpointing, the checkpoint will just be declined. env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
  8. // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

Related Config Options 相关配置选项

可以通过conf/flink-conf.yaml设置更多参数和/或默认值(请参阅[配置](//ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html)作为完整指南):

Key Default Description
state.backend

| (none) | 用于存储和检查点状态的状态后端。| |

state.backend.async

| true | 选项是否应在可能的情况下使用异步快照方法并进行配置。有些状态后端可能不支持异步快照,或者只支持异步快照,而忽略此选项。| |

state.backend.fs.memory-threshold

| 1024 | 状态数据文件的最小大小。所有小于该状态块的状态块都内联存储在根检查点元数据文件中。 | |

state.backend.incremental

| false | 选项状态后端是否应创建增量检查点(如果可能)。对于增量检查点,仅存储与上一个检查点不同的值,而不是完整的检查点状态。某些状态后端可能不支持增量检查点并忽略此选项。| |

state.backend.local-recovery

| false | 选项状态后端是否应创建增量检查点(如果可能)。对于增量检查点,仅存储与上一个检查点不同的值,而不是完整的检查点状态。某些状态后端可能不支持增量检查点并忽略此选项。| |

state.checkpoints.dir

| (none) | 用于在FLink支持的文件系统中存储检查点数据文件和元数据的默认目录。必须从所有参与进程/节点访问存储路径(即,所有任务管理器和作业管理器)。| |

state.checkpoints.num-retained

| 1 | 要保留的已完成检查点的最大数量。 | |

state.savepoints.dir

| (none) | 保存点的默认目录。用于将保存点写入文件系统的状态后端(MemoryStateBackend、FsStateBackend、RocksDBStateBackend)。 | |

taskmanager.state.local.root-dirs

| (none) | 配置参数定义了用于存储用于本地恢复的基于文件的状态的根目录。本地恢复当前仅覆盖键控状态后端。当前,MemoryStateBackend不支持本地恢复并忽略此选项|

Selecting a State Backend 选择状态后端

flink的checkpoint机制存储定时器和有状态运算符的所有状态的一致快照,包括连接器、Windows和任何用户定义的状态。其中存储检查点(例如,JobManager内存、文件系统、数据库)取决于所配置的状态后端

默认情况下,状态保存在TaskManager中的内存中,检查点存储在JobManager中的内存中。为了适当地保持大状态,Flink支持在其他状态后端存储和检查点状态的各种方法。可以通过`StreamExecutionEnvironment.setStateBackend(…)配置状态后端的选择。``。

有关作业范围和集群范围内配置的可用状态后端和选项的详细信息,请参见State backends]。

State Checkpoints in Iterative Jobs 迭代作业中的状态检查点

Flink目前只为没有迭代的作业提供处理保证。在迭代作业上启用检查点会导致异常。为了在迭代程序上强制检查点,用户在启用检查点时需要设置一个特殊标志:‘env.enableCheckpoint(Interval,force=true)’。

请注意,在循环边缘的飞行记录(以及与它们相关的状态更改)将在失败期间丢失。

Restart Strategies 重启策略

Flink支持不同的重新启动策略,这些策略控制在发生故障时如何重新启动作业。有关更多信息,请参见重新启动Strategies.