KeyedState和OperatorState
在Flink中,状态分为2种:
- OperatorState:和Operator的一个特定的并行实例相绑定,比如Kafka Connector中,每一个并行的Kafka Consumer都在Operator State中维护当前Consumer订阅的partition和offset。
- KeyedState:和具体的Key关联,只能在
KeyedStream上使用。由于Flink中的keyBy操作保证了每一个Key相关联的所有消息都会发送给下游算子的同一个并行实例处理,因此KeyedState也可以看作是OperatorState的一种分区(partitioned)形式,每一个键都关联一个状态分区(state-partition)。
无论是KeyedState还是OperatorState,都有两种形式:Managed State和Raw State。
- Managed State的数据结构由Flink进行管理。有ValueState
,ListState ,ReducingState ,AggregatingState ,MapState - Raw State的数据结构对Flink是透明的,做checkpoint时,只会写入一系列二进制流。
建议使用Managed State,这样Flink可以在并行度改变的情况下重新分布状态,也可以更好地进行内存管理。
State Time-To-Live (TTL)
所有集合类型的state都支持per-entry TTL,即List中的元素和Map中的entries都能分别过期。
import org.apache.flink.api.common.state.StateTtlConfigimport org.apache.flink.api.common.state.ValueStateDescriptorimport org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)) // 设定过期时间.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 何时更新TTL.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build// 每一个State需要指定唯一的名字。val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])stateDescriptor.enableTimeToLive(ttlConfig)
注:
- TTL只支持processing time
- TTL的配置不会被存在checkpoint/savepoint中。但如果开启了TTL,会额外存储每个entry的最近更新时间。如果之前没配置TTL,恢复checkpoint的程序配置了,或者反之,就会报StateMigrationException。
- 默认情况下,过期的state值会在读的时候被清除,也会定期被后台线程回收。
RocksDB的清理
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build();
RocksDB会定期运行异步的compaction filter来清理过期state。
在处理指定数量的state entries之后,RocksDB compaction filter会用当前时间戳来检查过期。数量用 cleanupInRocksdbCompactFilter设置,默认为1000。
Managed Operator State
一个有状态的自定义方法可继承 CheckpointedFunction。
public interface CheckpointedFunction {// 创建检查点时被调用void snapshotState(FunctionSnapshotContext context) throws Exception;// 初始化时调用(从检查点恢复状态时也会调用)// 通过FunctionInitializationContextvoid initializeState(FunctionInitializationContext context) throws Exception;}
