使用 DataGen 造数据
开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。
数据可以是自己造的模拟数据,也可以是生产中的部分数据。造测试数据的工具: DataFactory、datafaker 、DBMonster、Data-Processer 、Nexmark、Jmeter 等。
Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。
DataStreamAPI 的 DataGenerator
https://gitee.com/luan_hao/flink-turn/blob/master/src/main/java/com/hao/flink/tuning/DataStreamDataGenDemo.java
SQL 的 DataGenerator
算子指定 UUID
对于有状态的 Flink 应用,推荐给每个算子都指定唯一用户 ID(UUID)。
严格地说,仅需要给有状态的算子设置就足够了。但是因为 Flink 的某些内置算子(如 window)是有状态的,而有些是无状态的,可能用户不是很清楚哪些内置算子是有状态的,哪些不是。
所以从实践经验上来说,我们建议每个算子都指定上 UUID。
默认情况下,算子 UID 是根据 JobGraph 自动生成的,JobGraph 的更改可能会导致UUID 改变。手动指定算子 UUID ,可以让 Flink 有效地将算子的状态从 savepoint 映射到作业修改后(拓扑图可能也有改变)的正确的算子上。比如替换原来的 Operator 实现、
增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。这是 savepoint 在 Flink 应用中正常工作的一个基本要素。Flink 算子的 UUID 可以通过 uid(String uid) 方法指定,通常也建议指定 name。
#算子.uid("指定 uid")
.reduce((value1, value2) -> Tuple3.of("uv", value2.f1, value1.f2 + value2.f2))
.uid("uv-reduce").name("uv-reduce")
链路延迟
对象重用
当调用了 enableObjectReuse 方法后,Flink 会把中间深拷贝的步骤都省略掉,SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。但需要特别注意的是,这个方法不能随便调用,必须要确保下游 Function 只有一种,或者下游的Function 均不会改变对象内部的值。否则可能会有线程安全的问题。
env.getConfig().enableObjectReuse();
bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ -Drest.flamegraph.enabled=true \ -Dyarn.application.queue=default \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \ -Dpipeline.object-reuse=true \ -Dmetrics.latency.interval=30000 \ -c com.hao.flink.tuning.UidDemo \ /opt/module/flink-1.13.6/jar/flink-turn2.jar
细粒度滑动窗口优化
1)细粒度滑动的影响
当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降。
2)解决思路
DataStreamAPI中,自己解决(https://issues.apache.org/jira/browse/FLINK-7001)。
我们一般使用滚动窗口+在线存储+读时聚合的思路作为解决方案:
(1)从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度);
(2)每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase);
(3)扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。
3)细粒度的滑动窗口案例
提交案例:统计最近 1 小时的 uv,1 秒更新一次(滑动窗口)
bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ -Drest.flamegraph.enabled=true \ -Dyarn.application.queue=default \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \ -c com.hao.flink.tuning.SlideWindowDemo \ /opt/module/flink-1.13.6/jar/flink-turn2.jar —sliding-split false
4)时间分片案例
提交案例:统计最近 1 小时的 uv,1 秒更新一次(滚动窗口+状态存储)
bin/flink run \ -t yarn-per-job \ -d \ -p 5 \ -Drest.flamegraph.enabled=true \ -Dyarn.application.queue=default \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ -Dtaskmanager.numberOfTaskSlots=2 \ -c com.hao.flink.tuning.SlideWindowDemo \ /opt/module/flink-1.13.6/jar/flink-turn2.jar \ —sliding-split true
Flink 1.13 对 SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf