使用richFunction、processFunction来进行状态编程,processFunction独有实现注册定时器的功能
flink有两种状态
原始状态
需要自己管理:实现序列化、故障恢复
-
托管状态
状态与算子相关联
- Flink进行统一管理,包括状态一致性、故障处理、高效存储访问,我们只需要调用接口就可以
需要继承Rich方法,用getRuntimeContext创建状态
算子状态 operator state
一个分区维护一个状态(分区:算子子任务数、并行度)
提供三种基本数据结构:
一个key维护一个状态。(相同key都在同一分区里)
- 只能应用于keyedStream
提供多种数据结构
- valueState
:保存单个的值: - valueState.value()
- valueState.update(T value)
- ListState
:保存一个列表 - ListState.add(T value)
- ListState.addAll(List
values) - ListState.get()返回 iterable
- ListState.update(list
values)
- MapState
:保存key-value - MapState.get(K key)
- MapState.put(K key, V value)
- MapState.contains(K key)
- MapState.remove(K key)
- ReducingState
- AggregatingState
状态的生存时间TTL
- valueState
我们可以在程序中 .clear()方法直接清除,但是有时不能这样。 这时就需要配置一个TTL ```java // 配置状态的TTL StateTtlConfig ttlConfig = StateTtlConfig // 设置状态存活时间 .newBuilder(Time.hours(1)) // 何时更新状态失效时间,OnReadAndWrite每次读和写都更新、OnCreateAndWrite创建和每次写更新 .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 因为状态失效并不是立即清理的,所以失效后再访问是可以访问到的。ReturnExpiredIfNotCleanedUp过期状态还存在则可访问 NeverReturnExpired过期状态即使存在也不可访问 .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) .build();
ValueStateDescriptor
---<a name="YaJ4i"></a>### 状态后端- 负责两件事:- 状态的管理:存储、访问、维护- 检查点(checkpoint)的存储<a name="Kmv1B"></a>#### 两种状态后端- HashMapStateBackend:状态存储在taskManager内存、checkpoint存储在文件系统- EmbeddedRocksDBStateBackend:状态存储在RocksDB、checkpoint存储在RocksDB- 优点:唯一实现增量式 保存检查点、可实现超大数据的存储- 增量快照中只包含自上一次快照完成之后被修改的记录,和不执行barrier对齐是两个东西- 缺点:实现序列化 读写性能比内存慢一个数量级<a name="kGo9m"></a>#### demo```javaflink-conf.yaml# 默认状态后端hashmap、rocksdbstate.backend: hashmap# 存放检查点的文件路径state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints//设置状态后端env.setStateBackend(new HashMapStateBackend());env.setStateBackend(new EmbeddedRocksDBStateBackend());<dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>1.13.0</version></dependency>
