1. 使用 DataGen 造数据
开发完 Flink 作业,压测的方式很简单,先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。
数据可以是自己造的模拟数据,也可以是生产中的部分数据。造测试数据的工具:DataFactory、datafaker、DBMonster、Data-Processer 、Nexmark、Jmeter 等。
Flink 从 1.11 开始提供了一个内置的 DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。
2. 算子指定 UUID
对于有状态的Flink应用,推荐给每个算子都指定唯一用户ID(UUID)。严格地说,仅需要给有状态的算子设置就足够了。但是因为Flink的某些内置算子(如 window)是有状态的,而有些是无状态的,可能用户不是很清楚哪些内置算子是有状态的,哪些不是。所以从实践经验上来说,我们建议每个算子都指定上 UUID。
默认情况下,算子 UID 是根据 JobGraph 自动生成的,JobGraph 的更改可能会导致 UUID 改变。手动指定算子UUID,可以让 Flink 有效地将算子的状态从 savepoint 映射到作业修改后(拓扑图可能也有改变)的正确的算子上。比如替换原来的Operator实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator状态对应上。这是savepoint在Flink应用中正常工作的一个基本要素。
3. 链路延迟测量
对于实时的流式处理系统来说,我们需要关注数据输入、计算和输出的及时性,所以处理延迟是一个比较重要的监控指标,特别是在数据量大或者软硬件条件不佳的环境下。Flink提供了开箱即用的LatencyMarker 机制来测量链路延迟。开启如下参数:
metrics.latency.interval: 30000 #默认 0,表示禁用,单位毫秒
监控的粒度,分为以下 3 档:
- ➢ single:每个算子单独统计延迟;
- ➢ operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;
- ➢ subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。
metrics.latency.granularity: operator #默认 operator
一般情况下采用默认的 operator 粒度即可,这样在 Sink 端观察到的 latency metric就是我们最想要的全链路(端到端)延迟。subtask 粒度太细,会增大所有并行度的负担,不建议使用。
LatencyMarker 不会参与到数据流的用户逻辑中的,而是直接被各算子转发并统计。 为了让它尽量精确,有两点特别需要注意:
- ➢ 保证 Flink 集群内所有节点的时区、时间是同步的:ProcessingTimeService 产生时间戳最终是靠 System.currentTimeMillis()方法,可以用 ntp 等工具来配置。
- ➢ metrics.latency.interval 的时间间隔宜大不宜小:一般配置成 30000(30 秒)左右。一是因为延迟监控的频率可以不用太频繁,二是因为 LatencyMarker 的处理也要消耗一定性能。
