本文档解释了在开发应用程序时如何使用Flink的状态抽象。

Keyed State and Operator State 键控状态和操作状态

在Flink中有两种基本的状态: Keyed StateOperator State

Keyed State

Keyed 状态

Keyed State 总是相对于键的,只能在 KeyedStream上的函数和运算符中使用。

您可以将键控状态视为已分区或分块的运算符状态,每个键只使用一个状态分区。每个键状态在逻辑上绑定到一个唯一的并行操作符-实例(key>)的组合,而且由于每个键“属于”一个键控操作符的一个并行实例,我们可以简单地将其看作是<操作符、key>。

密钥状态进一步被组织成所谓的Key Groups。键组是flink可以重新分配键状态的原子单元;与定义的最大并行度完全一样多的键组。在执行过程中,键操作器的每个并行实例与一个或多个键组的键一起工作。

Operator 状态

使用 Operator State (或 non-keyed state ),每个运算符状态都绑定到一个并行运算符实例。KafkaConnector是在FLink中使用运算符状态的良好激励示例。Kafka消费者的每个并行实例将主题分区和偏移的映射保持为其运营商状态。

当并行度被改变时,操作员状态接口支持并行操作员实例之间的重新分配状态。可以有用于执行这种再分配的不同方案。

Raw and Managed State 原始和托管状态

Keyed StateOperator State 以两种形式存在: managedraw

Managed State 在由FLink运行时控制的数据结构中表示,如内部哈希表或RocksDB。示例是“ValueState”, “ListState”等。flink的运行时对状态进行编码,并将它们写入检查点。

Raw State 是运算符保留在自己的数据结构中的状态。检查点时,它们只将一个字节序列写入检查点。flink对状态的数据结构一无所知,只看到原始字节。

所有数据流函数都可以使用托管状态,但在实现运算符时,只能使用原始状态接口。建议使用托管状态(而不是原始状态),因为托管状态flink能够在并行度更改时自动重新分发状态,并且还可以实现更好的内存管理。v

如果您的托管状态需要自定义序列化逻辑,请参阅相应的指南,以确保将来的兼容性。Flink的默认序列化程序不需要特殊处理。

Using Managed Keyed State 使用托管密钥状态

托管键状态接口提供对不同类型状态的访问,这些状态的作用域都是当前输入元素的键。这意味着这种状态只能在KeyedStream上使用,而“KeyedStream”可以通过stream.keyBy(…)创建。

现在,我们将首先查看可用的不同类型的状态,然后我们将看到它们如何在程序中使用。可用的状态基元是:

  • ValueState<T>:这将保存一个可以更新和检索的值(范围为上述输入元素的关键字,因此可能有一个值用于操作所看到的每个键)。可以使用 update(T) 来设置值,并使用 T value()检索该值。

  • ListState<T>: 这将保留元素的列表。可以在所有当前存储的元素上附加元素并检索Iterable。使用add(T)addAll(List<T>)添加元素,可使用Iterable<T> get()来检索可迭代的元素。也可以使用 update(List<T>)覆盖现有列表“”

  • ReducingState<T>:这保留了一个表示添加到状态的所有值的聚合的值。接口类似于 ListState ,但使用add(T)添加的元素将使用指定的ReduceFunction还原为聚合。

  • AggregatingState<IN, OUT>: 这保留了一个表示添加到状态的所有值的聚合的值。与 ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与 ListState 相同,但使用add(IN) 添加的元素使用指定的 AggregateFunction进行聚合。

  • FoldingState<T, ACC>: 这保留了一个值,表示添加到状态的所有值的聚合。与 ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口类似于 ListState ,但是使用 add(T) 添加的元素使用指定的 FoldFunction折叠成一个聚合。

  • MapState<UK, UV>: 这保存了一个映射列表。您可以将键值对放入状态,并在所有当前存储的映射上检索 Iterable 。映射使用 put(UK, UV)putAll(Map<UK, UV>)添加。可以使用get(UK)检索与用户密钥相关的值。映射、键和值的可迭代视图可以分别使用entry()key()values()检索。

所有类型的状态都有一个方法 clear(),用于清除当前活动键的状态,即输入元素的键。

FoldingStateFoldingStateDescriptor 已在Flink 1.4中被废弃,并将在今后完全删除。请使用AggregatingStateAggregatingStateDescriptor

重要的是要记住,这些状态对象仅用于与状态进行接口。状态不一定存储在内部,但可能驻留在磁盘或其他地方。要记住的第二件事是,从状态中得到的值取决于输入元素的键。因此,如果所涉及的键不同,则在用户函数的一次调用中获得的值可能与另一次调用中的值不同。

要获得状态句柄,必须创建一个StateDescriptor。这保存了状态的名称(我们稍后会看到,您可以创建几个状态,它们必须有唯一的名称,以便您可以引用它们)、状态所持有的值的类型,以及可能是用户指定的函数,例如ReduceFunction。根据要检索的状态类型,可以创建 ValueStateDescriptor, a ListStateDescriptor, a ReducingStateDescriptor, a FoldingStateDescriptor or a MapStateDescriptor

状态是使用RuntimeContext访问的,因此它只能在rich functions中使用。有关这方面的信息,请参阅here,但我们不久也将看到一个示例。在 RuntimeContext 中可用的RichFunction 具有以下访问状态的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这是一个FlatMapFunction示例,它显示了所有部件是如何连接在一起的:

  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2. /**
  3. * The ValueState handle. The first field is the count, the second field a running sum.
  4. */
  5. private transient ValueState<Tuple2<Long, Long>> sum;
  6. @Override
  7. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  8. // access the state value
  9. Tuple2<Long, Long> currentSum = sum.value();
  10. // update the count
  11. currentSum.f0 += 1;
  12. // add the second field of the input value
  13. currentSum.f1 += input.f1;
  14. // update the state
  15. sum.update(currentSum);
  16. // if the count reaches 2, emit the average and clear the state
  17. if (currentSum.f0 >= 2) {
  18. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  19. sum.clear();
  20. }
  21. }
  22. @Override
  23. public void open(Configuration config) {
  24. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  25. new ValueStateDescriptor<>(
  26. "average", // the state name
  27. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
  28. Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  29. sum = getRuntimeContext().getState(descriptor);
  30. }
  31. }
  32. // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
  33. env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
  34. .keyBy(0)
  35. .flatMap(new CountWindowAverage())
  36. .print();
  37. // the printed output will be (1,4) and (1,5)
  1. class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
  2. private var sum: ValueState[(Long, Long)] = _
  3. override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
  4. // access the state value
  5. val tmpCurrentSum = sum.value
  6. // If it hasn't been used before, it will be null
  7. val currentSum = if (tmpCurrentSum != null) {
  8. tmpCurrentSum
  9. } else {
  10. (0L, 0L)
  11. }
  12. // update the count
  13. val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
  14. // update the state
  15. sum.update(newSum)
  16. // if the count reaches 2, emit the average and clear the state
  17. if (newSum._1 >= 2) {
  18. out.collect((input._1, newSum._2 / newSum._1))
  19. sum.clear()
  20. }
  21. }
  22. override def open(parameters: Configuration): Unit = {
  23. sum = getRuntimeContext.getState(
  24. new ValueStateDescriptor[(Long, Long)](c2a9124493fadff6528dbadacafde7d8)])
  25. )
  26. }
  27. }
  28. object ExampleCountWindowAverage extends App {
  29. val env = StreamExecutionEnvironment.getExecutionEnvironment
  30. env.fromCollection(List(
  31. (1L, 3L),
  32. (1L, 5L),
  33. (1L, 7L),
  34. (1L, 4L),
  35. (1L, 2L)
  36. )).keyBy(_._1)
  37. .flatMap(new CountWindowAverage())
  38. .print()
  39. // the printed output will be (1,4) and (1,5)
  40. env.execute("ExampleManagedState")
  41. }

这个例子实现了一个穷人的计数窗口。我们用第一个字段来键入元组(在示例中,所有元组都有相同的键 1)。函数将计数和运行的和存储在“ValueState”中。一旦计数达到2,它将发出平均值并清除状态,以便我们从 0开始。注意,如果在第一个字段中有不同值的元组,这将为每个不同的输入键保留不同的状态值。

State Time-To-Live (TTL) 状态生存时间(TTL)

a time-to-live (Ttl)可以分配给任意类型的键控状态。如果配置了一个TTL,并且状态值已经过期,则将在尽最大努力的基础上清理存储的值,下文将对此进行更详细的讨论。

所有状态集合类型都支持每个入口TTL。这意味着列表元素和映射项将独立过期。

为了使用状态TTL,必须首先构建一个StateTtlConfig 配置对象。然后,通过传递配置,可以在任何状态描述符中启用TTL功能:

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.common.time.Time;
  4. StateTtlConfig ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build();
  9. ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
  10. stateDescriptor.enableTimeToLive(ttlConfig);
  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.state.ValueStateDescriptor
  3. import org.apache.flink.api.common.time.Time
  4. val ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build
  9. val stateDescriptor = new ValueStateDescriptor[String](99c5b226c158af40aa43ee80391a348d)
  10. stateDescriptor.enableTimeToLive(ttlConfig)

该配置有多个选项可考虑:

newBuilder 方法的第一个参数是强制性的,它是实时值。

更新类型在刷新状态TTL时配置(默认为 OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入权限时
  • StateTtlConfig.UpdateType.OnReadAndWrite - 也是关于读访问

如果尚未清除过期值,则状态可见性将配置是否在读取访问中返回过期值(默认情况下,NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 过期的值永远不会返回。
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用的话返回

NeverReturnExpired的情况下,过期状态的行为就好像它不再存在了,即使它仍然必须被移除。对于数据必须在TTL之后才能读取访问的用例来说,该选项是有用的,例如。处理隐私敏感数据的应用程序。

另一个选项ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。

注意:

  • 状态后端将上次修改的时间戳与用户值一起存储,这意味着启用此功能会增加状态存储的消耗。堆状态后端存储具有对用户状态对象的引用和在存储器中的原始长值的附加Java对象。ROCKSDB状态后端根据存储的值、列表条目或映射条目添加8个字节。

  • 当前只支持引用 processing time 的TTL。

  • 尝试还原以前没有TTL配置的状态,使用启用TTL的描述符(反之亦然)将导致兼容性失败和 StateMigrationException

  • TTL配置不是Check-或Savepoint的一部分,而是Flink在当前运行的作业中如何对待它的一种方式。

  • 只有当用户值序列化程序能够处理空值时,TTL的映射状态才支持空用户值。如果序列化程序不支持空值,则可以使用 NullableSerializer 包装它,代价是序列化形式中的额外字节。

Cleanup of Expired State 清除过期状态

当前,只有在显式读取过期值(例如,通过调用ValueState.value())时,才会删除过期值。

注意,这意味着默认情况下,如果未读取过期状态,则不会删除它,可能会导致状态不断增长。这可能会在未来的版本中发生变化。

此外,您可以在获取将减小其大小的完整状态快照时激活清理。在当前实现下不清除本地状态,但在从上一个快照恢复的情况下,它将不包括已删除的过期状态。它可以在 StateTtlConfig中配置:

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.time.Time;
  3. StateTtlConfig ttlConfig = StateTtlConfig
  4. .newBuilder(Time.seconds(1))
  5. .cleanupFullSnapshot()
  6. .build();
  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.time.Time
  3. val ttlConfig = StateTtlConfig
  4. .newBuilder(Time.seconds(1))
  5. .cleanupFullSnapshot
  6. .build

此选项不适用于RocksDB状态后端中的增量检查点。

更多的策略将添加在未来的清理过期状态自动在后台。

State in the Scala DataStream API Scala 数据流API中的状态

除了上面描述的接口之外,Scala API还提供了有状态的map()flatMap()函数的快捷方式,其中只有一个KeyedStream上的ValueState 函数。用户函数在 Option 中获取 ValueState 的当前值,并且必须返回将用于更新状态的更新值。

  1. val stream: DataStream[(String, Int)] = ...
  2. val counts: DataStream[(String, Int)] = stream
  3. .keyBy(_._1)
  4. .mapWithState((in: (String, Int), count: Option[Int]) =>
  5. count match {
  6. case Some(c) => ( (in._1, c), Some(c + in._2) )
  7. case None => ( (in._1, 0), Some(in._2) )
  8. })

Using Managed Operator State 使用托管运营商状态

To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed&lt;T extends Serializable&gt; interface.
要使用托管操作符状态,有状态函数可以实现更通用的 CheckpointedFunction 接口,也可以实现 ListCheckpointed&lt;T extends Serializable&gt;接口。

CheckpointedFunction 校验点函数

CheckpointedFunction接口提供了对具有不同再分配方案的非键控状态的访问。它需要实施两种方法:

  1. void snapshotState(FunctionSnapshotContext context) throws Exception;
  2. void initializeState(FunctionInitializationContext context) throws Exception;

必须执行检查点时,调用snapshotState()。对应的initializeState()在每次用户定义的函数被初始化时被调用,当函数首先被初始化时,或者当函数实际从较早的检查点恢复时。因此, initializeState() 不仅是不同类型状态被初始化的地方,而且还包括其中包括状态恢复逻辑的地方。

当前,支持列表样式的托管运算符状态。该状态应为 serializable object的 List,彼此独立,因此在重新调用时符合重新分发的条件。换句话说,这些对象是可以重新分配非键控状态的最佳粒度。根据状态访问方法,定义了以下重新分配方案:

  • Even-split redistribution 偶数再分配: 每个操作符返回一个状态元素列表。整个状态在逻辑上是所有列表的连接。在恢复/重新分配时,列表被平均地划分为与并行运算符相同的子列表。每个运算符都会获得一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,如果使用并行主义1,运算符的校验点状态包含元素element1element2 ,则当将并行性增加到2时,element1 可能最终出现在运算符实例0中,而element2将转到运算符element1

  • Union redistribution UNION再分配 : 每个运算符返回一个状态元素列表。整个状态在逻辑上是所有列表的连接。在恢复/重新分配时,每个运算符都会获得状态元素的完整列表。

下面是一个有状态的SinkFunction 示例,它在将元素发送到外部世界之前使用 CheckpointedFunction 缓冲元素。它演示了基本的均匀再分配列表状态:

  1. public class BufferingSink
  2. implements SinkFunction<Tuple2<String, Integer>>,
  3. CheckpointedFunction {
  4. private final int threshold;
  5. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  6. private List<Tuple2<String, Integer>> bufferedElements;
  7. public BufferingSink(int threshold) {
  8. this.threshold = threshold;
  9. this.bufferedElements = new ArrayList<>();
  10. }
  11. @Override
  12. public void invoke(Tuple2<String, Integer> value) throws Exception {
  13. bufferedElements.add(value);
  14. if (bufferedElements.size() == threshold) {
  15. for (Tuple2<String, Integer> element: bufferedElements) {
  16. // send it to the sink
  17. }
  18. bufferedElements.clear();
  19. }
  20. }
  21. @Override
  22. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  23. checkpointedState.clear();
  24. for (Tuple2<String, Integer> element : bufferedElements) {
  25. checkpointedState.add(element);
  26. }
  27. }
  28. @Override
  29. public void initializeState(FunctionInitializationContext context) throws Exception {
  30. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  31. new ListStateDescriptor<>(
  32. "buffered-elements",
  33. TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  34. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  35. if (context.isRestored()) {
  36. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  37. bufferedElements.add(element);
  38. }
  39. }
  40. }
  41. }
  1. class BufferingSink(threshold: Int = 0)
  2. extends SinkFunction[(String, Int)]
  3. with CheckpointedFunction {
  4. @transient
  5. private var checkpointedState: ListState[(String, Int)] = _
  6. private val bufferedElements = ListBuffer[(String, Int)]()
  7. override def invoke(value: (String, Int)): Unit = {
  8. bufferedElements += value
  9. if (bufferedElements.size == threshold) {
  10. for (element <- bufferedElements) {
  11. // send it to the sink
  12. }
  13. bufferedElements.clear()
  14. }
  15. }
  16. override def snapshotState(context: FunctionSnapshotContext): Unit = {
  17. checkpointedState.clear()
  18. for (element <- bufferedElements) {
  19. checkpointedState.add(element)
  20. }
  21. }
  22. override def initializeState(context: FunctionInitializationContext): Unit = {
  23. val descriptor = new ListStateDescriptor[(String, Int)](8b3a87fe6af0fa598190dc0771384747)]() {})
  24. )
  25. checkpointedState = context.getOperatorStateStore.getListState(descriptor)
  26. if(context.isRestored) {
  27. for(element <- checkpointedState.get()) {
  28. bufferedElements += element
  29. }
  30. }
  31. }
  32. }

initializeState方法以 FunctionInitializationContext作为参数。这用于初始化无键状态的“containers”。这是一个类型为 ListState 的容器,在该容器中,无键状态对象将在检查点时存储。

请注意如何初始化状态,类似于键控状态,使用StateDescriptor,其中包含状态名称和有关状态所持有值的类型的信息:

  1. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  2. new ListStateDescriptor<>(
  3. "buffered-elements",
  4. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
  5. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  1. val descriptor = new ListStateDescriptor[(String, Long)](b50d435b092f4f4cedcf03548a5c9e3c)]() {})
  2. )
  3. checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态访问方法的命名约定包含它的重新分布模式,然后是它的状态结构。例如,要在RESTORE上使用LIST状态和联合重新分配方案,可以使用getUnionListState(descriptor)访问状态。如果方法名称不包含重新分配模式,则仅意味着将使用基本的均匀再分配方案。

初始化容器后,我们使用上下文的isRestored()方法检查故障后是否正在恢复。如果这是true,_即_we正在恢复,则应用恢复逻辑。

如修改的BufferingSink的代码所示,在状态初始化过程中恢复的这个ListState保存在类变量中,以便将来在 snapshotState()中使用。在那里,ListState 被清除了以前检查点所包含的所有对象,然后用我们想要检查点的新对象来填充。

作为一个侧面注释,键控状态也可以在initializeState()方法中初始化。这可以使用提供的FunctionInitializationContext来完成。

ListCheckpointed 列表校验点

ListCheckpointed接口是CheckpointedFunction的一个更有限的变体,它只支持列表样式的状态,在还原时采用均匀分割的重新分配方案。它还要求采用两种方法:

  1. List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  2. void restoreState(List<T> state) throws Exception;

snapshotState()上,操作符应该将对象列表返回给检查点,restoreState 在恢复时必须处理这样的列表。如果状态不可再分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)

Stateful Source Functions 有状态源函数

与其他操作符相比,有状态源需要更多的注意。为了使状态和输出集合的更新是原子的(在失败/恢复时只需要一次语义),用户需要从源的上下文中获得一个锁。

  1. public static class CounterSource
  2. extends RichParallelSourceFunction<Long>
  3. implements ListCheckpointed<Long> {
  4. /** current offset for exactly once semantics */
  5. private Long offset;
  6. /** flag for job cancellation */
  7. private volatile boolean isRunning = true;
  8. @Override
  9. public void run(SourceContext<Long> ctx) {
  10. final Object lock = ctx.getCheckpointLock();
  11. while (isRunning) {
  12. // output and state update are atomic
  13. synchronized (lock) {
  14. ctx.collect(offset);
  15. offset += 1;
  16. }
  17. }
  18. }
  19. @Override
  20. public void cancel() {
  21. isRunning = false;
  22. }
  23. @Override
  24. public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
  25. return Collections.singletonList(offset);
  26. }
  27. @Override
  28. public void restoreState(List<Long> state) {
  29. for (Long s : state)
  30. offset = s;
  31. }
  32. }
  1. class CounterSource
  2. extends RichParallelSourceFunction[Long]
  3. with ListCheckpointed[Long] {
  4. @volatile
  5. private var isRunning = true
  6. private var offset = 0L
  7. override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
  8. val lock = ctx.getCheckpointLock
  9. while (isRunning) {
  10. // output and state update are atomic
  11. lock.synchronized({
  12. ctx.collect(offset)
  13. offset += 1
  14. })
  15. }
  16. }
  17. override def cancel(): Unit = isRunning = false
  18. override def restoreState(state: util.List[Long]): Unit =
  19. for (s <- state) {
  20. offset = s
  21. }
  22. override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
  23. Collections.singletonList(offset)
  24. }

当一个检查点被flink完全确认以与外部世界进行通信时,一些运营商可能需要该信息。在这种情况下,请参见 org.apache.flink.runtime.state.CheckpointListener 界面。