

为了能够在创建 Savepoint 过程中,唯一识别对应的 Operator 的状态数据,Flink 提供了 API 来为程序中每个 Operator 设置 ID,这样可以在后续更新 / 升级程序的时候,可以在 Savepoint 数据中基于 Operator ID 来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定 Operator ID,Flink 也会我们自动生成对应的 Operator 状态 ID。
强烈建议手动为每个 Operator 设置 ID,即使未来 Flink 应用程序可能会改动很大,比如替换原来的 Operator 实现、增加新的 Operator、删除 Operator 等等,至少我们有可能与 Savepoint 中存储的 Operator 状态对应上。另外,保存的 Savepoint 状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来 Flink 程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的 Savepoint 正确地恢复。
DataStream<String> stream = env.// Stateful source (e.g. Kafka) with ID.addSource(new StatefulSource()).uid("source-id") // ID for the source operator.shuffle()// Stateful mapper with ID.map(new StatefulMapper()).uid("mapper-id") // ID for the mapper// Stateless printing sink.print(); // Auto-generated ID
从 Flink1.10 开始,Flink 自动管理 RocksDB 的内存。Flink 使用 RocksDB 作为状态后端时,状态将作为序列化字节串存在于堆外内存(off-heap)存储或本地磁盘中。RocksDB 底层基于 LSM 树作为索引结构的 KV 存储引擎。键由 Key 的序列化字节串组成,而值由状态的序列化字节组成。每次注册 KV 状态时,它都会映射到列族,并将键值对以字节串存储在 RocksDB 中,这意味着每次读写(READ or WRITE)操作都必须对数据进行序列化或反序列化。
使用 RocksDB 作为状态后端有以下优点:
- 不受 Java GC 影响,与堆内存状态后端相比,使用 RocksDB 内存开销更小。
- 目前唯一支持增量检查点(incremental checkpointing)选项。
- 状态大小仅受限于本地可用的磁盘空间大小。
RocksDB 写操作
- 把数据写入到内存的
MemTable中。当MemTable满时,它将成为READ ONLYMemTable,并被一个新的MemTable替换。 - 只读的
MemTable被后台线程周期性地刷新到磁盘中,生成按键排序的只读文件,称为 SSTables。 SSTable也是不可变的,后台线程通过多路归并实现进一步整合。
对于 RocksDB,每个状态都是一个列族,这意味着每个状态都包含自己的 MemTables和 SSTables集。
https://www.ververica.com/blog/manage-rocksdb-memory-size-apache-flink
RocksDB 读操作
- 访问活动内存表(Active Memory Table)进行反馈查询。如果找到对应的
key,则读取操作将由新到旧依次访问。直到找到待查询的key为止。 - 如果任何
MemTable中都找不到对应的key,那么读操作将访问SSTables,再次从最新的开始读。 读取
SSTables步骤如下:block_cache_size。最终将控制内存中缓存的最大未压缩块数。随着块数的增加,内存也随之增加。因此,通过预先配置块的数量,可以保持固定的内存消耗水平。write_buffer_size。最终将控制 RocksDB 中 MemTable 的最大值。活跃 MemTables 和只读的 MemTables 最终会影响 RocksDB 中的内存大小,所以可以提前调整。max_write_buffer_number。在 RocksDB 将 MemTables 导出到磁盘上的 SSTables 之前,此配置决定并控制着内存中保留的 MemTables 的最大数量。这实际上是内存中只读内存表的最大数量。
