Working with State 工作状态


Keyed State and Operator State 键控状态和操作状态

在Flink中有两种基本的状态: Keyed StateOperator State

Keyed State

Keyed 状态

Keyed State 总是相对于键的,只能在 KeyedStream上的函数和运算符中使用。


密钥状态进一步被组织成所谓的Key Groups。键组是flink可以重新分配键状态的原子单元;与定义的最大并行度完全一样多的键组。在执行过程中,键操作器的每个并行实例与一个或多个键组的键一起工作。

Operator 状态

使用 Operator State (或 non-keyed state ),每个运算符状态都绑定到一个并行运算符实例。KafkaConnector是在FLink中使用运算符状态的良好激励示例。Kafka消费者的每个并行实例将主题分区和偏移的映射保持为其运营商状态。


Raw and Managed State 原始和托管状态

Keyed StateOperator State 以两种形式存在: managedraw

Managed State 在由FLink运行时控制的数据结构中表示,如内部哈希表或RocksDB。示例是“ValueState”, “ListState”等。flink的运行时对状态进行编码,并将它们写入检查点。

Raw State 是运算符保留在自己的数据结构中的状态。检查点时,它们只将一个字节序列写入检查点。flink对状态的数据结构一无所知,只看到原始字节。



Using Managed Keyed State 使用托管密钥状态



  • ValueState<T>:这将保存一个可以更新和检索的值(范围为上述输入元素的关键字,因此可能有一个值用于操作所看到的每个键)。可以使用 update(T) 来设置值,并使用 T value()检索该值。

  • ListState<T>: 这将保留元素的列表。可以在所有当前存储的元素上附加元素并检索Iterable。使用add(T)addAll(List<T>)添加元素,可使用Iterable<T> get()来检索可迭代的元素。也可以使用 update(List<T>)覆盖现有列表“”

  • ReducingState<T>:这保留了一个表示添加到状态的所有值的聚合的值。接口类似于 ListState ,但使用add(T)添加的元素将使用指定的ReduceFunction还原为聚合。

  • AggregatingState<IN, OUT>: 这保留了一个表示添加到状态的所有值的聚合的值。与 ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与 ListState 相同,但使用add(IN) 添加的元素使用指定的 AggregateFunction进行聚合。

  • FoldingState<T, ACC>: 这保留了一个值,表示添加到状态的所有值的聚合。与 ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口类似于 ListState ,但是使用 add(T) 添加的元素使用指定的 FoldFunction折叠成一个聚合。

  • MapState<UK, UV>: 这保存了一个映射列表。您可以将键值对放入状态,并在所有当前存储的映射上检索 Iterable 。映射使用 put(UK, UV)putAll(Map<UK, UV>)添加。可以使用get(UK)检索与用户密钥相关的值。映射、键和值的可迭代视图可以分别使用entry()key()values()检索。

所有类型的状态都有一个方法 clear(),用于清除当前活动键的状态,即输入元素的键。

FoldingStateFoldingStateDescriptor 已在Flink 1.4中被废弃,并将在今后完全删除。请使用AggregatingStateAggregatingStateDescriptor


要获得状态句柄,必须创建一个StateDescriptor。这保存了状态的名称(我们稍后会看到,您可以创建几个状态,它们必须有唯一的名称,以便您可以引用它们)、状态所持有的值的类型,以及可能是用户指定的函数,例如ReduceFunction。根据要检索的状态类型,可以创建 ValueStateDescriptor, a ListStateDescriptor, a ReducingStateDescriptor, a FoldingStateDescriptor or a MapStateDescriptor

状态是使用RuntimeContext访问的,因此它只能在rich functions中使用。有关这方面的信息,请参阅here,但我们不久也将看到一个示例。在 RuntimeContext 中可用的RichFunction 具有以下访问状态的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)


  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
  2. /**
  3. * The ValueState handle. The first field is the count, the second field a running sum.
  4. */
  5. private transient ValueState<Tuple2<Long, Long>> sum;
  6. @Override
  7. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
  8. // access the state value
  9. Tuple2<Long, Long> currentSum = sum.value();
  10. // update the count
  11. currentSum.f0 += 1;
  12. // add the second field of the input value
  13. currentSum.f1 += input.f1;
  14. // update the state
  15. sum.update(currentSum);
  16. // if the count reaches 2, emit the average and clear the state
  17. if (currentSum.f0 >= 2) {
  18. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
  19. sum.clear();
  20. }
  21. }
  22. @Override
  23. public void open(Configuration config) {
  24. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
  25. new ValueStateDescriptor<>(
  26. "average", // the state name
  27. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
  28. Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
  29. sum = getRuntimeContext().getState(descriptor);
  30. }
  31. }
  32. // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
  33. env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
  34. .keyBy(0)
  35. .flatMap(new CountWindowAverage())
  36. .print();
  37. // the printed output will be (1,4) and (1,5)
  1. class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
  2. private var sum: ValueState[(Long, Long)] = _
  3. override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
  4. // access the state value
  5. val tmpCurrentSum = sum.value
  6. // If it hasn't been used before, it will be null
  7. val currentSum = if (tmpCurrentSum != null) {
  8. tmpCurrentSum
  9. } else {
  10. (0L, 0L)
  11. }
  12. // update the count
  13. val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
  14. // update the state
  15. sum.update(newSum)
  16. // if the count reaches 2, emit the average and clear the state
  17. if (newSum._1 >= 2) {
  18. out.collect((input._1, newSum._2 / newSum._1))
  19. sum.clear()
  20. }
  21. }
  22. override def open(parameters: Configuration): Unit = {
  23. sum = getRuntimeContext.getState(
  24. new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
  25. )
  26. }
  27. }
  28. object ExampleCountWindowAverage extends App {
  29. val env = StreamExecutionEnvironment.getExecutionEnvironment
  30. env.fromCollection(List(
  31. (1L, 3L),
  32. (1L, 5L),
  33. (1L, 7L),
  34. (1L, 4L),
  35. (1L, 2L)
  36. )).keyBy(_._1)
  37. .flatMap(new CountWindowAverage())
  38. .print()
  39. // the printed output will be (1,4) and (1,5)
  40. env.execute("ExampleManagedState")
  41. }

这个例子实现了一个穷人的计数窗口。我们用第一个字段来键入元组(在示例中,所有元组都有相同的键 1)。函数将计数和运行的和存储在“ValueState”中。一旦计数达到2,它将发出平均值并清除状态,以便我们从 0开始。注意,如果在第一个字段中有不同值的元组,这将为每个不同的输入键保留不同的状态值。

State Time-To-Live (TTL) 状态生存时间(TTL)

a time-to-live (Ttl)可以分配给任意类型的键控状态。如果配置了一个TTL,并且状态值已经过期,则将在尽最大努力的基础上清理存储的值,下文将对此进行更详细的讨论。


为了使用状态TTL,必须首先构建一个StateTtlConfig 配置对象。然后,通过传递配置,可以在任何状态描述符中启用TTL功能:

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.state.ValueStateDescriptor;
  3. import org.apache.flink.api.common.time.Time;
  4. StateTtlConfig ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build();
  9. ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
  10. stateDescriptor.enableTimeToLive(ttlConfig);
  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.state.ValueStateDescriptor
  3. import org.apache.flink.api.common.time.Time
  4. val ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1))
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build
  9. val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
  10. stateDescriptor.enableTimeToLive(ttlConfig)


newBuilder 方法的第一个参数是强制性的,它是实时值。

更新类型在刷新状态TTL时配置(默认为 OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入权限时
  • StateTtlConfig.UpdateType.OnReadAndWrite - 也是关于读访问


  • StateTtlConfig.StateVisibility.NeverReturnExpired - 过期的值永远不会返回。
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用的话返回


另一个选项ReturnExpiredIfNotCleanedUp 允许在清理之前返回过期状态。


  • 状态后端将上次修改的时间戳与用户值一起存储,这意味着启用此功能会增加状态存储的消耗。堆状态后端存储具有对用户状态对象的引用和在存储器中的原始长值的附加Java对象。ROCKSDB状态后端根据存储的值、列表条目或映射条目添加8个字节。

  • 当前只支持引用 processing time 的TTL。

  • 尝试还原以前没有TTL配置的状态,使用启用TTL的描述符(反之亦然)将导致兼容性失败和 StateMigrationException

  • TTL配置不是Check-或Savepoint的一部分,而是Flink在当前运行的作业中如何对待它的一种方式。

  • 只有当用户值序列化程序能够处理空值时,TTL的映射状态才支持空用户值。如果序列化程序不支持空值,则可以使用 NullableSerializer 包装它,代价是序列化形式中的额外字节。

Cleanup of Expired State 清除过期状态



此外,您可以在获取将减小其大小的完整状态快照时激活清理。在当前实现下不清除本地状态,但在从上一个快照恢复的情况下,它将不包括已删除的过期状态。它可以在 StateTtlConfig中配置:

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. import org.apache.flink.api.common.time.Time;
  3. StateTtlConfig ttlConfig = StateTtlConfig
  4. .newBuilder(Time.seconds(1))
  5. .cleanupFullSnapshot()
  6. .build();
  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.time.Time
  3. val ttlConfig = StateTtlConfig
  4. .newBuilder(Time.seconds(1))
  5. .cleanupFullSnapshot
  6. .build



State in the Scala DataStream API Scala 数据流API中的状态

除了上面描述的接口之外,Scala API还提供了有状态的map()flatMap()函数的快捷方式,其中只有一个KeyedStream上的ValueState 函数。用户函数在 Option 中获取 ValueState 的当前值,并且必须返回将用于更新状态的更新值。

  1. val stream: DataStream[(String, Int)] = ...
  2. val counts: DataStream[(String, Int)] = stream
  3. .keyBy(_._1)
  4. .mapWithState((in: (String, Int), count: Option[Int]) =>
  5. count match {
  6. case Some(c) => ( (in._1, c), Some(c + in._2) )
  7. case None => ( (in._1, 0), Some(in._2) )
  8. })

Using Managed Operator State 使用托管运营商状态

To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed&lt;T extends Serializable&gt; interface. 要使用托管操作符状态,有状态函数可以实现更通用的 CheckpointedFunction 接口,也可以实现 ListCheckpointed&lt;T extends Serializable&gt;接口。

CheckpointedFunction 校验点函数


  1. void snapshotState(FunctionSnapshotContext context) throws Exception;
  2. void initializeState(FunctionInitializationContext context) throws Exception;

必须执行检查点时,调用snapshotState()。对应的initializeState()在每次用户定义的函数被初始化时被调用,当函数首先被初始化时,或者当函数实际从较早的检查点恢复时。因此, initializeState() 不仅是不同类型状态被初始化的地方,而且还包括其中包括状态恢复逻辑的地方。

当前,支持列表样式的托管运算符状态。该状态应为 serializable object的 List,彼此独立,因此在重新调用时符合重新分发的条件。换句话说,这些对象是可以重新分配非键控状态的最佳粒度。根据状态访问方法,定义了以下重新分配方案:

  • Even-split redistribution 偶数再分配: 每个操作符返回一个状态元素列表。整个状态在逻辑上是所有列表的连接。在恢复/重新分配时,列表被平均地划分为与并行运算符相同的子列表。每个运算符都会获得一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,如果使用并行主义1,运算符的校验点状态包含元素element1element2 ,则当将并行性增加到2时,element1 可能最终出现在运算符实例0中,而element2将转到运算符element1

  • Union redistribution UNION再分配 : 每个运算符返回一个状态元素列表。整个状态在逻辑上是所有列表的连接。在恢复/重新分配时,每个运算符都会获得状态元素的完整列表。

下面是一个有状态的SinkFunction 示例,它在将元素发送到外部世界之前使用 CheckpointedFunction 缓冲元素。它演示了基本的均匀再分配列表状态:

  1. public class BufferingSink
  2. implements SinkFunction<Tuple2<String, Integer>>,
  3. CheckpointedFunction {
  4. private final int threshold;
  5. private transient ListState<Tuple2<String, Integer>> checkpointedState;
  6. private List<Tuple2<String, Integer>> bufferedElements;
  7. public BufferingSink(int threshold) {
  8. this.threshold = threshold;
  9. this.bufferedElements = new ArrayList<>();
  10. }
  11. @Override
  12. public void invoke(Tuple2<String, Integer> value) throws Exception {
  13. bufferedElements.add(value);
  14. if (bufferedElements.size() == threshold) {
  15. for (Tuple2<String, Integer> element: bufferedElements) {
  16. // send it to the sink
  17. }
  18. bufferedElements.clear();
  19. }
  20. }
  21. @Override
  22. public void snapshotState(FunctionSnapshotContext context) throws Exception {
  23. checkpointedState.clear();
  24. for (Tuple2<String, Integer> element : bufferedElements) {
  25. checkpointedState.add(element);
  26. }
  27. }
  28. @Override
  29. public void initializeState(FunctionInitializationContext context) throws Exception {
  30. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  31. new ListStateDescriptor<>(
  32. "buffered-elements",
  33. TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
  34. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  35. if (context.isRestored()) {
  36. for (Tuple2<String, Integer> element : checkpointedState.get()) {
  37. bufferedElements.add(element);
  38. }
  39. }
  40. }
  41. }
  1. class BufferingSink(threshold: Int = 0)
  2. extends SinkFunction[(String, Int)]
  3. with CheckpointedFunction {
  4. @transient
  5. private var checkpointedState: ListState[(String, Int)] = _
  6. private val bufferedElements = ListBuffer[(String, Int)]()
  7. override def invoke(value: (String, Int)): Unit = {
  8. bufferedElements += value
  9. if (bufferedElements.size == threshold) {
  10. for (element <- bufferedElements) {
  11. // send it to the sink
  12. }
  13. bufferedElements.clear()
  14. }
  15. }
  16. override def snapshotState(context: FunctionSnapshotContext): Unit = {
  17. checkpointedState.clear()
  18. for (element <- bufferedElements) {
  19. checkpointedState.add(element)
  20. }
  21. }
  22. override def initializeState(context: FunctionInitializationContext): Unit = {
  23. val descriptor = new ListStateDescriptor[(String, Int)](
  24. "buffered-elements",
  25. TypeInformation.of(new TypeHint[(String, Int)]() {})
  26. )
  27. checkpointedState = context.getOperatorStateStore.getListState(descriptor)
  28. if(context.isRestored) {
  29. for(element <- checkpointedState.get()) {
  30. bufferedElements += element
  31. }
  32. }
  33. }
  34. }

initializeState方法以 FunctionInitializationContext作为参数。这用于初始化无键状态的“containers”。这是一个类型为 ListState 的容器,在该容器中,无键状态对象将在检查点时存储。


  1. ListStateDescriptor<Tuple2<String, Integer>> descriptor =
  2. new ListStateDescriptor<>(
  3. "buffered-elements",
  4. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
  5. checkpointedState = context.getOperatorStateStore().getListState(descriptor);
  1. val descriptor = new ListStateDescriptor[(String, Long)](
  2. "buffered-elements",
  3. TypeInformation.of(new TypeHint[(String, Long)]() {})
  4. )
  5. checkpointedState = context.getOperatorStateStore.getListState(descriptor)



如修改的BufferingSink的代码所示,在状态初始化过程中恢复的这个ListState保存在类变量中,以便将来在 snapshotState()中使用。在那里,ListState 被清除了以前检查点所包含的所有对象,然后用我们想要检查点的新对象来填充。


ListCheckpointed 列表校验点


  1. List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
  2. void restoreState(List<T> state) throws Exception;

snapshotState()上,操作符应该将对象列表返回给检查点,restoreState 在恢复时必须处理这样的列表。如果状态不可再分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)

Stateful Source Functions 有状态源函数


  1. public static class CounterSource
  2. extends RichParallelSourceFunction<Long>
  3. implements ListCheckpointed<Long> {
  4. /** current offset for exactly once semantics */
  5. private Long offset;
  6. /** flag for job cancellation */
  7. private volatile boolean isRunning = true;
  8. @Override
  9. public void run(SourceContext<Long> ctx) {
  10. final Object lock = ctx.getCheckpointLock();
  11. while (isRunning) {
  12. // output and state update are atomic
  13. synchronized (lock) {
  14. ctx.collect(offset);
  15. offset += 1;
  16. }
  17. }
  18. }
  19. @Override
  20. public void cancel() {
  21. isRunning = false;
  22. }
  23. @Override
  24. public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
  25. return Collections.singletonList(offset);
  26. }
  27. @Override
  28. public void restoreState(List<Long> state) {
  29. for (Long s : state)
  30. offset = s;
  31. }
  32. }
  1. class CounterSource
  2. extends RichParallelSourceFunction[Long]
  3. with ListCheckpointed[Long] {
  4. @volatile
  5. private var isRunning = true
  6. private var offset = 0L
  7. override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
  8. val lock = ctx.getCheckpointLock
  9. while (isRunning) {
  10. // output and state update are atomic
  11. lock.synchronized({
  12. ctx.collect(offset)
  13. offset += 1
  14. })
  15. }
  16. }
  17. override def cancel(): Unit = isRunning = false
  18. override def restoreState(state: util.List[Long]): Unit =
  19. for (s <- state) {
  20. offset = s
  21. }
  22. override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
  23. Collections.singletonList(offset)
  24. }

当一个检查点被flink完全确认以与外部世界进行通信时,一些运营商可能需要该信息。在这种情况下,请参见 org.apache.flink.runtime.state.CheckpointListener 界面。