如何定位反压的来源? - 图1

Web 用户界面中的反压监测 在过去的几年里,人们从不同的角度来探讨反压问题。然而,在最近发布的 Flink 的版本中,发现和分析背压的来源已经发生了很大的变化(尤其是在 Flink 1.13 中添加了新的指标和 Web UI)。这篇文章将试图阐明这些变化,并详细的说明如何追踪背压的来源。

什么是反压?

Ufuk Celebi 发表文章Apache Flink 如何处理背压,虽然时间比较久远但却非常准确的解释了反压。如果你还不熟悉反压这个概念,我强烈推荐你阅读它。可以进一步阅读Flink 网络流控与反压机制,更加深入地理解反压以及 Flink 的网络栈的工作原理。

如果 Job Graph 中的某些算子无法以接收记录相同的速率处理记录,则会发生反压。 这会使运行这个算子的子任务的输入缓冲区被填满。 一旦输入缓冲区已满,反压就会传播到上游子任务的输出缓冲区。 一旦这些缓冲区都被填满,上游子任务也会被迫降低记录的处理速度,以匹配下游算子的处理速度。 反压进一步向上传播,直到到达 source 算子

只要负载和可用资源是静态的,并且没有一个算子产生短暂的数据突发(如窗口算子),这些输入/输出缓冲区应该只处于以下两种状态之一:几乎为空或几乎被填满。

  • 如果下游算子或子任务能够跟上数据的流入,则缓冲区将为空
  • 如果跟不上,则缓冲区将被填满 [1]

[1] 还有第三种可能。 在极少数情况下,当网络交换是 Job 的瓶颈时,下游 task 的输入缓冲区为空,而上游输出缓冲区被填满。

事实上,检查缓冲区的使用指标是以几年前 Nico Kruber 描述的关于 监控、指标和反压 为基础。 正如我在开头提到的,Flink 现在提供了更好的工具来完成相同的工作,但在我们开始之前,需要弄清楚两个问题。

我为什么要关心反压?

反压是机器或算子超载的指示器。 反压的累积直接影响系统端到端的延迟,因为记录在处理之前需要在队列中等待更长的时间。 其次,在反压发生的时候 Aligned Checkpoint 需要更长的时间,而 Unaligned Checkpoint 的 size 会更大(可以在文档了解更多关于 checkpointing 的信息)。如果你正在为 Checkpoint Barriers 传播时间而苦恼,那么处理反压最有可能帮助解决这个问题。最后,你可能只想优化你的作业以降低运行作业的成本。

为了解决上面的问题,需要意识到反压存在,然后定位和分析反压。

为什么我不应该关心反压?

坦白地说,不必一直关心反压。 几乎从定义上来说,没有反压意味着集群至少会存在轻微的资源使用不充分和资源过剩。 如果你想最小化空闲资源,你可能无法避免产生一些反压。 对于批处理来说尤其如此。

如何检测和跟踪反压的来源?

检测反压的一种方法是使用指标,然而,在 Flink 1.13 中,不再需要深入调查。 在大多数情况下,只需查看 Web UI 中的 Job Graph 就足够了。

如何定位反压的来源? - 图2

在上面的例子中首先要注意的是不同的 task 有不同的颜色。 这些颜色代表了两个因素的组合:task 的反压有多大,以及 task 有多忙。 空闲的 task 是蓝色的,完全忙碌的 task 是红色的,完全反压的 task 将是黑色的。 介于两者之间的任何状态都将是这三种颜色的组合/阴影。 有了这些知识,人们就可以很容易地发现反压 task(黑色)。反压 **task**(黑色)下游最繁忙的(红色)任务很可能是背压的来源(瓶颈)

如果你点击一个特定的 task ,然后进入“ BackPressure”选项卡,你将能够进一步分析问题,并检查该 task 中每个子任务的 busy/backpressured/idle 状态。例如,如果存在数据倾斜,并且所有子任务的资源使用率并不完全相同,那么这种方法就特别方便。

如何定位反压的来源? - 图3

子任务之间的反压

在上面的示例中,我们可以清楚地看到哪些子任务处于空闲状态,哪些处于反压状态,以及没有一个处于忙碌状态。 简而言之,这应该足以让你快速了解你的 Job :)。

反压新指标的解释

如果你好奇它是如何工作的,我们可以更深入一点。 在这个新机制的基础上,每个子任务都会公开和计算三个新指标:

  • idleTimeMsPerSecond
  • busyTimeMsPerSecond
  • backPressuredTimeMsPerSecond
它们分别以 ms 为单位测量子任务处于空闲状态、繁忙状态或反压状态的平均时间。除了一些四舍五入问题,它们应该是相辅相成的,加起来可达 1000ms/s。从本质上讲,它们与例如 CPU 使用率指标非常相似。 另一个重要的细节是,它们是很短一段时间内(几秒钟)的平均值,它们考虑了子任务线程内发生的所有事情: 算子、函数、计时器、Checkpoint、记录序列化/反序列化、网络栈和其他 Flink 内部开销。忙于触发计时器并产生结果的 WindowOperator 将被报告为<font style="color:rgb(51, 51, 51);">繁忙或反压状态</font>。在 CheckpointedFunction#snapshotState 调用中执行一些昂贵计算的函数,例如刷新内部缓冲区,也会被报告为<font style="color:rgb(51, 51, 51);">繁忙状态</font>

然而,有一个限制是 busyTimeMsPerSecond 和 idleTimeMsPerSecond 指标忽略了在主子任务执行循环之外的单独线程中发生的任何事情。 幸运的是,这只与两种情况有关:

  • 在运算符中手动生成的自定义线程(不推荐
  • 实现 deprecated 的 SourceFunction 接口的旧式 source。这些 source 将报告 NaN/N/A 作为busyTimeMsPerSecond 的值。想了解更多关于数据源的信息,请点击这里

如何定位反压的来源? - 图4

旧式 source 不报告繁忙时间

为了在 Web UI 中的 Job Graph 显示这些原始数据,这些指标需要从所有子任务中聚合(在 Job Graph 上,我们仅显示 task)。 这就是为什么 Web UI 会显示给定 task 的所有子任务的最大值,以及为什么繁忙和反压的最大值加起来可能不会达到 100% 的原因。 一个子任务可以在 60% 时反压,而另一个子任务可以在 60% 时处于忙碌状态。 这可能导致 task 在 60% 时既反压又忙碌。

负载变化

还有一件事。你还记得这些指标是几秒钟内的平均值的吗?在分析具有不同负载的 job 或 task 时,请记住这一点,例如包含周期性触发的 WindowOperator 的 (sub)task。无论是负载持续为 50% 的子任务,还是每秒在完全忙碌和完全空闲之间交替的子任务,都将报告相同的 busyTimeMsPerSecond 值,即 500 ms/s。

此外,不同的负载,尤其是触发窗口可以将瓶颈移动到 Job Graph 中的不同位置:

如何定位反压的来源? - 图5

瓶颈在两个任务之间交替

如何定位反压的来源? - 图6

SlidingWindowOperator

在这个特定的示例中,SlidingWindowOperator 是瓶颈,只要它正在累积记录。 然而,一旦它开始触发它的窗口(每 10 秒一次),下游任务 SlidingWindowCheckMapper -> Sink: SlidingWindowCheckPrintSink 就会成为瓶颈,并且 SlidingWindowOperator 反压。 由于这些繁忙/背压/空闲指标的平均时间超过几秒钟,因此这种微妙之处不会立即可见,必须通过长时间仔细观察才能看出来。 最重要的是,Web UI 每 10 秒更新一次状态,这使得发现负载变化频繁的反压源头变得更加困难。

如何处理反压

一般来说,这是一个复杂的话题,值得专门写一篇文章。 在某种程度上之前的文章中已经提到了这个问题。

简而言之,有两种处理反压的高级方法。

  • 要么添加更多资源(更多机器、更快的 CPU、更多 RAM、更好的网络、使用 SSD……)
  • 要么优化现有资源的使用(优化代码、调整配置、避免数据倾斜)

无论哪种情况,首先需要通过以下方式分析导致背压的原因:

  1. 发现背压的存在
  2. 找出导致反压的子任务或机器
  3. 定位代码的哪一部分导致了反压,以及哪些资源是稀缺的

反压监控的改进和指标可以帮助你解决前两个问题。 为了解决最后一个问题,可能需要分析代码。 为了帮助分析,同样从 Flink 1.13 开始,火焰图(Flame Graph) 被集成到 Flink 的 Web UI 中。 火焰图(Flame Graph)是一种众所周知的分析工具和可视化技术,我鼓励你尝试一下。

但请记住,在确定瓶颈位置之后,你可以像分析其他非分布式应用程序一样分析它(通过检查资源利用率、attach profiler 等)。 通常没有解决这类问题的灵丹妙药你可以尝试扩大规模,但有时可能并不容易或不实际。

不管怎样……前面提到的对反压监控的改进让我们能够轻松地检测到反压的来源,而火焰图(Flame Graph)可以帮助我们分析特定子任务导致问题的原因。 这两个功能结合起来,可以使以前相当繁琐的 Flink 作业的调试和性能分析过程变得更加容易! 请升级到 Flink 1.13.x 并试用它们!

翻译

How to identify the source of backpressure?