状态后端
用 Data Stream API 编写的流处理程序,通常要保存不同形式的状态信息: Programs written in the Data Stream API often hold state in various forms:
- 窗口被触发之前收集或聚合的元素
- 转换函数也许用到key/value状态接口来存储values
- 转换函数也许实现了Checkpointed接口,使得本地变量具有容错的能力
可以在流处理API中查看 Working with State. See also Working with State in the streaming API guide.
当检查点功能被激活后,以上的这些基于特定检查点的状态将被持久化,以防止数据丢失或恢复到一致性状态。状态在内部如何被描述,如何被持久化以及持久化到哪里,这依赖于选择的状态后端。
可用的状态后端
开箱即用,Flink提供了这些状态后端:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
不作任何配置的情况下,系统默认使用MemoryStateBackend。
MemoryStateBackend
MemoryStateBackend以对象的形式保存数据到java堆中。Key/value状态和窗口运算中的数据,用hash表来存储值、触发器等信息。
基于checkpoints接口的方式,状态后端将对状态进行快照,并作为检查点的一部分,发送通知给JobManager (master),这些数据同样存储在java堆中。
MemoryStateBackend 的限制:
- 每个独立的状态大小上限是5MB,这个值可以通过MemoryStateBackend的构造函数来增加。
- 不考虑可配置的状态的最大值,状态的大小不能大于akka框架的。 (看 Configuration).
- 聚合的状态必须匹配到JobManager的内存.
MemoryStateBackend 鼓励用于:
- 本地开发或调试
- 拥有很小状态的Jobs, 例如那种一次包含一个记录的函数 (Map, FlatMap, Filter, …). Kafka Consumer 就需要很小的状态.
FsStateBackend
FsStateBackend 可以通过一个文件系统的URL来配置 (type, address, path), 例如 “hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”. The FsStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
FsStateBackend保存TaskManager内存中的运行数据. 基于检查点功能, 它将包含状态的快照信息写入配置好的文件系统目录中. 最小的元数据信息被存储在JobManager的内存中 (或者在高可用模式下, 存储到元数据检查点中).
FsStateBackend鼓励用于: The FsStateBackend is encouraged for:
- 拥有大的状态的Jobs, 很长的窗口, 很大的key/value状态.
- 所有高可用安装模式.
- Jobs with large state, long windows, large key/value states.
- All high-availability setups.
RocksDBStateBackend
RocksDBStateBackend可以通过一个文件系统的URL来配置 (type, address, path), 例如 “hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”. The RocksDBStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.
RocksDBStateBackend保存TaskManager中的运行数据,存储到RocksDB 数据库中. 基于检查点功能, 所有RocksDB数据库的数据将通过检查点保存到文件系统目录中. 最小的元数据信息被存储到JobManager的内存中 (或者在高可用模式下, 存储到元数据检查点中).
RocksDBStateBackend鼓励用于:
- 拥有非常大状态的Jobs, 很长的窗口, 很大的key/value状态.
- 所有高可用安装模式.
注意,RocksDBStateBackend状态的大小仅仅受限于磁盘空间的可用度。 相对于存储状态到内存的FsStateBackend,这允许保存非常大的状态。 这同样意味着,可以达到的最大的吞吐量将低于状态后端的。
配置一个状态后端
Configuring a State Backend
可以为每个job配置状态后端。而且,你可以定义一个默认的状态后端,以供当在job中没有明确定义状态后端时使用。
为每一个job配置状态后端
Setting the Per-job State Backend
每个job的状态后端可以在StreamExecutionEnvironment
中设置,就像下面展示的例子:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
设置默认的状态后端
一个默认的状态后端可以在flink-conf.yaml
配置文件中使用state.backend
配置关键字来配置。
A default state backend can be configured in the flink-conf.yaml
, using the configuration key state.backend
.
可选的值是jobmanager (MemoryStateBackend), filesystem (FsStateBackend),或者一个实现了状态后端工厂FsStateBackendFactory的完全限定类名,例如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
代表RocksDBStateBackend.
在默认的状态设置为filesystem的情况下,入口state.backend.fs.checkpointdir
定义检查点数据将被保存的目录。
配置文件中一段配置可以像下面这样: