到目前为止,我们一直从流水线作者或数据科学家的角度研究流处理。第2章介绍了水印,作为对以下问题的基本答案的一部分:事件时间处理在何处进行,处理时间何时实现。在本章中,我们处理相同的问题,但是从流处理系统的基本机制的角度出发。查看这些机制将有助于我们激励,理解和应用围绕水印的概念。我们讨论了如何在数据进入点创建水印,水印如何通过数据处理管道传播以及它们如何影响输出时间戳。我们还演示了水印如何保留必要的保证,这些保证是回答事件时间数据在何处处理以及何时实现(同时处理无限制数据)所必需的。

定义

考虑任何吸收数据并连续输出结果的管道。我们希望解决通常的问题,即何时可以安全地关闭事件时间窗口,这意味着该窗口不再需要任何数据。为此,我们要描述管道相对于其无限制输入所取得的进展。

解决事件时间窗口问题的一种幼稚方法是简单地将事件时间窗口基于当前处理时间。正如我们在第1章中所看到的,我们很快就遇到了麻烦-数据处理和传输不是瞬时的,因此处理和事件时间几乎永远不会相等。在我们的管道的任何打嗝或尖峰可能会导致我们无法正确分配消息的窗口。最终,该策略失败了,因为我们没有可靠的方法来保证此类窗口的安全。

另一种直观但最终不正确的方法是考虑管道处理的消息的速率。尽管这是一个有趣的指标,但是速率可能会随输入的变化,预期结果的可变性,可用于处理的资源等而任意变化。更重要的是,费率无助于回答完整性的根本问题。具体来说,rate不会告诉我们何时在特定时间间隔内看到了所有消息。在现实世界的系统中,有时会出现消息未通过系统取得进展的情况。这可能是由于瞬时错误(例如崩溃,网络故障,机器停机)引起的,也可能是由于持续错误(例如需要更改应用程序逻辑或其他人工干预才能解决的应用程序级故障)造成的。当然,如果发生许多故障,则处理率指标可能是检测到这一点的良好代理。但是,费率指标永远无法告诉我们一条消息无法通过我们的管道取得进展。但是,即使只有一条这样的消息也可以任意影响输出结果的正确性。

我们需要更稳健的进度度量。到达那里,我们对流数据做了一个基本假设:每个消息都有一个相关的逻辑事件时间戳。在连续获取无边界数据的情况下,此假设是合理的,因为这意味着将连续生成输入数据。在大多数情况下,我们可以将原始事件发生的时间作为其逻辑事件时间戳记。使用所有包含事件时间戳记的输入消息,然后我们可以检查这些时间戳记在任何管道中的分布。这样的流水线可能被分布为在许多代理上并行处理并消耗输入消息,而不能保证各个分片之间的顺序。因此,如图3-1所示,该管道中活动的飞行中消息的事件时间戳集将形成一个分布。

消息被管道吸收,处理并最终标记为完成。每条消息要么是“进行中”,即已收到但尚未完成,要么是“已完成”,即不再需要代表该消息进行处理。如果我们按事件时间检查消息的分布,则其外观如图3-1所示。随着时间的推移,更多消息将添加到右侧的“运行中”分发中,并且来自分发中“运行中”部分的更多消息将被完成并移至“已完成”分发中。
stsy_0301.png
图3-1 流管道中正在进行的和已完成的消息事件时间的分布。新消息作为输入到达,并保持“运行中”状态,直到对它们的处理完成。“飞行中”分布的最左边缘对应于任何给定时刻最早的未处理元素。

此分布上有一个关键点,位于“运行中”分布的最左侧,对应于管道中任何未处理消息的最旧的事件时间戳。我们使用此值定义水印:
水印是尚未完成的最古老作品的单调递增时间戳。

此定义提供了两个基本属性,这些属性使其很有用:

完整性

如果水印已超过时间戳T,我们将通过其单调属性来保证在T或T之前的按时(非延迟数据)事件将不再进行处理。因此,我们可以正确地在T或T之前发出任何聚合。换句话说,水印使我们知道何时关闭窗口是正确的。

能见度

如果由于任何原因邮件阻塞在我们的管道中,则水印将无法前进。此外,通过检查阻止水印前进的消息,我们将能够找到问题的根源。

源水印创建

这些水印来自哪里? 要为数据源建立水印,我们必须为从该源进入管道的每条消息分配一个逻辑事件时间戳。正如第二章告诉我们的那样,所有水印的创建都属于两大类之一:完美或启发式。为了提醒自己有关完美水印和启发式水印之间的区别,让我们看一下图3-2,该图展示了第2章中的加窗汇总示例。
stsy_0302.png
图3-2 带有完美(左)和启发式(右)水印的窗口求和

注意,区别特征在于完美的水印可确保水印处理所有数据,而启发式水印则接受某些后期数据元素。

在将水印创建为完美或启发式之后,水印将在整个管道的其余部分中保持不变。至于使水印创作完美或启发式的原因,很大程度上取决于所使用来源的性质。要了解原因,让我们看一下每种水印创建类型的几个示例。

完美的水印创作

完美的水印创建将时间戳记分配给传入的消息,从而严格保证结果水印不会再有任何事件时间小于水印的数据再次从该源中看到。使用完美水印创建的管道永远不必处理最新数据。也就是说,在水印之后到达的数据已经超过了新到达消息的事件时间。但是,完美的水印创建需要对输入有全面的了解,因此对于许多现实世界中的分布式输入源来说是不切实际的。以下是一些可以创建完美水印的用例示例:

入口时间戳记

将进入时间指定为数据进入系统的事件时间的源可以创建完美的水印。在这种情况下,源水印仅跟踪管道所观察到的当前处理时间。从本质上讲,这是2016年之前几乎所有支持窗口化的流系统使用的方法。

由于事件时间是从一个单一的,单调增加的源(实际处理时间)分配的,因此系统对数据流中下一个时间戳将具有完全的了解。结果,事件时间进度和窗口语义变得非常容易推论。当然,不利的一面是水印与数据本身的事件时间无关。这些事件时间被有效地丢弃,而水印仅跟踪相对于数据到达系统的数据进度。

静态的按时间顺序排列的日志集

时间顺序日志的静态大小为2的输入源(例如,具有静态分区集合的Apache Kafka主题,其中源的每个分区包含单调增加的事件时间)将是最简单的来源,可以创建完美的水印。为此,源只需在已知和静态的源分区集中跟踪未处理数据的最小事件时间(即,每个分区中最近读取的记录的最小事件时间)。

与前面提到的入口时间戳类似,由于已知整个静态分区集合中的事件时间会单调增加,因此该系统对接下来的时间戳具有完全的了解。这实际上是有界乱序处理的一种形式;已知分区中的无序量受这些分区中观察到的最小事件时间的限制。

通常,保证分区中时间戳单调增加的唯一方法是,当这些分区中的时间戳分配给数据写入时分配这些时间戳;例如,通过Web前端将事件直接记录到Kafka中。尽管仍然是一个有限的用例,但它肯定比到达数据处理系统时输入入口时间戳更为有用,因为水印会跟踪基础数据的有意义的事件时间。

启发式水印创建

另一方面,启发式水印创建会创建一个水印,该水印仅是估计不会再看到事件时间小于水印的数据。使用启发式水印创建的管道可能需要处理一些后期数据。延迟数据是在水印超前该数据的事件时间之后到达的任何数据。只能通过启发式水印创建来获得后期数据。如果启发式算法是一个相当不错的算法,则后期数据量可能很小,并且水印仍然可以用作完成估算。如果系统要支持需要正确性的用例(例如帐单之类的东西),则该系统仍需要为用户提供处理最新数据的方法。

对于许多现实世界中的分布式输入源而言,构建完美的水印在计算或操作上都是不切实际的,但仍然可以通过利用输入数据源的结构特征来构建高度精确的启发式水印。以下是两个可能具有启发式水印(质量不同)的示例:

动态的时间顺序日志集

考虑动态的结构化日志文件集(每个单独的文件包含相对于同一文件中的其他记录而言,事件时间单调增加的事件时间,但文件之间的事件时间没有固定关系的记录),其中包含完整的预期日志文件集(即,分区(以Kafka的说法)在运行时未知。通常在由多个独立团队构建和管理的全球规模服务中找到这种投入。在这样的用例中,在输入上创建完美的水印是很棘手的,但是创建准确的启发式水印是完全可能的。

通过跟踪现有日志文件集中未处理数据的最短事件时间,监视增长率并利用网络拓扑和带宽可用性等外部信息,即使在缺乏完善知识的情况下,您也可以创建非常准确的水印。所有输入。这种类型的输入源是Google上最常见的无边界数据集类型之一,因此我们在创建和分析此类方案的水印质量方面拥有丰富的经验,并且已经看到它们在许多用例中都可以很好地发挥作用。

Google Cloud Pub / Sub

Cloud Pub / Sub是一个有趣的用例。Pub / Sub目前不保证按订单交货; 即使单个发布者按顺序发布了两条消息,也有可能(通常很小)将它们无序发送(这是由于底层体系结构的动态特性所致,这使得透明性可以扩展到很高) 零用户干预的吞吐量水平)。结果,无法保证Cloud Pub / Sub拥有完美的水印。但是,Cloud Dataflow团队已经利用Cloud Pub / Sub中有关数据的知识,建立了合理准确的启发式水印。本章稍后将以案例研究的形式详细讨论这种启发式方法的实现。

考虑一个用户玩手机游戏并将其分数发送到我们的渠道进行处理的示例:您通常可以假设,对于使用移动设备进行输入的任何来源,通常都不可能提供完美的水印。由于设备长时间脱机的问题,因此无法为这种数据源提供任何形式的绝对完整性的合理估计。但是,您可以想象构建一个水印,以准确跟踪当前在线设备的输入完整性,类似于刚才描述的Google Pub / Sub水印。从无论如何提供低延迟结果的角度来看,活跃在线的用户很可能是最相关的用户子集,因此,这通常并没有您最初想象的那么多。

从广义上讲,通过启发式水印的创建,对源的了解越多,启发式就越好,并且可以看到的后期数据也越少。鉴于源的类型,事件的分布和使用模式将有很大的不同,因此没有一种万能的解决方案。但是,无论哪种情况(完美或启发式),在输入源上创建水印后,系统都可以完美地通过管道传播水印。这意味着完美的水印将在下游保持完美,启发式水印将严格保持与建立时一样的启发式。这是水印方法的好处:您可以完全从源头创建水印的问题上,减少跟踪管道完整性的复杂性。

水印传播

但是,大多数现实世界中的管道包含多个阶段。了解水印如何在独立阶段传播,对于了解水印如何影响整个管道以及 观察到的结果延迟很重要。

| 管道阶段
每当您的管道按某个新维度将数据分组在一起时,通常都需要不同的阶段。例如,如果您有一个使用原始数据的管道,计算了每个用户的汇总,然后使用这些每个用户的汇总来计算一些每个团队的汇总,那么您可能会得到一个三阶段的管道 :
- 一个使用原始的未分组数据
- 可以按用户分组数据并按用户汇总计算
- 可以按团队对数据分组并按团队计算汇总
我们将在第6章中进一步了解分组对管道形状的影响。 | | —- |

如上一节所述,在输入源处创建水印。然后,它们在概念上随着数据在系统中的流动而流经系统。您可以以不同的粒度级别跟踪水印。对于包含多个不同阶段的管道,每个阶段可能会跟踪其自己的水印,该水印的值是其之前所有输入和阶段的函数。因此,即将进行的阶段中的水印将具有过去的水印(因为它们看到的总输入量较少)。

我们可以在管道中任何分段操作或阶段的边界处定义水印。这只是理解管道中每个阶段的相对进度,而且对于独立且及时地为每个阶段尽快重新生成结果是有用的。边界处的水印,我们认为以下定义:

  • 输入水印,它捕获该阶段上游所有内容的进度(即该阶段输入的完成程度)。对于源,输入水印是特定于源的功能,可为输入数据创建水印。对于非源阶段,输入水印被定义为其所有上游源和阶段的所有分片/分区/实例的输出水印的最小值。
  • 输出水印,它捕获阶段本身的进度,并且基本上定义为阶段的输入水印和阶段中所有非晚期数据活动消息的事件时间的最小值。确切地说,“活动”所包含的内容在一定程度上取决于给定阶段实际执行的操作以及流处理系统的实现。它通常包括为聚合而缓冲但尚未在下游实现的数据,正在传输到下游阶段的待处理输出数据,等等。

定义特定阶段的输入和输出水印的一个不错的功能是,我们可以使用它们来计算阶段引入的事件时间延迟量。从其输入水印的值中减去该阶段的输出水印的值,即可得出该阶段引入的事件时间延迟或延迟量。这个滞后是每个阶段的输出将比实时延迟多远的概念。例如,执行10秒窗口聚合的阶段将具有10秒或更长时间的延迟,这意味着该阶段的输出将至少延迟那么多输入和实时。输入和输出水印的定义在整个管道中提供了水印的递归关系。管道中的每个后续阶段都会根据阶段的事件时间延迟,根据需要延迟水印。

每个阶段中的处理也不是整体的。我们可以在一个阶段内将处理划分为具有几个概念性组件的流,每个概念性组件都对输出水印有所贡献。如前所述,这些组件的确切性质取决于阶段执行的操作和系统的实现。从概念上讲,每个此类组件都充当缓冲区,活动消息可以驻留在缓冲区中,直到完成某些操作为止。例如,当数据到达时,将其缓存以进行处理。然后,处理过程可能会将数据写入状态,以供以后延迟聚合。延迟聚合触发后,可能会将结果写入输出缓冲区,等待下游阶段的消耗,如图3-3所示。
stsy_0303.png
图3-3 流传输系统阶段的示例系统组件,其中包含运行中数据的缓冲区。每个都将具有关联的水印跟踪,并且该阶段的总输出水印将是所有此类缓冲区中水印的最小值。

我们可以使用自己的水印跟踪每个此类缓冲区。每个阶段的缓冲区中最小的水印形成了阶段的输出水印。因此,输出水印可能是以下各项中的最小值:

  • 每个源的水印-每个发送阶段。
  • 每个外部输入水印—用于管道外部的源
  • 每个状态组件水印—用于可以写入的每种状态
  • 每个输出缓冲区水印—用于每个接收阶段

在此粒度级别上使水印可用还可以更好地了解系统的行为。水印跟踪消息在系统中各个缓冲区之间的位置,从而更易于诊断阻塞。

了解水印传播

为了更好地了解输入和输出水印之间的关系以及它们如何影响水印传播,我们来看一个示例。让我们考虑一下游戏得分,但是我们将计算用户参与度,而不是计算团队得分的总和。我们将首先计算每个用户的会话时长,前提是用户保持参与游戏的时间是他们享受游戏的合理时间。回答完四个问题以计算会话长度后,我们将第二次回答它们以计算固定时间段内的平均会话长度。

为了使我们的示例更加有趣,可以说我们正在处理两个数据集,一个用于移动得分,一个用于控制台得分。我们想通过整数求和在这两个独立的数据集上执行相同的分数计算。一种途径是为在移动设备上玩的用户计算分数,而另一种途径是为在家用游戏机上玩的用户计算分数,这可能是由于不同平台采用了不同的数据收集策略。重要的一点是,这两个阶段执行相同的操作,但是对不同的数据执行操作,因此输出水印非常不同。
首先,让我们看一下示例3-1,以了解该管道第一部分的缩写代码。

示例3-1 计算会话长度

  1. PCollection<Double> mobileSessions = IO.read(new MobileInputSource()) .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
  2. .triggering(AtWatermark())
  3. .discardingFiredPanes()) .apply(CalculateWindowLength());
  4. PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource()) .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
  5. .triggering(AtWatermark())
  6. .discardingFiredPanes()) .apply(CalculateWindowLength());

在这里,我们独立阅读每个输入,而以前我们是按团队对集合进行键入,在此示例中,我们按用户进行键入。之后,对于每个管道的第一阶段,我们进入会话,然后调用一个名为CalculateWindowLength的自定义PTransform。此PTransform只是按键(即用户)分组,然后通过将当前窗口的大小视为该窗口的值来计算每个用户的会话长度。在这种情况下,我们可以使用默认触发器(AtWatermark)和累积模式(discardingFiredPanes)设置,但是为了完整起见,我明确列出了它们。两个特定用户的每个管道的输出可能类似于图3-4。
stsy_0304.png
图3-4 跨两个不同输入管道的每用户会话长度

因为我们需要跨多个阶段跟踪数据,所以以红色跟踪与移动得分相关的所有内容,以蓝色跟踪与控制台得分相关的所有内容,而图3-5中“平均会话长度”的水印和输出为黄色。

我们已经回答了关于如何计算各个会话长度的四个问题:何时,何地,何时以及如何计算。接下来,我们将第二次回答它们,以将这些会话长度转换为固定时间窗口内的全局会话长度平均值。这要求我们首先将两个数据源合并为一个,然后重新显示在固定窗口中; 我们已经在计算的会话长度值中捕获了会话的重要本质,现在我们希望在一天中一致的时间范围内计算这些会话的全局平均值。例3-2显示了此代码。

示例3-2 计算会话长度

  1. PCollection<Double> mobileSessions = IO.read(new MobileInputSource()) .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
  2. .triggering(AtWatermark())
  3. .discardingFiredPanes()) .apply(CalculateWindowLength());
  4. PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource()) .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
  5. .triggering(AtWatermark())
  6. .discardingFiredPanes()) .apply(CalculateWindowLength());
  7. PCollection<Float> averageSessionLengths = PCollectionList .of(mobileSessions).and(consoleSessions) .apply(Flatten.pCollections()) .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
  8. .triggering(AtWatermark())
  9. .apply(Mean.globally());

如果我们看到此管道正在运行,则其外观如图3-5所示。和以前一样,两个输入管道正在为移动和控制台播放器计算各自的会话长度。然后,这些会话长度输入到管道的第二阶段,在此阶段,在固定窗口中计算全局会话长度平均值。
stsy_0305.png
图3-5 移动和主机游戏会话的平均会话时长(续下页)

鉴于发生了很多事情,让我们来看一些示例。这里的两个重要点是:

  • 每个“移动会话”和“控制台会话”阶段的输出水印至少与每个相应的输入水印一样老,实际上要早一些。这是因为在实际系统中,计算答案需要时间,并且我们不允许输出水印前进,直到完成对给定输入的处理为止。
  • “平均会话长度”阶段的输入水印是直接上游两个阶段的输出水印的最小值。

结果是,下游输入水印是上游输出水印的最小成分的别名。请注意,这与本章前面对这两种水印的定义相匹配。还要注意,过去下游的水印在过去是如何发展的,捕捉到了直观的观念,即上游阶段在时间上将比其后阶段更先进。

值得在此进行观察的一个观察结果就是,我们能够多么干净地再次询问示例3-1中的问题,从而大大改变了管道的结果。之前我们只计算每个用户的会话长度,而现在我们计算两分钟的全局会话长度平均值。这样可以更深入地了解用户在玩我们的游戏时的整体行为,并且使您可以一眼看出简单数据转换与实际数据科学之间的区别。

更好的是,既然我们了解了该管道如何工作的基本知识,我们可以更仔细地研究与再次询问四个问题有关的更细微的问题之一:输出时间戳。

水印传播和输出时间戳

在图3-5中,我介绍了输出时间戳的一些细节。但是,如果您仔细观察图中的第二阶段,您会看到第一阶段的每个输出都分配了与其窗口末尾匹配的时间戳。尽管这是输出时间戳的自然选择,但这并不是唯一有效的选择。从本章前面的内容可以知道,水印绝对不能向后移动。鉴于此限制,您可以推断出给定窗口的有效时间戳记的范围从窗口中最早的nonlate记录的时间戳开始(因为仅保证nonlate记录可以保留水印)并一直扩展到正无穷大。有很多选择。但是实际上,在大多数情况下,只有几种选择有意义:

窗口尽头

如果希望输出时间戳代表窗口边界,则使用窗口末尾是唯一安全的选择。稍后我们将看到,它还允许所有选项中最平滑的水印渐进。

第一个非延迟元素的时间戳

当您想要使水印尽可能保守时,使用第一个nonlate元素的时间戳是一个不错的选择。但是,权衡取舍的是,水印的进展可能会受到更大的阻碍,正如我们不久还将看到的那样。

特定元素的时间戳

对于某些用例,从系统角度来看,其他任意元素的时间戳是正确的选择。想象一下一个用例,其中您将查询流与该查询结果的点击流连接在一起。在执行了连接之后,某些系统会发现查询的时间戳更加有用。其他人则更喜欢点击的时间戳。从水印正确性的角度来看,任何这样的时间戳都是有效的,只要它与不迟到的元素相对应即可。

在考虑了输出时间戳的一些替代选项之后,让我们看一下选择输出时间戳会对整个管道产生什么影响。为了使更改尽可能地引人注目,在示例3-3和图3-6中,我们将切换到对窗口使用最早的时间戳:第一个不晚元素的时间戳作为窗口的时间戳。

示例3-3 平均会话长度管道,它输出最早设置的会话窗口的时间戳

  1. PCollection<Double> mobileSessions = IO.read(new MobileInputSource()) .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
  2. .triggering(AtWatermark())
  3. .withTimestampCombiner(EARLIEST)
  4. .discardingFiredPanes()) .apply(CalculateWindowLength());
  5. PCollection<Double> consoleSessions = IO.read(new ConsoleInputSource()) .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(1)))
  6. .triggering(AtWatermark())
  7. .withTimestampCombiner(EARLIEST)
  8. .discardingFiredPanes()) .apply(CalculateWindowLength());
  9. PCollection<Float> averageSessionLengths = PCollectionList .of(mobileSessions).and(consoleSessions) .apply(Flatten.pCollections()) .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
  10. .triggering(AtWatermark())
  11. .apply(Mean.globally());

stsy_0306.png
图3-6 在最早元素的时间戳上输出的会话的平均会话长度(续下页)

为了帮助指出输出时间戳选择的效果,请看一下第一阶段中的虚线,其中显示了每个阶段要保留的输出水印。与图3-7和3-8相比,我们选择的时间戳会延迟输出水印,在图3-7和3-8中,选择将输出时间戳作为窗口的结尾。从该图可以看出,第二级的输入水印因此也随之延迟。
stsy_0307.png
图3-7 使用不同的窗口超时时间戳比较水印和结果。该图中的水印对应于会话窗口末尾的输出时间戳(即图3-5)。
stsy_0308.png
图3-8 在该图中,水印在会话窗口的开始处(即图3-6)。我们可以看到该图中的水印线更加延迟,并且最终的平均会话长度也有所不同。

就此版本与图3-7的区别而言,有两个值得注意的地方:

水印延迟

与图3-5相比,水印在图3-6中的处理要慢得多。这是因为第一阶段的输出水印会保留到每个窗口中第一个元素的时间戳,直到该窗口的输入完成为止。仅在实现给定窗口后,才允许输出水印(以及下游输入水印)前进。

语义差异

因为现在分配了会话时间戳以匹配会话中最早的nonlate元素,所以当我们在下一阶段计算会话长度平均值时,各个会话通常会以不同的固定窗口时段结尾。到目前为止,我们所看到的两个选项中的任何一个都不存在固有的对与错;他们只是不同。但重要的是要了解它们会有所不同,并且对它们的区别方式有直觉,以便您可以在特定时机针对特定用例做出正确的选择。

重叠窗口的棘手案例

关于输出时间戳的另一个细微但重要的问题是如何处理滑动窗口。将输出时间戳设置为最早元素的幼稚方法很容易导致下游延迟,这是因为(正确)阻止了水印。要了解为什么,请考虑一个示例管道,该管道具有两个阶段,每个阶段都使用相同类型的滑动窗口。假设每个元素最终在三个连续的窗口中结束。随着输入水印的发展,在这种情况下,滑动窗口所需的语义如下:

  • 第一个窗口在第一阶段完成,并向下游发射。
  • 然后,第一个窗口在第二阶段完成,也可以向下游发射。
  • 一段时间后,第二个窗口在第一阶段完成…依此类推。

但是,如果选择输出时间戳作为窗格中第一个nonlate元素的时间戳,则实际发生的情况如下:

  • 第一个窗口在第一阶段完成,并向下游发射。
  • 第二阶段中的第一个窗口仍然无法完成,因为上游的第二个和第三个窗口的输出水印阻止了它的输入水印。正确地保留了这些水印,因为最早的元素时间戳被用作这些窗口的输出时间戳。
  • 第二个窗口在第一阶段完成,并向下游发射。
  • 第二阶段的第一个和第二个窗口仍然无法完成,由上游的第三个窗口阻止。
  • 第三窗口在第一阶段完成,并向下游发射。
  • 现在,第二阶段中的第一,第二和第三个窗口都可以完成,最终一次完成所有三个窗口的发射。

尽管此窗口化的结果是正确的,但这导致结果以不必要的延迟方式得以实现。因此,Beam具有用于重叠窗口的特殊逻辑,以确保窗口N + 1的输出时间戳始终大于窗口N的结尾。

百分位数水印

到目前为止,我们已经关注了水印,该水印是通过一个阶段中活动消息的最小事件时间来衡量的。跟踪最小值可让系统知道何时考虑了所有较早的时间戳。另一方面,我们可以考虑活动消息的事件时间戳记的整个分布,并利用它来创建更细粒度的触发条件。

不用考虑分布的最小点,我们可以取分布的任何百分比,并说我们可以保证已用较早的时间戳处理了所有事件的这一百分比。

此方案的优点是什么?如果对于业务逻辑而言,“大部分”正确是足够的,则百分位数水印提供了一种机制,通过该机制,与我们通过丢弃分布的长尾中的异常值来跟踪最小事件时间相比,水印可以更快,更平稳地前进。水印。图3-9显示了事件时间的紧凑分布,其中第90个百分位水印接近第100个百分位。图3-10展示了离群值进一步落后的情况,因此第90个百分位数的水印明显领先于第100个百分位数。通过从水印中丢弃离群值数据,百分位数水印仍然可以跟踪分布的大部分,而不会受到离群值的延迟。
stsy_0309.png
图3-9 外观正常的水印直方图
stsy_0310.png
图3-10 带离群值的水印直方图

图3-11显示了一个用于绘制两分钟固定窗口的边界的百分比水印示例。我们可以根据百分比水印跟踪的到达数据的时间戳百分比来绘制早期边界。
stsy_0211.png
图3-11 水印百分位数的影响。随着百分位数的增加,窗口中将包含更多事件:但是,实现该窗口的处理时间延迟也会增加。

图3-11显示了第33个百分位,66个百分位和100个百分位(满)水印,并跟踪了数据分布中相应的时间戳百分位。正如预期的那样,这些方法允许绘制边界早于跟踪整个第100个百分点的水印。请注意,第33个和第66个百分点的水印均允许较早触发窗口,但需要权衡将更多数据标记为较晚的时间。例如,对于第一个窗口[12:00,12:02),基于第33个百分位数水印关闭的窗口将仅包含四个事件,并在12:06处理时间实现结果。如果我们使用第66个百分位水印,则相同的事件时间窗口将包含七个事件,并在12:07处理时间实现。使用第100个百分位水印包括所有十个事件,并将结果具体化延迟到12:08处理时间。因此,百分位数水印提供了一种在实现结果的延迟与结果的精度之间进行权衡的方法。

处理时水印

到目前为止,我们一直在研究水印,因为它们与流经我们系统的数据有关。我们已经看到查看水印如何帮助我们识别最旧的数据和实时之间的总体延迟。但是,这还不足以区分旧数据和延迟的系统。换句话说,仅通过检查到目前为止定义的事件时间水印,我们就无法区分正在快速且无延迟地处理一个小时前的数据的系统与正在尝试处理的系统。处理实时数据,但已延迟了一个小时。

为了实现这种区分,我们需要更多的东西:处理时水印。我们已经看到流系统中有两个时域:处理时间和事件时间。到目前为止,我们已经根据事件流域中系统流过的数据的时间戳完全定义了水印。这是一个事件时间水印。现在,我们将相同的模型应用于处理时域,以定义处理时水印。

我们的流处理系统会不断执行操作,例如在阶段之间改组消息,将消息读取或写入持久状态或基于水印进度触发延迟的聚合。所有这些操作都是根据管道当前或上游阶段执行的先前操作执行的。因此,就像数据元素“流经”系统一样,处理这些元素所涉及的一系列操作也“流经”系统。

我们使用与定义事件时间水印完全相同的方式来定义处理时间水印,不同的是,我们使用最早的歌剧的处理时间时间戳来代替未完成的最旧工作的事件时间时间戳。尚未完成。延迟处理时间水印的一个例子可能是从一个阶段到另一个阶段的消息传递卡住,阻塞了读取状态或外部数据的I / O调用,或者是处理过程中阻止处理完成的异常。

因此,处理时间水印提供了与数据延迟分开的处理延迟概念。要了解这种区别的价值,请考虑图3-12中的图表,其中我们查看了事件时间水印延迟。

我们看到数据延迟在单调增加,但是没有足够的信息来区分滞留系统和滞留数据的情况。只有查看图3-13中所示的处理时间水印,我们才能区分出这种情况。
stsy_0312.png
图3-12 事件时间水印增加。无法从此信息中得知这是由于数据缓冲还是系统处理延迟引起的。
stsy_0313.png
图3-13 处理时间水印也在增加。这表明系统处理被延迟。

在第一种情况下(图3-12),当我们检查处理时间水印延迟时,我们发现它也在增加。这告诉我们系统中的某个操作被卡住了,卡住还导致数据延迟落后。在现实世界中,可能发生这种情况的一些例子是,当网络出现问题时,防止在管道的各个阶段之间传递消息,或者发生了故障并正在重试。通常,处理时间水印的增加表明存在一个问题,该问题阻止了系统功能所需的操作无法完成,并且通常需要用户或管理员的干预才能解决。

在第二种情况下,如图3-14所示,处理时间水印延迟很小。这告诉我们没有卡住的操作。事件时间水印延迟仍在增加,这表明我们有一些等待耗尽的缓冲状态。例如,如果我们在等待窗口边界发出聚合时正在缓冲某个状态,并且这对应于管道的正常操作,则可能发生这种情况,如图3-15所示。
stsy_0314.png
图3-14 事件时间水印延迟增加,处理时间水印稳定。这表明数据已在系统中缓冲并等待处理,而不是表明系统操作正在阻止数据处理完成。
stsy_0315.png
图3-15 固定窗口的水印延迟。事件时间水印延迟随着元素在每个窗口中的缓冲而增加,并随着每个窗口的聚集通过按时触发而发出而减小,而处理时间水印简单地跟踪系统级延迟(在系统级延迟中保持相对稳定)。健康的管道)。

因此,处理时间水印是区分系统等待时间和数据等待时间的有用工具。除了可见性之外,我们还可以在系统实现级别上使用处理时水印来处理诸如临时状态的垃圾收集之类的任务(Reuven在第5章中更多地讨论了此示例)。

实例探究

既然我们已经为水印的行为打下了基础,现在该看看一些实际系统来了解水印的不同机制是如何实现的。我们希望这些能够为延迟和正确性之间的权衡取舍,以及现实系统中水印的可扩展性和可用性。

案例研究:Google Cloud Dataflow中的水印

在流处理系统中实现水印的方法有很多种。在这里,我们简要介绍了Google Cloud Dataflow中的实施情况,这是用于执行Apache Beam管道的完全托管服务。数据流包括用于定义数据处理工作流程的SDK,以及用于在Google Cloud Platform资源上运行这些工作流程的Cloud Platform托管服务。

通过将每个工作程序的可用键空间划分为多个键范围并将每个范围分配给一个工作程序,数据流在多个物理工作程序中的数据处理图中带状(分片)了多个物理工作程序中的每个数据处理步骤。每当遇到具有不同键的GroupBy Key操作时,都必须将数据改组为对应的键。

图3-16描绘了带有GroupBy键的处理图的逻辑表示。
stsy_0316.png
图3-16 GroupByKey步骤使用来自另一个DoFn的数据。这意味着在第一步的键和第二步的键之间存在数据混洗。

关键范围对工作人员的物理分配可能如图3-17所示。
stsy_0317.png
图3-17 这两个步骤的关键范围是在可用工作程序之间分配(带)的。

在水印传播部分,我们讨论了为每个步骤的多个子组件维护水印。数据流跟踪每个组件的每个范围的水印。然后,水印聚合涉及在所有范围内计算每个水印的最小值,以确保满足以下保证:

  • 所有范围都必须报告水印。如果某个范围内没有水印,则我们无法推进该水印,因为未报告的范围必须视为未知。
  • 确保水印单调增加。由于可能会有较晚的数据,因此如果水印会导致水印向后移动,我们就不能对其进行更新。

Google Cloud Dataflow通过集中的聚合器代理执行聚合。我们可以将该代理分片以提高效率。从正确性的角度来看,水印聚合器充当有关水印的“单一事实来源”。

确保分布式水印聚合的正确性提出了某些挑战。至关重要的是,不会过早地推进水印,因为过早地推进水印会将打开时间的数据转换为较晚的数据。具体而言,随着对工作人员的物理分配被激活,工作人员在键范围内附加的持久状态上保持租约,从而确保只有单个工人可以为键更改持久状态。为了保证水印的正确性,我们必须确保仅当工作进程仍保持其持久状态的租约时,才能将工作进程中的每个水印更新都纳入汇总中。因此,水印更新协议必须考虑到国家所有权的租赁确认。

案例研究:Apache Flink中的水印

Apache Flink是一个开源流处理框架,用于分布式,高性能,始终可用且准确的数据流应用程序。可以使用Flink运行程序运行Beam程序。为此,Beam依赖于流处理概念的实现,例如Flink中的水印。与通过集中式水印聚合器代理实现水印聚合的Google Cloud Dataflow不同,Flink在带内执行水印跟踪和聚合。

为了了解它是如何工作的,让我们看一下Flink管道,如图3-18所示。
stsy_0318.png
图3-18 具有两个源和在带内传播的事件时间水印的Flink管道

在此管道中,在两个源处生成数据。这些源还都生成了水印“检查点”,这些检查点与数据流同步带内发送。这意味着,当从源A发出时间戳为“ 53”的水印检查点时,它保证不会从源A发出时间戳为“ 53”之后的非延迟数据消息。下游的“ keyBy”运算符使用输入数据和水印检查点。随着新的水印检查点被消耗,下游操作员对水印的看法会提高,并且可以为下游操作员发出新的水印检查点。

与数据流一起在带内发送水印检查点的这种选择不同于依靠中央聚合的Cloud Dataflow方法,并导致一些有趣的折衷。

以下是带内水印的一些优点:

减少水印传播延迟,并且延迟非常低

因为不必使水印数据遍历多跳并等待中央聚合,所以使用带内方法可以更轻松地实现非常低的延迟。

水印聚合没有单点故障

中央水印聚合代理的不可用将导致整个管道中水印的延迟。使用带内方法时,部分管道的不可用不会导致整个管道的水印延迟。

固有的可扩展性

尽管Cloud Dataflow在实践中可以很好地扩展,但与带内水印的隐式可伸缩性相比,使用集中式水印聚合服务来实现可伸缩性还需要更多的复杂性。

以下是带外水印聚合的一些优点:

“真相”的单一来源

对于可调试性,监视和其他应用程序(例如基于管道进度的节流输入),具有可出售水印值而不是在流中隐含水印的服务是有利的,并且系统的每个组件都有自己的部分 视图。

源水印创建

一些源水印需要全局信息。例如,源可能暂时处于闲置状态,数据速率较低,或者需要有关源或其他系统组件的带外信息以生成水印。这在中央服务中更容易实现。有关示例,请参阅以下针对Google Cloud Pub / Sub的源水印的案例研究。

案例研究:Google Cloud Pub / Sub的源水印

Google Cloud Pub / Sub是一项完全托管的实时消息传递服务,可让您在独立的应用程序之间发送和接收消息。在这里,我们讨论如何为通过Cloud Pub / Sub发送到管道中的数据创建合理的启发式水印。
首先,我们需要描述一下Pub / Sub的工作原理。消息发布在发布/订阅主题上。特定主题可以由任意数量的发布/订阅订阅来订阅。在订阅给定主题的所有订阅上传递相同的消息。传递方法是让客户从订阅中提取消息,并通过提供的ID确认收到特定消息。尽管发布/订阅确实会尝试首先提供最旧的消息,但客户并没有选择要提取哪些消息,对此没有硬性保证。

为了建立启发式方法,我们对将数据发送到Pub / Sub的源进行一些假设。具体来说,我们假设原始数据的时间戳是“行为良好的”。换句话说,在发送到Pub / Sub之前,我们希望源数据上有一定数量的乱序时间戳记。时间戳超出允许的无序范围发送的任何数据都将被视为后期数据。在我们当前的实现中,此界限至少为10秒,这意味着在发送到Pub / Sub之前最多10秒的时间戳重新排序不会创建较晚的数据。我们将此值称为估计范围。另一种看待此问题的方式是,当管线完全与输入保持一致时,水印将比实时时间晚10秒,以允许从源进行重新排序。如果管道积压,则所有积压(不仅是10秒的波段)都用于估计水印。

我们在发布/订阅中面临什么挑战?因为发布/订阅不能保证顺序,所以我们必须具有某种其他元数据才能充分了解积压。幸运的是,Pub / Sub提供了“未确认的最早的发布时间戳”方面的积压量。这与消息的事件时间戳不同,因为Pub / Sub与通过它发送的应用程序级元数据无关。相反,这是Pub / Sub提取邮件的时间戳。

此度量与事件时间水印不同。实际上,这是发布/订阅消息传递的处理时间水印。发布/订阅发布时间戳记不等于事件时间戳记,并且在发送历史(过去)数据的情况下,它可能任意距离。这些时间戳记的顺序也可能有所不同,因为如前所述,我们允许有限的重新排序。

但是,我们可以将其用作积压的一种方法,以了解有关积压中存在的事件时间戳的足够信息,以便我们可以如下创建合理的水印。

我们为包含输入消息的主题创建两个订阅:管道将实际用于读取要处理的数据的基本订阅,以及仅用于元数据以执行水印估计的跟踪订阅。

看一下图3-19中的基本订阅,我们看到消息可能会乱序到达。我们用发布/订阅发布时间戳“ pt”和事件时间时间戳“ et”标记每条消息。请注意,两个时域可以不相关。
stsy_0319.png
图3-19 通过发布/订阅订阅到达的消息的处理时间和事件时间时间戳

基本订阅上的某些消息未被确认,形成了积压。这可能是由于尚未交付,或者可能已经交付但尚未处理。还请记住,来自此订阅的请求分散在多个分片上。因此,仅通过查看基本订阅就不可能说出我们的水印应该是什么。

跟踪预订(如图3-20所示)用于有效检查基本预订的积压日志,并在积压日志中获取最少的事件时间戳。在跟踪订阅中保留很少或没有待办事项,我们可以在基本订阅的最早的未确认消息之前检查消息。
stsy_0320.png
图3-20 额外的“跟踪”订阅接收与“基本”订阅相同的消息

通过确保从此订阅中进行提取在计算上不昂贵,我们可以跟踪跟踪订阅。相反,如果我们在跟踪订阅方面远远落后,我们将停止推进水印。为此,我们确保至少满足以下条件之一:

  • 跟踪订阅远远领先于基本订阅。充分领先意味着跟踪订阅至少领先于估计频段。这确保了考虑估计带内的任何有界重排序。
  • 跟踪订阅足够接近实时。换句话说,跟踪订阅上没有积压。

在我们已持久保存有关消息的发布和事件时间戳记的元数据之后,我们会尽快确认跟踪订阅中的消息。我们将这种元数据以稀疏的直方图格式存储,以最大程度地减少使用的空间量和持久写入的大小。

最后,我们确保我们有足够的数据来进行合理的水印估计。我们从跟踪订阅中读取了一系列事件时间戳,这些事件时间戳的发布时间戳要比未确认的最古老的基本订阅或估计频段的宽度要新。这样可以确保我们考虑积压中的所有事件时间戳,或者如果积压很小,则考虑最近的估计范围,以进行水印估计。

最后,将水印值计算为该频段中的最短事件时间。该方法是正确的,因为在输入端10秒钟内重新排序限制内的所有时间戳都将由水印考虑,而不显示为最新数据。但是,它可能会产生过于保守的水印,按照第2章中的描述,水印会“过慢”地前进。由于我们在跟踪订阅中考虑了所有消息,而不是基本订阅中最早的未确认消息,因此可以在事件订阅中包括事件时间戳。已确认邮件的水印估计。

此外,还有一些启发式方法可以确保进度。在密集,频繁到达的数据的情况下,此方法效果很好。在数据稀疏或不频繁的情况下,可能没有足够的最新消息来建立合理的估计。如果我们在超过两分钟的时间内没有看到订阅数据(并且没有积压的情况),我们会将水印提前到接近实时的水平。这样可以确保即使再也没有消息出现,水印和管道也会继续取得进展。

所有以上这些确保了只要源数据事件时间戳的重新排序在估计范围之内,就不会有其他延迟数据。

概要

在这一点上,我们已经探索了如何使用消息的事件时间来给出流处理系统中进度的可靠定义。我们看到了这种进步概念如何随后可以帮助我们回答在什么地方进行时间处理以及何时在处理时间实现结果的问题。具体来说,我们研究了如何在源头创建水印,将数据摄取到管道中的点,然后在整个管道中传播,以保留基本的保证,从而保证在何时何地可以回答问题。我们还研究了在水印上更改输出窗口时间戳的含义。最后,我们探索了在大规模构建水印时的一些实际系统注意事项。

既然我们已经对水印在幕后的工作方式有了坚实的基础,那么在第4章中我们将使用窗口化和触发来回答更复杂的查询时,我们可以深入研究水印对我们的作用。