RocksDB 大状态调优

RocksDB 是基于 LSM Tree 实现的(类似 HBase),写数据都是先缓存到内存中,所以 RocksDB 的写请求效率比较高。RocksDB 使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中 blockcache 中查找,如果内存中没有再去磁盘中查询。使用RocksDB 时,状态大小仅受可用磁盘空间量的限制,性能瓶颈主要在于 RocksDB 对磁盘的读请求,每次读写操作都必须对数据进行反序列化或者序列化。当处理性能不够时,仅需要横向扩展并行度即可提高整个 Job 的吞吐量。

1646982516(1).png

从 Flink1.10 开始,Flink 默认将 RocksDB 的内存大小配置为每个 task slot 的托管内存。调试内存性能的问题主要是通过调整配置项

taskmanager.memory.managed.size 或者 taskmanager.memory.managed.fraction

以增加 Flink 的托管内存(即堆外的托管内存)。进一步可以调整一些参数进行高级性能调优,这些参数也可以在应用程序中通过

RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)指定。

下面介绍提高资源利用率的几个重要配置:

开启 State 访问性能监控

Flink 1.13 中引入了 State 访问的性能监控,即 latency trackig state。此功能不局限于 State Backend 的类型,自定义实现的 State Backend 也可以复用此功能。

image.png

State 访问的性能监控会产生一定的性能影响,所以,默认每 100 次做一次取样 (sample),对不同的 State Backend 性能损失影响不同:
对于 RocksDB State Backend,性能损失大概在 1% 左右
对于 Heap State Backend,性能损失最多可达 10%

state.backend.latency-track.keyed-state-enabled:true #启用访问状态的性能监控 state.backend.latency-track.sample-interval: 100 #采样间隔 state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确 state.backend.latency-track.state-name-as-variable: true #将状态名作为变量

开启增量检查点和本地恢复

1)开启增量检查点
RocksDB 是目前唯一可用于支持有状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:

state.backend.incremental: true #默认 false,改为 true。 或代码中指定 new EmbeddedRocksDBStateBackend(true)

2)开启本地恢复
当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据。本地恢复目前仅涵盖键控类型的状态后端(RocksDB),MemoryStateBackend不支持本地恢复并忽略此选项。

state.backend.local-recovery: true

3)设置多目录
如果有多块磁盘,也可以考虑指定本地多目录

state.backend.rocksdb.localdir : /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

注意:不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力。

调整预定义选项

Flink 针对不同的设置为 RocksDB 提供了一些预定义的选项集合,其中包含了后续提到的一些参数,如果调整预定义选项后还达不到预期,再去调整后面的 block、writebuffer等参数。

当 前 支 持 的 预 定 义 选 项 有 DEFAULT 、 SPINNING_DISK_OPTIMIZED 、
SPINNING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。有条
件上 SSD 的,可以指定为 FLASH_SSD_OPTIMIZED

state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM

设置为机械硬盘+内存模式

增大 block 缓存

整个 RocksDB 共享一个 block cache,读数据时内存的 cache 大小,该参数越大读 数据时缓存命中率越高,默认大小为 8 MB,建议设置到 64 ~ 256MB。

state.backend.rocksdb.block.cache-size: 64m #默认 8m

增大 write buffer 和 level 阈值大小

RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用独占的 write buffer,默认 64MB,建议调大。
调整这个参数通常要适当增加 L1 层的大小阈值 max-size-level-base,默认 256m。
该值太小会造成能存放的 SST 文件过少,层级变多造成查找困难,太大会造成文件过多,合并困难。建议设为 target_file_size_base(默认 64MB)
的倍数,且不能太小,例如 5~10倍,即 320~640MB。

state.backend.rocksdb.writebuffer.size: 128m state.backend.rocksdb.compaction.level.max-size-level-base: 320m

增大 write buffer 数量

每个 Column Family 对应的 writebuffer 最大数量,这实际上是内存中“只读内存 表“的最大数量,默认值是 2。对于机械磁盘来说,如果内存足够大,可以调大到 5 左右

state.backend.rocksdb.writebuffer.count: 5

增大后台线程数和 write buffer 合并数

1)增大线程数
用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以 改为 4 等更大的值

state.backend.rocksdb.thread.num: 4

2)增大 writebuffer 最小合并数
将数据从 writebuffer 中 flush 到磁盘时,需要合并的 writebuffer 最小数量,默认值为 1,可以调成 3。

state.backend.rocksdb.writebuffer.number-to-merge: 3

开启分区索引功能

Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的 partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存比较小的场景中,性能提升 10 倍左右。如果在内存管控下 Rocksdb 性能不如预期的话,这也能成为一个性能优化点。

state.backend.rocksdb.memory.partitioned-index-filters : true #默认 false