在KeyedProcessFunction 和 ProcessWindowFunction中,我们常常会定义一些KeyedState状态,而这些状态跟key和窗口是一一对应的,一个key第一次到达状态算子后产生状态数据,后续没有对应key的数据到达算子,假如不对状态进行清理,内存遭不住。所以设置StateTtlConfig来进行一些状态的后台定时清理。
Flink官网详细解释:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/
StateTtlConfig设置保留时长,时间到后并不是立即清理掉过期的状态数据;而是需要用户指定清理的时机即:
.cleanupFullSnapshot() //清理状态数据的时机,这个不适用RockDB State
.cleanupInRocksdbCompactFilter(1000)//清理状态数据的时机
当上游数据到后后访问的过期的数据时(还未被清理),可以设置是否返回过期数据:
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //是否读取已过期的状态数据
@Slf4j
public static class LotDayReportDuplicateKeyProcessFunction
extends KeyedProcessFunction<String, Tuple2<String, LotDayReport>, LotDayReport> {
// <车场id|table|id|type,System.currentTimeMillis()>
private transient MapState<String, Long> lotTableIdMapState;
@Override
public void open(Configuration parameters) {
StateTtlConfig stateTtlConfig =
StateTtlConfig.newBuilder(Time.minutes(1)) //保留时长
.updateTtlOnReadAndWrite() //更新过期时间的时机
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //是否读取已过期的状态数据
//.cleanupFullSnapshot() //清理状态数据的时机,这个不适用RockDB State
.cleanupInRocksdbCompactFilter(1000)//RockDB State清理状态数据的时机
.build();
MapStateDescriptor<String, Long> mapStateDescriptor =
new MapStateDescriptor<>("lotTableIdMapState", String.class, Long.class);
mapStateDescriptor.enableTimeToLive(stateTtlConfig);
this.lotTableIdMapState = getRuntimeContext().getMapState(mapStateDescriptor);
}