在之前的深入了解 Apache Flink 的网络协议栈 文章中,我们从高级抽象到底层细节各个层面全面介绍了 Flink 网络栈的工作机制。 作为这一系列的第二篇文章,本文将在第一篇的基础上更进一步,主要探讨如何监视与网络相关的指标,从而识别反压等因素带来的影响,或找出吞吐量和延迟的瓶颈所在。 本文将简要介绍了如何处理反压,但调整网络栈的话题将在以后的文章中进一步研究。 如果你不熟悉网络栈,我们强烈建议你先阅读 深入了解 Apache Flink 的网络协议栈,然后再继续阅读。
监控
网络监控最重要的部分可能是监控反压,即系统接收数据的速率高于其处理速度 [1]。 这会导致发送方反压,而导致反压的原因可能有两种情况:
- 接收方处理慢。发生这种情况的原因可能是
接收方反压
,无法保持与发送方相同的处理速度,或者由于垃圾收集、缺乏系统资源或 I/O 而暂时阻塞。 - 网络通道慢。即使这种情况可能和接收方没有(直接)关系,如果在同一台机器上运行的所有子任务共享的网络带宽被过度消耗,我们也称
发送方反压
。 请注意,除了 Flink 的网络栈之外,可能还有其他网络用户,例如 Source 和 Sink、分布式文件系统(Checkpoint、基于网络的存储)、日志记录和指标。 我们之前的一篇关于 如何调整 ApacheFlink 集群的大小:粗略计算 的文章介绍了更多相关内容。
[1] 如果你不熟悉反压以及它如何与 Flink 交互,建议阅读我们在 2015 年发表的关于如何处理反压 的文章。
如果出现反压,它将向上游传递,最终到达 Source 并减慢 Source 的速度。 这本身并不是一件坏事,只是表明你缺乏足够的资源处理当前的负载。 但是,但你可能想要做一些改进,以便在不动用更多资源的情况下处理更高的负载。 为此,你需要找到瓶颈在哪里(位于哪个 Task /算子)以及是什么原因造成的。
Flink 提供了两种机制来发现瓶颈所在:
- 直接通过 Flink 的 Web UI 及其反压监视器
- 间接通过一些网络指标。
Flink 的 Web UI 可能是快速排除故障时的首选,但它存在一些缺点,我们将在下面解释。 另一方面,Flink 的网络指标更适合持续监控和判断是哪些瓶颈导致了反压,并分析这些瓶颈的根因。 我们将在下文中具体介绍这两个部分。 在这两种情况下,你都需要从 Source 到 Sink 找出反压的根源。 调查工作的起点一般来说是最后一个遇到反压的算子。 而且最后这个算子很可能就是反压产生的源头。
反压监视器
反压监视器只暴露在 Flink 的 Web UI [2] 中。 由于它仅在请求时才会触发,因此目前无法通过监控指标来提供给用户。 反压监视器通过 Thread.getStackTrace() 对所有 TaskManager 上正在运行的任务的线程进行采样,并计算请求缓冲区的任务被阻塞的样本数。这些任务之所以会阻塞,要么是因为它们无法按照网络缓冲区生成的速率发送这些缓存,要么就是下游任务处理它们的速度很慢,并且没有可以使用的 Credit。 反压监视器将显示阻塞与总请求的比率。 由于某些反压被认为是正常/临时的,所以监视器将显示以下状态:
- OK,比率 ≤ 0.10
- LOW,0.10 < 比率 ≤ 0.5
- HIGH,0.5 < 比率 ≤ 1
[2] 你还可以通过 REST API 访问反压监视器:/jobs/:jobid/vertices/:vertexid/backpressure
尽管你也可以调整刷新间隔、样本数或样本之间的延迟等参数,但通常情况下这些参数用不着你来调整,因为默认值已经给出了足够好的结果。
反压监视器可以帮助你找到反压源自何处(位于哪个 Task /算子)。 但你没法用它进一步推断反压产生的原因。 此外,对于更大或较高并行度的作业,反压监视器显示的信息太乱以至于无法使用,并且还可能要花些时间才能完整收集来自 TaskManager 的数据。另请注意,采样工作可能还会影响你当前作业的性能。
网络指标
网络指标和任务 I/O 指标比反压监视器更轻量,而且会针对每个正在运行的作业不断更新指标。我们可以利用这些指标获得更多信息,收集到的信息除了用来监测背反压还有其他用途。和用户关系最大的指标是:
- Flink 1.8 及以下版本**:**outPoolUsage、inPoolUsage
- Flink 1.9 及以上版本**:**outPoolUsage、inPoolUsage、floatingBuffersUsage、exclusiveBuffersUsage
- numRecordsOut、numRecordsIn
每个指标都有两个作用域:一个是算子,另一个是子任务。网络监视使用的是<font style="color:rgb(24, 24, 24);">子任务作用域</font>
指标,并显示它已发送/接收的记录总数。你可能需要进一步研究这些数字来找出特定时间跨度内的记录数量,或使用等效的 PerSecond 指标。
- numBytesOut、numBytesInLocal、numBytesInRemote
- numBuffersOut、numBuffersInLocal、numBuffersInRemote
类似于 numBytes,但这里计算的是网络缓冲区的数量。
⚠️警告 出于完整性的考虑以及它们在以前被使用过,我们将简要介绍 outputQueueLength 和 inputQueueLength 这两个指标。它们有点类似 [out、in]PoolUsage 指标,但分别显示的是发送方子任务的输出队列和接收方子任务的输入队列中的缓冲区数量。但想要判断缓冲区的准确数量是很难的,而且本地通道也有一个很微妙的特殊问题:由于本地输入通道没有自己的队列(它直接使用输出队列),因此通道的这个值始终为 0(请参阅 FLINK-12576);对于只有本地输入通道的情况 inputQueueLength = 0。 总的来说,我们不鼓励使用 outputQueueLength 和 inputQueueLength,因为它们的解读很大程度上取决于算子当前的并行度以及 Exclusive Buffer 和 Floating Buffer 的配置数量。相比之下,我们建议使用各种 *PoolUsage 指标,它们会为用户提供更详尽的信息。 ⚠️注意 如果你要判断缓存的使用率,请记住以下几点:
- 任何至少使用过一次的传出通道将始终占用一个缓冲区(从 Flink 1.5 开始)。
- Flink 1.8 及以下版本**:**这个缓冲区(即使是空的!)总是在 backlog 中计 1,因此接收方试图为它保留一个 Floating Buffer。
- Flink 1.9 及以上版本:仅当缓冲区已准备好消费时才在 backlog 中计数,即已满或已刷新(请参阅 FLINK-11082)。
- 接收方只会在反序列化其中的最后一条记录后才释放已接收的缓冲区。
以下各节将运用这些这些指标来判断反压和资源的使用率/效率与吞吐量的关系。 后面还会有一个独立的部分具体介绍与延迟相关的指标。
反压
有两组指标可以用来监测反压:它们分别是(本地)缓冲池使用率和输入 / 输出队列长度。它们提供了不同级别的粒度,可惜都不够全面,怎样解读这些指标也有很多说法。由于队列长度指标解读存在固有问题,我们将重点关注输入和输出池的使用率指标,该指标也提供了更多细节信息。- 如果一项子任务的 outPoolUsage 为 100%,则它正在经受反压。子任务已经被阻塞了,还是仍在将记录写入网络缓冲区,取决于 RecordWriter 当前正在写入的缓冲区有没有写满。这与反压监视器显示的结果是不一样的!
- 当 inPoolUsage 为 100%时表示所有 Floating Buffer 都分配给了通道,反压最终将传递到上游。这些 Floating Buffer 处于以下任一状态中:由于一个 Exclusive Buffer 正被占用(远程输入通道一直在尝试维护 #exclusive buffer 的 Credit),这些 Floating Buffer 被保留下来供将来在通道上使用;它们被保留下来用于发送方等待发送的数据(backlog);它们可能包含数据并在输入通道中排队;或者它们可能包含数据并正由接收方的子任务读取(一次一个记录)。
- Flink 1.8 及以下版本:根据 FLINK-11082,即使在正常情况下 100% 的 inPoolUsage 也很常见。
- Flink 1.9 及以上版本:如果 inPoolUsage 始终保持在 100% 左右,则这是上游出现反压的强烈信号。
outPoolUsage **low** | outPoolUsage **high** | |
---|---|---|
inPoolUsage low |
正常 | 注意 (产生反压,临时状态:上游暂未出现反压或已经解除反压) |
inPoolUsage high (Flink 1.9+) |
注意:如果所有上游任务的 outPoolUsage 都很低 (可能最终会产生反压) |
问题 (下游任务或网络出现反压,可能会向上游传递) |
问题:如果任何上游任务的 outPoolUsage 很高 (可能在上游导致反压,还可能是反压的源头) |
我们甚至可以通过查看两个连续任务的子任务的网络指标来深入了解反压产生的原因:
- 如果接收方任务的所有子任务的 inPoolUsage 值都很低,并且有任一上游子任务的 outPoolUsage 较高,则可能是网络瓶颈导致了反压。由于网络是 TaskManager 的所有子任务共享的资源,因此瓶颈可能不是直接源自这个子任务,而是来自于各种并发操作,例如 Checkpoint、其他流、外部连接或同一台计算机上的其他 TaskManager/ 进程。
- 反压也可以由任务的所有并行实例或单个实例引起。
Flink 1.9 及以上版本
- 如果 floatingBuffersUsage 没到 100%,那么就不太可能存在反压。如果它达到了 100% 且所有上游任务都在承受反压,说明这个输入正在单个、部分或全部输入通道上承受反压。你可以使用 exclusiveBuffersUsage 来区分这三种情况: - 假设 floatingBuffersUsage 接近 100%,则 exclusiveBuffersUsage 越高,输入通道承受的压越大。在 exclusiveBuffersUsage 接近 100%的极端情况下,所有通道都在承受压。
exclusiveBuffersUsage **low** | exclusiveBuffersUsage **high** | |
---|---|---|
floatingBuffersUsage low + 所有上游 outPoolUsage low | 正常 | [3] |
floatingBuffersUsage low + 任一上游 outPoolUsage high | 问题 (可能是网络瓶颈) |
[3] |
floatingBuffersUsage high + 所有上游 outPoolUsage low | 注意 (反压最终只出现在某些输入通道上) |
注意 (最终大部分或所有输入通道出现反压) |
floatingBuffersUsage high + 任一上游 outPoolUsage high | 问题 (仅在某些输入通道在出现反压) |
问题 (大部分或所有输入通道都在承受背压) |
[3] 不应该出现这种情况
资源使用率 / 吞吐量
除了上面提到的各个指标的单独用法外,还有一些组合用法可以用来探究网络栈的深层状况:- 吞吐量较低时 outPoolUsage 值却经常接近 100%,但同时所有接收方的 inPoolUsage 都很低,这表明我们 Credit 通知的往返时间(取决于你的网络延迟)太久,导致默认的 exclusive buffer 数量无法充分利用你的带宽。可以考虑增加 buffers-per-channel 参数或尝试禁用基于 Credit 的流量控制来验证。
- 结合 numRecordsOut 和 numBytesOut 有助于确定序列化记录的平均大小,进而帮助你针对峰值场景做容量规划。
- 如果要了解缓冲区填充率和输出刷新器的影响,可以将 numBytesInRemote 与 numBuffersInRemote 结合使用。在调整吞吐量(而不是延迟!)时,较低的缓冲区填充率可能意味着网络效率较低。在这种情况下请考虑增加缓冲区超时时间。请注意,在 Flink 1.8 和 1.9 中,numBuffersOut 仅在缓冲区填满或中断缓冲区的事件发生(例如 Checkpoint Barrier)时才会增加,并且可能会滞后。另外请注意,由于缓冲区是针对远程信道的优化技术,对本地信道影响有限,因此不需要在本地信道上考察缓冲区填充率。
- 你还可以使用 numBytesInLocal 和 numBytesInRemote 区分本地与远程流量,但在大多数情况下没这个必要。
如何处理反压?
假设你确定了反压(瓶颈)的位置,下一步就是分析为什么会发生这种情况。下面我们按照从基本到复杂的顺序列出了导致反压的一些潜在成因。我们建议首先检查基本原因,然后再深入研究更复杂的原因,否则就可能得出一些错误的结论。请记住,反压可能是暂时的,并且是负载峰值、Checkpoint 或 Job 重启时数据 backlog 待处理导致的结果。 在这种情况下,你通常可以忽略它。 此外还要记住,分析和解决问题的过程可能会受到瓶颈本身的影响。话虽如此,这里还是有几件事需要检查一下。
系统资源
首先,你应该检查机器的基本资源使用情况,如 CPU、网络或磁盘 I/O 等指标。如果某些资源使用率比较高,你可以执行以下操作:- 尝试优化你的代码。代码分析器在这种情况下很有帮助。
- 调整 Flink 的特定资源。
- 通过增加并行度和/或增加群集中的计算机数量来扩展资源。
垃圾收集
一般来说,长时间的垃圾回收工作会引发性能问题。你可以打印 GC 调试日志(通过 -XX: +PrintGCDetails)或使用某些内存/GC 分析器来验证你是否处于这种情况。由于 GC 问题的处理与应用程序高度相关,并且独立于 Flink,因此我们不会在此详细介绍(可参考 Oracle 的垃圾收集调整指南, 或 Plumbr 的 Java 垃圾回收手册)。CPU/ 线程瓶颈
如果一个或几个线程导致 CPU 瓶颈,而整台机器的 CPU 使用率仍然相对较低,则 CPU 瓶颈可能就很难被发现了。例如,48 核计算机上的单个 CPU 线程瓶颈只会带来 2%的 CPU 使用率。可以考虑使用代码分析器,因为它们可以显示每个线程的 CPU 使用情况,这样就能识别出热线程。线程争用
与上面的 CPU/ 线程瓶颈问题类似,共享资源上较高的线程争用率可能会导致子任务瓶颈。还是要请出 CPU 分析器,考虑查找用户代码中的同步开销/锁争用——虽然我们应该避免在用户代码中添加同步性,这可能很危险!还可以考虑调查共享系统资源。例如,默认 JVM 的 SSL 实现可以从共享的 /dev/urandom 资源获取数据。负载不均衡
如果你的瓶颈是由数据偏差引起的,可以尝试将数据分区更改为几个独立的重键,或实现本地/预聚合来清除偏差或减轻其影响。 除此之外还有很多情况。一般来说,为了减少瓶颈和反压,首先要分析它发生的位置,然后找出原因。最好从检查哪些资源使用率比较高开始入手。延迟追踪
在可能发生的各个位置跟踪延迟是一个独立的话题。在本节中,我们将重点关注 Flink 网络栈中记录的等待时间——包括系统的网络连接。在吞吐量较低时,这些延迟会直接受<font style="color:rgb(24, 24, 24);">输出刷新器</font>
的缓冲区超时
参数的影响,或间接受任何应用程序代码延迟的影响。当处理记录的时间比预期的要长或者(多个)计时器同时触发时并阻止接收方处理传入的记录,网络栈内后续记录的等待时间会显着延长。我们强烈建议你将自己的指标添加到 Flink 作业中,以便更好地跟踪作业中的延迟,并更全面地了解延迟产生的原因。
- single:每个算子子任务一个直方图
- operator(默认值):Source Task 和 算子子任务的每个组合有一个直方图
- subtask:Source子任务和 算子子任务的每个组合有一个直方图(并行度翻了两番!)
⚠️注意 Flink 的 LatencyMarker 假设集群中所有计算机上的时钟都是同步的。我们建议设置自动时钟同步服务(如 NTP)以避免错误的延迟结果。
警告:由于添加了大量的指标以及使用维护成本非常高的直方图,启用延迟指标会显著影响集群的性能(特别是对于子任务粒度)。强烈建议仅将它们用于调试目的。
总结
本文讨论了如何监控 Flink 的网络栈,主要涉及识别反压:发生的位置,反压源头,以及(可能)发生的原因。可以通过两种方式识别反压:使用反压监视器处理简单状况并调试会话;使用 Flink 的任务和网络栈指标实现持续监控、更深入的分析和更低的运行时开销。反压可以由网络层本身引起,但在大多数情况下是由高负载下的某些子任务引起的。通过对这些指标的分析研究可以区分这两种场景。我们还提供了一些监控资源使用情况和追踪可能从 Source 到 Sink 的网络延迟的手段。翻译
Flink Network Stack Vol. 2: Monitoring, Metrics, and that Backpressure Thing