第1章 资源配置调优

Flink性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
提交方式主要是yarn-per-job,资源的分配在使用脚本提交Flink任务时进行指定。
Ø 标准的Flink任务提交脚本(Generic CLI 模式)
从1.11开始,增加了通用客户端模式,参数使用-D 指定

  1. bin/flink run \
  2. -t yarn-per-job \
  3. -d \
  4. -p 5 \ 指定并行度
  5. -Dyarn.application.queue=test \ 指定yarn队列
  6. -Djobmanager.memory.process.size=1024mb \ 指定JM的总进程大小
  7. -Dtaskmanager.memory.process.size=1024mb \ 指定每个TM的总进程大小
  8. -Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TMslot
  9. -c Test \
  10. ./test.jar

参数列表:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html

1.1 内存设置

生产资源配置:

  1. bin/flink run \
  2. -t yarn-per-job \
  3. -d \
  4. -p 5 \ 指定并行度
  5. -Dyarn.application.queue=test \ 指定yarn队列
  6. -Djobmanager.memory.process.size=2048mb \ JM2~4G足够
  7. -Dtaskmanager.memory.process.size=6144mb \ 单个TM2~8G足够
  8. -Dtaskmanager.numberOfTaskSlots=2 \ 与容器核数1core1slot1core2slot
  9. -c Test \
  10. test.jar

Flink是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS来描述数据情况。

1.2 并行度设置

1.2.1 最优并行度计算

开发完成后,先进行压测。任务并行度给10以下,测试单个并行度的处理上限。然后 总QPS/单并行度的处理能力 = 并行度
不能只从QPS去得出并行度,因为有些字段少、逻辑简单的任务,单并行度一秒处理几万条数据。而有些数据字段多,处理逻辑复杂,单并行度一秒只能处理1000条数据。
最好根据高峰期的QPS压测,并行度*1.2倍,富余一些资源。

1.2.2 Source 端并行度的配置

数据源端是 Kafka,Source的并行度设置为Kafka对应Topic的分区数。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度,考虑下Kafka 要扩大分区,同时调大并行度等于分区数。
Flink 的一个并行度可以处理一至多个分区的数据,如果并行度多于 Kafka 的分区数,那么就会造成有的并行度空闲,浪费资源。

1.2.3 Transform端并行度的配置

Ø Keyby之前的算子
一般不会做太重的操作,都是比如map、filter、flatmap等处理较快的算子,并行度可以和source保持一致。
Ø Keyby之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512;
小并发任务的并行度不一定需要设置成 2 的整数次幂;
大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;

1.2.4 Sink 端并行度的配置

Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进行评估。如果Sink端是Kafka,可以设为Kafka对应Topic的分区数。
Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。
Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。
另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink 处的并行度做一定的权衡。

1.3 RocksDB大状态调优

RocksDB 是基于 LSM Tree 实现的(类似HBase),写数据都是先缓存到内存中,所以RocksDB 的写请求效率比较高。RocksDB 使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中 blockcache 中查找,如果内存中没有再去磁盘中查询。优化后差不多单并行度 TPS 5000 record/s,性能瓶颈主要在于 RocksDB 对磁盘的读请求,所以当处理性能不够时,仅需要横向扩展并行度即可提高整个Job 的吞吐量。以下几个调优参数:
Ø 设置本地 RocksDB 多目录
在flink-conf.yaml 中配置:
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

注意:不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力。当设置多个 RocksDB 本地磁盘目录时,Flink 会随机选择要使用的目录,所以就可能存在三个并行度共用同一目录的情况。如果服务器磁盘数较多,一般不会出现该情况,但是如果任务重启后吞吐量较低,可以检查是否发生了多个并行度共用同一块磁盘的情况。
当一个 TaskManager 包含 3 个 slot 时,那么单个服务器上的三个并行度都对磁盘造成频繁读写,从而导致三个并行度的之间相互争抢同一个磁盘 io,这样务必导致三个并行度的吞吐量都会下降。设置多目录实现三个并行度使用不同的硬盘从而减少资源竞争。
如下所示是测试过程中磁盘的 IO 使用率,可以看出三个大状态算子的并行度分别对应了三块磁盘,这三块磁盘的 IO 平均使用率都保持在 45% 左右,IO 最高使用率几乎都是 100%,而其他磁盘的 IO 平均使用率相对低很多。由此可见使用 RocksDB 做为状态后端且有大状态的频繁读取时, 对磁盘IO性能消耗确实比较大。
1.png
如下图所示,其中两个并行度共用了 sdb 磁盘,一个并行度使用 sdj磁盘。可以看到 sdb 磁盘的 IO 使用率已经达到了 91.6%,就会导致 sdb 磁盘对应的两个并行度吞吐量大大降低,从而使得整个 Flink 任务吞吐量降低。如果每个服务器上有一两块 SSD,强烈建议将 RocksDB 的本地磁盘目录配置到 SSD 的目录下,从 HDD 改为 SSD 对于性能的提升可能比配置 10 个优化参数更有效。
2.png
Ø state.backend.incremental:开启增量检查点,默认false,改为true。
Ø state.backend.rocksdb.predefined-options:SPINNING_DISK_OPTIMIZED_HIGH_MEM设置为机械硬盘+内存模式,有条件上SSD,指定为FLASH_SSD_OPTIMIZED
Ø state.backend.rocksdb.block.cache-size: 整个 RocksDB 共享一个 block cache,读数据时内存的 cache 大小,该参数越大读数据时缓存命中率越高,默认大小为 8 MB,建议设置到 64 ~ 256 MB。
Ø state.backend.rocksdb.thread.num: 用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值。
Ø state.backend.rocksdb.writebuffer.size: RocksDB 中,每个 State 使用一个 Column Family,每个 Column Family 使用独占的 write buffer,建议调大,例如:32M
Ø state.backend.rocksdb.writebuffer.count: 每个 Column Family 对应的 writebuffer 数目,默认值是 2,对于机械磁盘来说,如果内存⾜够大,可以调大到 5 左右
Ø state.backend.rocksdb.writebuffer.number-to-merge: 将数据从 writebuffer 中 flush 到磁盘时,需要合并的 writebuffer 数量,默认值为 1,可以调成3。
Ø state.backend.local-recovery: 设置本地恢复,当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据

1.4 Checkpoint设置

一般我们的 Checkpoint 时间间隔可以设置为分钟级别,例如 1 分钟、3 分钟,对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次Checkpoint 之间至少暂停 4或8 分钟。
如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看 Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
RocksDB相关参数在1.3中已说明,可以在flink-conf.yaml指定,也可以在Job的代码中调用API单独指定,这里不再列出。

  1. // 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
  2. RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints", true);
  3. env.setStateBackend(rocksDBStateBackend);
  4. // 开启Checkpoint,间隔为 3 分钟
  5. env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
  6. // 配置 Checkpoint
  7. CheckpointConfig checkpointConf = env.getCheckpointConfig();
  8. checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  9. // 最小间隔 4分钟
  10. checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
  11. // 超时时间 10分钟
  12. checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
  13. // 保存checkpoint
  14. checkpointConf.enableExternalizedCheckpoints(
  15. CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

1.5 使用 Flink ParameterTool 读取配置

在实际开发中,有各种环境(开发、测试、预发、生产),作业也有很多的配置:算子的并行度配置、Kafka 数据源的配置(broker 地址、topic 名、group.id)、Checkpoint 是否开启、状态后端存储路径、数据库地址、用户名和密码等各种各样的配置,可能每个环境的这些配置对应的值都是不一样的。
如果你是直接在代码⾥⾯写死的配置,每次换个环境去运行测试作业,都要重新去修改代码中的配置,然后编译打包,提交运行,这样就要花费很多时间在这些重复的劳动力上了。在 Flink 中可以通过使用 ParameterTool 类读取配置,它可以读取环境变量、运行参数、配置文件。
ParameterTool 是可序列化的,所以你可以将它当作参数进行传递给算子的自定义函数类。

1.5.1 读取运行参数

我们可以在Flink的提交脚本添加运行参数,格式:
Ø —参数名 参数值
Ø -参数名 参数值
在 Flink 程序中可以直接使用 ParameterTool.fromArgs(args) 获取到所有的参数,也可以通过 parameterTool.get(“username”) 方法获取某个参数对应的值。
举例:通过运行参数指定jobname

  1. bin/flink run \
  2. -t yarn-per-job \
  3. -d \
  4. -p 5 \ 指定并行度
  5. -Dyarn.application.queue=test \ 指定yarn队列
  6. -Djobmanager.memory.process.size=1024mb \ 指定JM的总进程大小
  7. -Dtaskmanager.memory.process.size=1024mb \ 指定每个TM的总进程大小
  8. -Dtaskmanager.numberOfTaskSlots=2 \ 指定每个TMslot
  9. -c Test \
  10. test.jar
  11. --jobname ods-test //参数名自己随便起,代码里对应上即可

在代码里获取参数值:

  1. ParameterTool parameterTool = ParameterTool.fromArgs(args);
  2. String myJobname = parameterTool.get("jobname"); //参数名对应
  3. env.execute(myJobname);

1.5.2 读取系统属性

ParameterTool 还⽀持通过 ParameterTool.fromSystemProperties() 方法读取系统属性。做个打印:
ParameterTool parameterTool = ParameterTool.fromSystemProperties();
System.out.println(parameterTool.toMap().toString());

1.5.3 读取配置文件

可以使用ParameterTool.fromPropertiesFile(“/application.properties”) 读取 properties 配置文件。可以将所有要配置的地方(比如并行度和一些 Kafka、MySQL 等配置)都写成可配置的,然后其对应的 key 和 value 值都写在配置文件中,最后通过 ParameterTool 去读取配置文件获取对应的值。

1.5.4 注册全局参数

在 ExecutionConfig 中可以将 ParameterTool 注册为全作业参数的参数,这样就可以被 JobManager 的web 端以及用户⾃定义函数中以配置值的形式访问。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

可以不用将ParameterTool当作参数传递给算子的自定义函数,直接在用户⾃定义的 Rich 函数中直接获取到参数值

  1. env.addSource(new RichSourceFunction() {
  2. @Override
  3. public void run(SourceContext sourceContext) throws Exception {
  4. while (true) {
  5. ParameterTool parameterTool = (ParameterTool)getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  6. }
  7. }
  8. @Override
  9. public void cancel() {
  10. }
  11. })

1.6 压测方式

  1. 压测的方式很简单,先在kafka中积压数据,之后开启Flink任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。数据可以是自己造的模拟数据,也可以是生产中的部分数据。

第2章 反压处理

反压(BackPressure)通常产生于这样的场景:短时间的负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或遇到大促、秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃。
反压机制是指系统能够自己检测到被阻塞的 Operator,然后自适应地降低源头或上游数据的发送速率,从而维持整个系统的稳定。Flink 任务一般运行在多个节点上,数据从上游算子发送到下游算子需要网络传输,若系统在反压时想要降低数据源头或上游算子数据的发送速率,那么肯定也需要网络传输。所以下面先来了解一下 Flink 的网络流控(Flink 对网络数据流量的控制)机制。

2.1 反压现象及定位

Flink 的反压太过于天然了,导致无法简单地通过监控 BufferPool 的使用情况来判断反压状态。Flink 通过对运行中的任务进行采样来确定其反压,如果一个 Task 因为反压导致处理速度降低了,那么它肯定会卡在向 LocalBufferPool 申请内存块上。那么该 Task 的 stack trace 应该是这样:

  1. java.lang.Object.wait(Native Method)
  2. o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163) o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) [...]

监控对正常的任务运行有一定影响,因此只有当 Web 页面切换到 Job 的 BackPressure 页面时,JobManager 才会对该 Job 触发反压监控。默认情况下,JobManager 会触发 100 次 stack trace 采样,每次间隔 50ms 来确定反压。Web 界面看到的比率表示在内部方法调用中有多少 stack trace 被卡在LocalBufferPool.requestBufferBlocking(),例如: 0.01 表示在 100 个采样中只有 1 个被卡在LocalBufferPool.requestBufferBlocking()。采样得到的比例与反压状态的对应关系如下:
Ø OK: 0 <= 比例 <= 0.10
Ø LOW: 0.10 < 比例 <= 0.5
Ø HIGH: 0.5 < 比例 <= 1
Task 的状态为 OK 表示没有反压,HIGH 表示这个 Task 被反压。

2.1.1 利用 Flink Web UI 定位产生反压的位置

在 Flink Web UI 中有 BackPressure 的页面,通过该页面可以查看任务中 subtask 的反压状态,如下两图所示,分别展示了状态是 OK 和 HIGH 的场景。排查的时候,先把operator chain禁用,方便定位。
3.png
4.png

2.1.2 利用Metrics定位反压位置

当某个 Task 吞吐量下降时,基于 Credit 的反压机制,上游不会给该 Task 发送数据,所以该 Task 不会频繁卡在向 Buffer Pool 去申请 Buffer。反压监控实现原理就是监控 Task 是否卡在申请 buffer 这一步,所以遇到瓶颈的 Task 对应的反压⻚⾯必然会显示 OK,即表示没有受到反压。
如果该 Task 吞吐量下降,造成该Task 上游的 Task 出现反压时,必然会存在:该 Task 对应的 InputChannel 变满,已经申请不到可用的Buffer 空间。如果该 Task 的 InputChannel 还能申请到可用 Buffer,那么上游就可以给该 Task 发送数据,上游 Task 也就不会被反压了,所以说遇到瓶颈且导致上游 Task 受到反压的 Task 对应的 InputChannel 必然是满的(这⾥不考虑⽹络遇到瓶颈的情况)。从这个思路出发,可以对该 Task 的 InputChannel 的使用情况进行监控,如果 InputChannel 使用率 100%,那么该 Task 就是我们要找的反压源。Flink 1.9 及以上版本inPoolUsage 表示 inputFloatingBuffersUsage 和inputExclusiveBuffersUsage 的总和。