用Data Stream API编写的程序通常以各种形式保存状态:
- Windows会在触发数据元或聚合之前收集数据元或聚合
- 转换函数可以使用键/值状态接口来存储值
- 转换函数可以实现
CheckpointedFunction
接口以使其局部变量具有容错能力
另请参阅流API指南中的状态部分。
激活检查点时,检查点会持续保持此类状态,以防止数据丢失并始终如一地恢复。状态如何在内部表示,以及在检查点上如何以及在何处持续取决于所选择的状态后台。
可用的状态后台
开箱即用,Flink捆绑了这些状态后台:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果没有配置其他任何内容,系统将使用MemoryStateBackend。
MemoryStateBackend
该MemoryStateBackend保存数据在内部作为Java堆的对象。键/值状态和窗口 算子包含存储值,触发器等的哈希表。
在检查点时,此状态后台将对状态进行SNAPSHOT,并将其作为检查点确认消息的一部分发送到JobManager(主服务器),JobManager也将其存储在其堆上。
可以将MemoryStateBackend配置为使用异步SNAPSHOT。虽然我们强烈建议使用异步SNAPSHOT来避免阻塞管道,但请注意,默认情况下,此函数目前处于启用状态。要禁用此函数,用户可以MemoryStateBackend
在构造函数中将相应的布尔标志实例化为false
(这应该仅用于调试),例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
MemoryStateBackend的局限性:
- 默认情况下,每个状态的大小限制为5 MB。可以在MemoryStateBackend的构造函数中增加此值。
- 无论配置的最大状态大小如何,状态都不能大于akka帧大小(请参阅配置)。
- 聚合状态必须适合JobManager内存。
鼓励MemoryStateBackend用于:
- 本地开发和调试
- 几乎没有状态的作业,例如仅包含一次记录函数的作业(Map,FlatMap,Filter,…)。Kafka消费者需要很少的状态。
FsStateBackend
所述FsStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。
FsStateBackend将正在运行的数据保存在TaskManager的内存中。在检查点时,它将状态SNAPSHOT写入配置的文件系统和目录中的文件。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。
FsStateBackend 默认使用异步SNAPSHOT,以避免在编写状态检查点时阻塞处理管道。要禁用此函数,用户可以FsStateBackend
在构造函数集中使用相应的布尔标志来实例化a false
,例如:
new FsStateBackend(path, false);
鼓励FsStateBackend:
- 具有大状态,长窗口,大键/值状态的作业。
- 所有高可用性设置。
RocksDBStateBackend
所述RocksDBStateBackend配置有文件系统URL(类型,地址,路径),如“HDFS://名称节点:40010 /Flink/检查点”或“文件:///数据/Flink/检查点”。
RocksDBStateBackend将RocksDB数据库中的飞行中数据保存在(默认情况下)存储在TaskManager数据目录中。在检查点时,整个RocksDB数据库将被检查点到配置的文件系统和目录中。最小元数据存储在JobManager的内存中(或者,在高可用性模式下,存储在元数据检查点中)。
RocksDBStateBackend始终执行异步SNAPSHOT。
RocksDBStateBackend的局限性:
- 由于RocksDB的JNI桥接API基于byte [],因此每个Keys和每个值的最大支持大小为2 ^ 31个字节。重要提示:在RocksDB中使用合并 算子操作的状态(例如ListState)可以静默地累积> 2 ^ 31字节的值大小,然后在下次检索时失败。这是目前RocksDB JNI的一个限制。
我们鼓励RocksDBStateBackend:
- 具有非常大的状态,长窗口,大键/值状态的作业。
- 所有高可用性设置。
请注意,您可以保存的状态量仅受可用磁盘空间量的限制。与将状态保持在内存中的FsStateBackend相比,这允许保持非常大的状态。但是,这也意味着使用此状态后台可以实现的最大吞吐量更低。对此后台的所有读/写都必须通过去/序列化来检索/存储状态对象,这比使用堆基表示正在进行的堆上表示更昂贵。
RocksDBStateBackend是目前唯一提供增量检查点的后台(见这里)。
配置状态后台
如果您不指定任何内容,则默认状态后台是JobManager。如果要为群集上的所有作业建立不同的默认值,可以通过在flink-conf.yaml中定义新的默认状态后台来实现。可以基于每个作业覆盖默认状态后台,如下所示。
设置每个作业状态后台
每个作业状态后台StreamExecutionEnvironment
在作业上设置,如下例所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
设置默认状态后台
可以flink-conf.yaml
使用配置Keys在配置中配置默认状态后台state.backend
。
config条目的可能值包括jobmanager(MemoryStateBackend),filesystem(FsStateBackend),rocksdb(RocksDBStateBackend),或实现状态后台工厂FsStateBackendFactory的类的完全限定类名,例如org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
RocksDBStateBackend。
该state.checkpoints.dir
选项定义所有后台写入检查点数据和元数据文件的目录。您可以在此处找到有关检查点目录结构的更多详细信息。
配置文件中的示例部分可能如下所示:
# The backend that will be used to store operator state checkpoints
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
state.backend: filesystem
# Directory for storing checkpoints
> 译者:[flink.sojb.cn](https://flink.sojb.cn/)
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints