使用 DataGen 造数据

开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。
数据可以是自己造的模拟数据,也可以是生产中的部分数据。造测试数据的工具: DataFactory、datafaker 、DBMonster、Data-Processer 、Nexmark、Jmeter 等。
Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。

DataStreamAPI 的 DataGenerator

https://gitee.com/luan_hao/flink-turn/blob/master/src/main/java/com/hao/flink/tuning/DataStreamDataGenDemo.java

SQL 的 DataGenerator

https://gitee.com/luan_hao/flink-turn/blob/master/src/main/java/com/hao/flink/tuning/SQLDataGenDemo.java

算子指定 UUID

对于有状态的 Flink 应用,推荐给每个算子都指定唯一用户 ID(UUID)。
严格地说,仅需要给有状态的算子设置就足够了。但是因为 Flink 的某些内置算子(如 window)是有状态的,而有些是无状态的,可能用户不是很清楚哪些内置算子是有状态的,哪些不是。

所以从实践经验上来说,我们建议每个算子都指定上 UUID。
默认情况下,算子 UID 是根据 JobGraph 自动生成的,JobGraph 的更改可能会导致UUID 改变。手动指定算子 UUID ,可以让 Flink 有效地将算子的状态从 savepoint 映射到作业修改后(拓扑图可能也有改变)的正确的算子上。比如替换原来的 Operator 实现、

增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。这是 savepoint 在 Flink 应用中正常工作的一个基本要素。Flink 算子的 UUID 可以通过 uid(String uid) 方法指定,通常也建议指定 name。

  1. #算子.uid("指定 uid")
  2. .reduce((value1, value2) -> Tuple3.of("uv", value2.f1, value1.f2 + value2.f2))
  3. .uid("uv-reduce").name("uv-reduce")

链路延迟

对象重用

当调用了 enableObjectReuse 方法后,Flink 会把中间深拷贝的步骤都省略掉,SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。但需要特别注意的是,这个方法不能随便调用,必须要确保下游 Function 只有一种,或者下游的Function 均不会改变对象内部的值。否则可能会有线程安全的问题。

  1. env.getConfig().enableObjectReuse();

bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ -Drest.flamegraph.enabled=true \ -Dyarn.application.queue=default \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dpipeline.object-reuse=true \ -Dmetrics.latency.interval=30000 \ -c com.hao.flink.tuning.UidDemo \ /opt/module/flink-1.13.6/jar/flink-turn2.jar

细粒度滑动窗口优化

1)细粒度滑动的影响

当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降。

2)解决思路

DataStreamAPI中,自己解决(https://issues.apache.org/jira/browse/FLINK-7001)。
我们一般使用滚动窗口+在线存储+读时聚合的思路作为解决方案:
(1)从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度);
(2)每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase);
(3)扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。

3)细粒度的滑动窗口案例

https://gitee.com/luan_hao/flink-turn/blob/master/src/main/java/com/hao/flink/tuning/SlideWindowDemo.java

提交案例:统计最近 1 小时的 uv,1 秒更新一次(滑动窗口)

bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ -Drest.flamegraph.enabled=true \ -Dyarn.application.queue=default \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \ -c com.hao.flink.tuning.SlideWindowDemo \ /opt/module/flink-1.13.6/jar/flink-turn2.jar —sliding-split false

4)时间分片案例

提交案例:统计最近 1 小时的 uv,1 秒更新一次(滚动窗口+状态存储)

bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ -Drest.flamegraph.enabled=true \ -Dyarn.application.queue=default \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \ -c com.hao.flink.tuning.SlideWindowDemo \ /opt/module/flink-1.13.6/jar/flink-turn2.jar \ —sliding-split true

Flink 1.13 对 SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf