• 使用richFunction、processFunction来进行状态编程,processFunction独有实现注册定时器的功能

    flink有两种状态

    原始状态

  • 需要自己管理:实现序列化、故障恢复

  • 当托管状态无法满足时 才考虑原始状态,一般不考虑

    托管状态

  • 状态与算子相关联

  • Flink进行统一管理,包括状态一致性、故障处理、高效存储访问,我们只需要调用接口就可以
  • 需要继承Rich方法,用getRuntimeContext创建状态

    算子状态 operator state
  • 一个分区维护一个状态(分区:算子子任务数、并行度)

  • 提供三种基本数据结构:

    • list state、
    • union list state、
    • broadcast state
      键控状态 keyed state
  • 一个key维护一个状态。(相同key都在同一分区里)

  • 只能应用于keyedStream
  • 提供多种数据结构

    • valueState:保存单个的值:
      • valueState.value()
      • valueState.update(T value)
    • ListState:保存一个列表
      • ListState.add(T value)
      • ListState.addAll(List values)
      • ListState.get()返回 iterable
      • ListState.update(list values)
    • MapState:保存key-value
      • MapState.get(K key)
      • MapState.put(K key, V value)
      • MapState.contains(K key)
      • MapState.remove(K key)
    • ReducingState
    • AggregatingState

      状态的生存时间TTL

  • 我们可以在程序中 .clear()方法直接清除,但是有时不能这样。 这时就需要配置一个TTL ```java // 配置状态的TTL StateTtlConfig ttlConfig = StateTtlConfig // 设置状态存活时间 .newBuilder(Time.hours(1)) // 何时更新状态失效时间,OnReadAndWrite每次读和写都更新、OnCreateAndWrite创建和每次写更新 .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 因为状态失效并不是立即清理的,所以失效后再访问是可以访问到的。ReturnExpiredIfNotCleanedUp过期状态还存在则可访问 NeverReturnExpired过期状态即使存在也不可访问 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) .build();

ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>(“my-state”, Event.class); valueStateDescriptor.enableTimeToLive(ttlConfig);

  1. ---
  2. <a name="YaJ4i"></a>
  3. ### 状态后端
  4. - 负责两件事:
  5. - 状态的管理:存储、访问、维护
  6. - 检查点(checkpoint)的存储
  7. <a name="Kmv1B"></a>
  8. #### 两种状态后端
  9. - HashMapStateBackend:状态存储在taskManager内存、checkpoint存储在文件系统
  10. - EmbeddedRocksDBStateBackend:状态存储在RocksDBcheckpoint存储在RocksDB
  11. - 优点:唯一实现增量式 保存检查点、可实现超大数据的存储
  12. - 增量快照中只包含自上一次快照完成之后被修改的记录,和不执行barrier对齐是两个东西
  13. - 缺点:实现序列化 读写性能比内存慢一个数量级
  14. <a name="kGo9m"></a>
  15. #### demo
  16. ```java
  17. flink-conf.yaml
  18. # 默认状态后端hashmap、rocksdb
  19. state.backend: hashmap
  20. # 存放检查点的文件路径
  21. state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
  22. //设置状态后端
  23. env.setStateBackend(new HashMapStateBackend());
  24. env.setStateBackend(new EmbeddedRocksDBStateBackend());
  25. <dependency>
  26. <groupId>org.apache.flink</groupId>
  27. <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
  28. <version>1.13.0</version>
  29. </dependency>