状态后端

Data Stream API 编写的流处理程序,通常要保存不同形式的状态信息: Programs written in the Data Stream API often hold state in various forms:

  • 窗口被触发之前收集或聚合的元素
  • 转换函数也许用到key/value状态接口来存储values
  • 转换函数也许实现了Checkpointed接口,使得本地变量具有容错的能力

可以在流处理API中查看 Working with State. See also Working with State in the streaming API guide.

当检查点功能被激活后,以上的这些基于特定检查点的状态将被持久化,以防止数据丢失或恢复到一致性状态。状态在内部如何被描述,如何被持久化以及持久化到哪里,这依赖于选择的状态后端

可用的状态后端

开箱即用,Flink提供了这些状态后端:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

不作任何配置的情况下,系统默认使用MemoryStateBackend。

MemoryStateBackend

MemoryStateBackend以对象的形式保存数据到java堆中。Key/value状态和窗口运算中的数据,用hash表来存储值、触发器等信息。

基于checkpoints接口的方式,状态后端将对状态进行快照,并作为检查点的一部分,发送通知给JobManager (master),这些数据同样存储在java堆中。

MemoryStateBackend 的限制:

  • 每个独立的状态大小上限是5MB,这个值可以通过MemoryStateBackend的构造函数来增加。
  • 不考虑可配置的状态的最大值,状态的大小不能大于akka框架的。 (看 Configuration).
  • 聚合的状态必须匹配到JobManager的内存.

MemoryStateBackend 鼓励用于:

  • 本地开发或调试
  • 拥有很小状态的Jobs, 例如那种一次包含一个记录的函数 (Map, FlatMap, Filter, …). Kafka Consumer 就需要很小的状态.

FsStateBackend

FsStateBackend 可以通过一个文件系统的URL来配置 (type, address, path), 例如 “hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”. The FsStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.

FsStateBackend保存TaskManager内存中的运行数据. 基于检查点功能, 它将包含状态的快照信息写入配置好的文件系统目录中. 最小的元数据信息被存储在JobManager的内存中 (或者在高可用模式下, 存储到元数据检查点中).

FsStateBackend鼓励用于: The FsStateBackend is encouraged for:

  • 拥有大的状态的Jobs, 很长的窗口, 很大的key/value状态.
  • 所有高可用安装模式.
  • Jobs with large state, long windows, large key/value states.
  • All high-availability setups.

RocksDBStateBackend

RocksDBStateBackend可以通过一个文件系统的URL来配置 (type, address, path), 例如 “hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”. The RocksDBStateBackend is configured with a file system URL (type, address, path), such as “hdfs://namenode:40010/flink/checkpoints” or “file:///data/flink/checkpoints”.

RocksDBStateBackend保存TaskManager中的运行数据,存储到RocksDB 数据库中. 基于检查点功能, 所有RocksDB数据库的数据将通过检查点保存到文件系统目录中. 最小的元数据信息被存储到JobManager的内存中 (或者在高可用模式下, 存储到元数据检查点中).

RocksDBStateBackend鼓励用于:

  • 拥有非常大状态的Jobs, 很长的窗口, 很大的key/value状态.
  • 所有高可用安装模式.

注意,RocksDBStateBackend状态的大小仅仅受限于磁盘空间的可用度。 相对于存储状态到内存的FsStateBackend,这允许保存非常大的状态。 这同样意味着,可以达到的最大的吞吐量将低于状态后端的。

配置一个状态后端

Configuring a State Backend

可以为每个job配置状态后端。而且,你可以定义一个默认的状态后端,以供当在job中没有明确定义状态后端时使用。

为每一个job配置状态后端

Setting the Per-job State Backend

每个job的状态后端可以在StreamExecutionEnvironment中设置,就像下面展示的例子:

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))

设置默认的状态后端

一个默认的状态后端可以在flink-conf.yaml配置文件中使用state.backend配置关键字来配置。 A default state backend can be configured in the flink-conf.yaml, using the configuration key state.backend.

可选的值是jobmanager (MemoryStateBackend), filesystem (FsStateBackend),或者一个实现了状态后端工厂FsStateBackendFactory的完全限定类名,例如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory 代表RocksDBStateBackend.

在默认的状态设置为filesystem的情况下,入口state.backend.fs.checkpointdir 定义检查点数据将被保存的目录。

配置文件中一段配置可以像下面这样: