在KeyedProcessFunction 和 ProcessWindowFunction中,我们常常会定义一些KeyedState状态,而这些状态跟key和窗口是一一对应的,一个key第一次到达状态算子后产生状态数据,后续没有对应key的数据到达算子,假如不对状态进行清理,内存遭不住。所以设置StateTtlConfig来进行一些状态的后台定时清理。

    Flink官网详细解释:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/state/

    StateTtlConfig设置保留时长,时间到后并不是立即清理掉过期的状态数据;而是需要用户指定清理的时机即:

    .cleanupFullSnapshot() //清理状态数据的时机,这个不适用RockDB State

    .cleanupInRocksdbCompactFilter(1000)//清理状态数据的时机

    当上游数据到后后访问的过期的数据时(还未被清理),可以设置是否返回过期数据:

    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //是否读取已过期的状态数据

    1. @Slf4j
    2. public static class LotDayReportDuplicateKeyProcessFunction
    3. extends KeyedProcessFunction<String, Tuple2<String, LotDayReport>, LotDayReport> {
    4. // <车场id|table|id|type,System.currentTimeMillis()>
    5. private transient MapState<String, Long> lotTableIdMapState;
    6. @Override
    7. public void open(Configuration parameters) {
    8. StateTtlConfig stateTtlConfig =
    9. StateTtlConfig.newBuilder(Time.minutes(1)) //保留时长
    10. .updateTtlOnReadAndWrite() //更新过期时间的时机
    11. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //是否读取已过期的状态数据
    12. //.cleanupFullSnapshot() //清理状态数据的时机,这个不适用RockDB State
    13. .cleanupInRocksdbCompactFilter(1000)//RockDB State清理状态数据的时机
    14. .build();
    15. MapStateDescriptor<String, Long> mapStateDescriptor =
    16. new MapStateDescriptor<>("lotTableIdMapState", String.class, Long.class);
    17. mapStateDescriptor.enableTimeToLive(stateTtlConfig);
    18. this.lotTableIdMapState = getRuntimeContext().getMapState(mapStateDescriptor);
    19. }