KeyedState和OperatorState

在Flink中,状态分为2种:

  • OperatorState:和Operator的一个特定的并行实例相绑定,比如Kafka Connector中,每一个并行的Kafka Consumer都在Operator State中维护当前Consumer订阅的partition和offset。
  • KeyedState:和具体的Key关联,只能在 KeyedStream上使用。由于Flink中的 keyBy操作保证了每一个Key相关联的所有消息都会发送给下游算子的同一个并行实例处理,因此KeyedState也可以看作是OperatorState的一种分区(partitioned)形式,每一个键都关联一个状态分区(state-partition)。

无论是KeyedState还是OperatorState,都有两种形式:Managed State和Raw State。

  • Managed State的数据结构由Flink进行管理。有ValueState,ListState,ReducingState,AggregatingState,MapState
  • Raw State的数据结构对Flink是透明的,做checkpoint时,只会写入一系列二进制流。

建议使用Managed State,这样Flink可以在并行度改变的情况下重新分布状态,也可以更好地进行内存管理。

State Time-To-Live (TTL)

所有集合类型的state都支持per-entry TTL,即List中的元素和Map中的entries都能分别过期。

  1. import org.apache.flink.api.common.state.StateTtlConfig
  2. import org.apache.flink.api.common.state.ValueStateDescriptor
  3. import org.apache.flink.api.common.time.Time
  4. val ttlConfig = StateTtlConfig
  5. .newBuilder(Time.seconds(1)) // 设定过期时间
  6. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 何时更新TTL
  7. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  8. .build
  9. // 每一个State需要指定唯一的名字。
  10. val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
  11. stateDescriptor.enableTimeToLive(ttlConfig)

注:

  • TTL只支持processing time
  • TTL的配置不会被存在checkpoint/savepoint中。但如果开启了TTL,会额外存储每个entry的最近更新时间。如果之前没配置TTL,恢复checkpoint的程序配置了,或者反之,就会报StateMigrationException。
  • 默认情况下,过期的state值会在读的时候被清除,也会定期被后台线程回收。

RocksDB的清理

  1. import org.apache.flink.api.common.state.StateTtlConfig;
  2. StateTtlConfig ttlConfig = StateTtlConfig
  3. .newBuilder(Time.seconds(1))
  4. .cleanupInRocksdbCompactFilter(1000)
  5. .build();

RocksDB会定期运行异步的compaction filter来清理过期state。

在处理指定数量的state entries之后,RocksDB compaction filter会用当前时间戳来检查过期。数量用 cleanupInRocksdbCompactFilter设置,默认为1000。

Managed Operator State

一个有状态的自定义方法可继承 CheckpointedFunction

  1. public interface CheckpointedFunction {
  2. // 创建检查点时被调用
  3. void snapshotState(FunctionSnapshotContext context) throws Exception;
  4. // 初始化时调用(从检查点恢复状态时也会调用)
  5. // 通过FunctionInitializationContext
  6. void initializeState(FunctionInitializationContext context) throws Exception;
  7. }