什么是状态(state)

虽然数据流中的许多操作只是逐个事件进行处理(例如事件解析器),但有些操作需要跨多个事件记住信息(例如窗口算子)。这些操作称为有状态操作。

一些有状态操作的例子:

  • 当应用程序搜索特定的事件模式时,状态将存储到目前为止遇到的事件序列。
  • 当按分钟/小时/天聚合事件时,状态保存挂起的聚合结果。当对数据点流进行机器学习模型训练时,状态保存模型参数的当前版本。
  • 当需要管理历史数据时,状态可以高效地访问过去发生的事件。

为了使用 checkpoints 和 savepoints,需要对状态有足够了解。

对状态的了解还允许重新调整Flink应用程序,意味着Flink会负责在并行实例之间重新分配状态。 Queryable state允许在运行时从Flink外部访问状态。 在处理状态时,学习Flink的 state backends 也可能很有用。Flink提供了不同的状态后端,用于指定状态的存储方式和位置。

Keyed State

键控状态(Keyed state)被维护在一个可以被看作是嵌入式 KV 存储的地方。状态是被分区的,并且与被状态算子(stateful operators)读取的流严格分布在一起。因此,只有在键控流(即经过键控/分区数据交换后的流)上才能访问键/值状态,并且仅限于与当前事件的键相关联的值。将流和 state 的键进行对齐,确保所有状态更新是本地操作,从而保证一致性而无需事务开销。这种对齐还允许Flink透明地重新分布状态并调整流的分区。

有状态的流处理 - 图1

Keyed State进一步组织成所谓的键组(Key Groups)。键组是Flink可以重新分配Keyed State的原子单位;键组的数量正好等于定义的最大并行度。在执行过程中,每个有键算子的并行实例都使用一个或多个键组的键进行处理。

状态持久化

Flink通过流重放(stream replay)和检查点(Checkpointing)的组合来实现容错性。检查点标记了每个输入流中的特定的一个时间点,以及每个算子对应的状态。通过恢复算子的状态并从检查点的位置重新播放记录,可以在保持一致性的同时(确切一次语义),从检查点恢复流数据流。

检查点间隔是在执行过程中以容错开销和恢复时间(需要重新播放的记录数)为代价进行权衡的一种手段。

容错机制不断绘制分布式流数据流的快照。对于具有较小状态的流应用程序,这些快照非常轻量级,可以频繁进行绘制而对性能影响较小。流应用程序的状态通常存储在可配置的位置,通常是分布式文件系统。

在发生程序故障(由于机器、网络或软件故障)的情况下,Flink会停止分布式流数据流。然后系统重新启动算子并将它们重置为最新成功的检查点。输入流被重置为状态快照的位置。作为重新启动的并行数据流的一部分处理的任何记录都保证不会影响先前检查点的状态。

:::warning 默认情况下,检查点功能是禁用的。有关如何启用和配置检查点功能的详细信息,请学习 Checkpointing。

:::

:::info 为了实现全面的保证,数据流源(如消息队列或 broker)需要能够将流倒回到定义的最近点。Apache Kafka具有这种能力,并且Flink与Kafka的连接器利用了这一点。有关Flink连接器提供的保证的更多信息,请学习Flink连接器的容错性保证。

:::

:::info 由于Flink的检查点通过分布式快照实现,我们可以随意使用快照(snapshot)和检查点(checkpoint)这两个词表达相同的意思。通常我们也将快照一词用于表示检查点或保存点。

:::

Checkpointing

Flink的容错机制的核心部分是绘制分布式数据流和算子状态的一致性快照。这些快照作为一致性的检查点,在发生故障时可以回退到这些检查点。Flink绘制这些快照的机制被描述在 Lightweight Asynchronous Snapshots for Distributed Dataflows中。它受到了分布式快照的标准 Chandy-Lamport algorithm 的启发,专门针对Flink的执行模型进行了调整。需要注意的是,与检查点相关的所有操作都可以异步进行。检查点屏障不需要同步进行,操作可以异步地绘制它们的状态快照。自Flink 1.11版本以来,可以使用或不使用对齐方式进行检查点。在本节中,我们首先描述对齐方式的检查点。

Barriers

Flink在分布式快照中的核心元素是流屏障(stream barriers)。这些屏障被注入到数据流中,并作为数据流的一部分与记录一起流动。屏障永远不会超过记录,它们严格按照顺序流动。屏障将数据流中的记录分成进入当前快照的记录集和进入下一个快照的记录集。每个屏障都携带了将其前面推入的快照的ID。屏障不会中断流的流动,因此非常轻量级。不同快照中的多个屏障可以同时存在于流中,这意味着可能会同时发生多个快照。

有状态的流处理 - 图2

流屏障被注入到并行数据流中的流源中。注入快照n的屏障的位置(我们称之为Sn)是源流中快照覆盖数据的位置。例如,在Apache Kafka中,该位置将是分区中最后一条记录的偏移量。该位置Sn被报告给检查点协调器(Flink的JobManager)。然后,屏障向下游流动。当一个中间算子从所有输入流接收到快照n的屏障后,它会将快照n的屏障发射到所有输出流中。一旦一个汇聚算子(即流图的末端)从所有输入流接收到快照n的屏障,它将向检查点协调器确认快照n。在所有汇聚算子确认了一个快照后,该快照被认为已经完成。一旦完成了快照n,作业将不再向源请求Sn之前的记录,因为此时这些记录(以及它们的后代记录)已经通过整个数据流拓扑结构传递。

有状态的流处理 - 图3

接收多个输入流的算子需要根据快照屏障对输入流进行对齐。上面的图示说明了这一点:

  • 一旦算子从一个输入流接收到快照屏障n,它就不能处理来自该流的任何进一步记录,直到它也从其他输入流接收到快照屏障n。否则,它会将属于快照n的记录与属于快照n+1的记录混合在一起。
  • 一旦从最后一个流接收到快照屏障n,算子就会发出所有未处理的输出记录,然后自身发出快照屏障n。
  • 它对state进行快照并继续开始处理来自所有输入流的记录,在处理流中的记录之前先处理输入缓冲区中的记录。
  • 最后,算子异步将状态写入状态后端。

请注意,对于具有多个输入的所有算子以及在洗牌之后的算子(当它们消费多个上游子任务的输出流)都需要进行对齐。

Snapshotting Operator State

当算子包含任何形式的状态时,该状态也必须成为快照的一部分。

算子在从其输入流接收到所有快照屏障之后,并在将屏障发射到其输出流之前,在这个时间点对状态进行一个快照。这个时刻,从屏障之前的记录对状态所做的所有更新已经完成,而对于依赖屏障之后的记录的更新还没有被应用。由于快照的状态可能很大,它存储在可配置的状态后端中。默认情况下,这是JobManager的内存,但在生产环境中应配置分布式可靠存储(例如HDFS)。状态存储完成后,算子确认检查点,将快照屏障发射到输出流中,并继续执行。

生成的快照现在包含:

  • 对于每个并行流数据源,快照开始时流中的偏移/位置
  • 对于每个算子,一个指针指向快照中其对应的state

有状态的流处理 - 图4

Recovery

在这种机制下的恢复过程很简单:发生故障时,Flink选择最新完成的检查点k。然后,系统重新部署整个分布式数据流,并将每个算子设置为使用检查点k中的快照状态。源头被设置为从位置Sk开始读取流。例如,在Apache Kafka中,这意味着告诉消费者从偏移量Sk开始获取数据。如果状态是增量快照的,算子将从最新的完整快照状态开始,然后将一系列增量快照更新应用到该状态中。

Unaligned Checkpointing

检查点也可以进行非对齐的操作。基本思想是,飞行中的数据成为算子状态的一部分。

需要注意的是,这种方法实际上更接近于 Chandy-Lamport algorithm,但Flink仍然在源中插入屏障,以避免让checkpoint 部分负担过大。

有状态的流处理 - 图5

图示为算子处理非对齐检查点屏障的方式:

  • 算子对其输入缓冲区中存储的第一个屏障作出反应。
  • 它立即通过将其添加到输出缓冲区的末尾将屏障转发给下游算子。
  • 算子标记所有超过的记录以进行异步存储,并创建自己状态的快照。

因此,在非对齐检查点中,算子只会暂时停止处理输入以标记缓冲区、转发屏障并创建其他状态的快照。

非对齐检查点确保屏障尽快到达汇聚算子。它特别适用于至少有一个移动缓慢的数据路径的应用程序,在该应用程序中,对齐时间可能达到几个小时。

然而,由于它增加了额外的I/O压力,当状态后端的I/O性能成为瓶颈时,它并不能提供帮助。

需要注意的是,保存点始终是对齐的。

Unaligned Recovery

在非对齐检查点中,算子在处理来自上游算子的任何数据之前,首先恢复飞行中的数据。除此以外,它执行与对齐检查点恢复相同的步骤。

State Backends

确切地说,键/值索引存储在哪种数据结构中取决于所选择的状态后端。一个状态后端将数据存储在内存中的哈希映射中,另一个状态后端使用RocksDB作为键/值存储。除了定义保存状态的数据结构外,状态后端还实现了获取键/值状态的时间点快照并将该快照作为检查点的一部分存储的逻辑。可以配置状态后端而无需更改应用程序逻辑。

有状态的流处理 - 图6

Savepoints

所有使用检查点的程序都可以从保存点恢复执行。保存点允许在不丢失任何状态的情况下更新程序和Flink集群。

保存点是手动触发的检查点,它对程序进行快照并将其写入到状态后端。它们依赖于常规的检查点机制来实现这一点。

保存点与检查点类似,不同之处在于它们由用户触发,并且在完成新的检查点时不会自动过期。为了正确使用保存点,重要的是要理解检查点和保存点之间的区别,这在检查点与保存点中有所描述。

恰好一次 vs 至少一次

对齐步骤可能会增加流程序的延迟。通常,这种额外的延迟在几毫秒的数量级上,但我们也看到一些异常情况下的延迟明显增加。对于需要所有记录都具有一致超低延迟(几毫秒)的应用程序,Flink提供了一个开关来跳过检查点期间的流对齐。检查点快照仍然在算子从每个输入看到检查点屏障后立即生成。

当跳过对齐时,即使检查点 n 的某些屏障已经到达时,算子会继续处理所有输入。这样,算子在快照检查点n之前会处理属于检查点 n+1 的元素。在恢复时,这些记录将出现为重复记录,因为它们都包含在检查点n的状态快照中,并将作为检查点n后的数据的一部分重播。

:::info Flink 仅对具有多个前驱算子(连接)以及具有多个发送方(在流重新分区/洗牌之后)的算子进行对齐。因此,如果数据流只涉及到并行流算子(map(),flatMap(),filter(),…),在“至少一次”模式下,实际上是“恰好一次”

:::

批处理程序中的状态和容错性

Flink将批处理程序作为流式程序的特殊情况执行,其中流是有界的(具有有限数量的元素)。DataSet在内部被视为数据流。因此,上述概念同样适用于批处理程序,只是有一些小的例外:

  • 批处理程序的容错不使用检查点。恢复通过完全重播数据流来进行。这是可能的,因为输入是有界的。这样做会增加恢复的成本,但使常规处理更便宜,因为它避免了检查点。
  • DataSet API中的有状态操作使用简化的内存/外部存储数据结构,而不是键/值索引。
  • DataSet API引入了特殊的同步(基于superstep的)迭代,仅对有界流可能。