在KeyedProcessFunction 和 ProcessWindowFunction中,我们常常会定义一些KeyedState状态,而这些状态跟key和窗口是一一对应的,一个key第一次到达状态算子后产生状态数据,后续没有对应key的数据到达算子,假如不对状态进行清理,内存遭不住。所以设置StateTtlConfig来进行一些状态的后台定时清理。
Flink官网详细解释:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/
StateTtlConfig设置保留时长,时间到后并不是立即清理掉过期的状态数据;而是需要用户指定清理的时机即:
.cleanupFullSnapshot() //清理状态数据的时机,这个不适用RockDB State
.cleanupInRocksdbCompactFilter(1000)//清理状态数据的时机
当上游数据到后后访问的过期的数据时(还未被清理),可以设置是否返回过期数据:
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //是否读取已过期的状态数据
@Slf4jpublic static class LotDayReportDuplicateKeyProcessFunctionextends KeyedProcessFunction<String, Tuple2<String, LotDayReport>, LotDayReport> {// <车场id|table|id|type,System.currentTimeMillis()>private transient MapState<String, Long> lotTableIdMapState;@Overridepublic void open(Configuration parameters) {StateTtlConfig stateTtlConfig =StateTtlConfig.newBuilder(Time.minutes(1)) //保留时长.updateTtlOnReadAndWrite() //更新过期时间的时机.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //是否读取已过期的状态数据//.cleanupFullSnapshot() //清理状态数据的时机,这个不适用RockDB State.cleanupInRocksdbCompactFilter(1000)//RockDB State清理状态数据的时机.build();MapStateDescriptor<String, Long> mapStateDescriptor =new MapStateDescriptor<>("lotTableIdMapState", String.class, Long.class);mapStateDescriptor.enableTimeToLive(stateTtlConfig);this.lotTableIdMapState = getRuntimeContext().getMapState(mapStateDescriptor);}
