现在,我们从讨论编程模型和API转向实现它们的系统。模型和API允许用户描述他们要计算的内容。实际上,要大规模准确地运行计算,需要一个系统-通常是一个分布式系统。
在本章中,我们重点介绍实现系统如何正确实现Beam模型以产生准确的结果。流系统经常谈论一次精确处理。也就是说,确保每条记录都只处理一次。我们将解释这意味着什么,以及如何实现。
作为一个激励性的示例,本章重点介绍Google Cloud Dataflow使用的有效保证记录一次准确处理的技术。在本章的最后,我们还将研究一些其他流行的流系统使用的技术,以保证仅一次。
为什么完全一次
几乎不用说,对于许多用户而言,在其数据处理管道中丢失记录或丢失数据的任何风险都是无法接受的。即使这样,历史上许多通用流系统也无法保证记录处理-所有处理仅是“尽力而为”。其他系统至少提供一次保证,确保记录始终至少处理一次,但是记录可能会重复(从而导致汇总不准确);实际上,许多这样的至少一次系统在内存中执行聚合,因此当计算机崩溃时,它们的聚合仍可能丢失。这些系统用于低延迟的推测性结果,但通常不能保证这些结果的准确性。
正如第1章所指出的那样,这导致了Lambda体系结构诞生的战略-运行流系统来获得快速但不准确的结果。稍后的某个时间(通常是一天结束后),批处理系统会运行到正确的答案。仅当数据流可重播时,此方法才有效。但是,对于足以证明该策略可行的数据源而言,确实如此。但是,许多尝试过此操作的人在Lambda体系结构中遇到了许多问题:
不准确
用户倾向于低估故障的影响。他们通常认为一小部分记录会丢失或重复(通常基于运行的经验),而在那糟糕的一天,当10%(或更多!)的记录丢失或重复时,就会感到震惊。从某种意义上说,这样的系统只能提供“一半”的保证,而没有完整的保证,则一切皆有可能。
前后矛盾
最终计算所使用的批处理系统通常具有与流系统不同的数据语义。让两条管道产生可比的结果比最初想像的要困难得多。
复杂
根据定义,Lambda要求您编写和维护两个不同的代码库。您还必须运行和维护两个复杂的分布式系统,每个系统具有不同的故障模式。对于最简单的管道之外的任何事情,这很快就会变得不堪重负。
不可预测性
在许多使用情况下,最终用户将看到流式传输结果与每日结果相差不定的数量,该数量可能会随机变化。在这些情况下,用户将不再信任流数据,而是等待每日批处理结果,从而一开始就破坏了获得低延迟结果的价值。
潜伏
一些业务用例需要低延迟的正确结果,而Lambda体系结构并不是设计提供的。
幸运的是,许多Beam跑步者可以做得更好。在本章中,我们将说明一次精确的流处理如何帮助用户依靠准确的结果并避免依赖单个代码库和API的数据丢失风险。由于可能会影响管道输出的各种问题经常被错误地与恰好一次的保证混为一谈,因此当我们在Beam和数据处理的上下文中提到“恰好一次”时,我们首先准确地解释哪些问题在范围之内和之外。
准确性与完整性
每当Beam管道处理管道的记录时,我们要确保记录永远不会丢失或重复。但是,流传输管道的本质是,有时记录会在处理了其时间窗口的汇总之后显示得较晚。Beam SDK允许用户配置系统应等待多长时间才能到达较迟的数据。在此截止日期之后到达的所有(唯一)记录都将被删除。此功能有助于完整性,而不是准确性:及时显示要处理的所有记录都会被准确地处理一次,而这些较晚的记录将被明确删除。
尽管通常在流系统的背景下讨论较晚的记录,但值得注意的是,批处理管道具有相似的完整性问题。例如,常见的批处理范例是在凌晨2点运行前一天所有数据的作业。但是,如果直到凌晨2点才收集到昨天的某些数据,则批处理作业将不会处理这些数据!因此,批处理管道还可以提供准确但并非总是完整的结果。
副作用
Beam and Dataflow的一个特点是用户注入自定义代码,该代码在其流水线图的一部分中执行。数据流不保证该代码每条记录仅运行一次,无论是流传输还是批处理运行器。它可能通过用户转换多次运行给定记录,或者甚至可能同时在多个工作程序上运行同一条记录;这是保证面对工人故障时至少一次处理的必要条件。这些调用中只有一个可以“获胜”并在管道中进一步产生输出。
结果,不能保证非幂等副作用只执行一次。如果您编写的代码在管道外部具有副作用,例如联系外部服务,则对于给定的记录,这些效果可能会执行多次。这种情况通常是不可避免的,因为无法通过原子方式提交数据流的处理而对外部服务产生副作用。管道确实需要最终将结果发送到外界,并且这样的调用可能不是幂等的。正如您将在本章后面看到的那样,此类接收器通常能够添加一个额外的阶段,以首先将调用重组为幂等操作。
问题定义
因此,我们给出了一些我们未在谈论的示例。那么,一次精确处理意味着什么? 为此,我们从示例5-1中所示的简单流传输管道2开始。
示例5-1 一个简单的流媒体管道
Pipeline p = Pipeline.create(options);
// Calculate 1-minute counts of events per user. PCollection<..> perUserCounts =
p.apply(ReadFromUnboundedSource.read())
.apply(new KeyByUser()) .Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))) .apply(Count.perKey());
// Process these per-user counts, and write the output somewhere.
perUserCounts.apply(new ProcessPerUserCountsAndWriteToSink());
// Add up all these per-user counts to get 1-minute counts of all events. perUserCounts.apply(Values.<..>create())
.apply(Count.globally())
.apply(new ProcessGlobalCountAndWriteToSink());
p.run();
该管道计算两个不同的窗口聚合。第一个计算一分钟内每个用户的事件数,第二个计算每分钟有多少事件。两种聚合都将写入未指定的流接收器。
请记住,Dataflow在许多不同的工作线程上并行执行管道。在每个GroupByKey(Count操作使用幕后的GroupByKey)之后,具有相同键的所有记录都在称为shuffle的过程中在同一台计算机上进行处理。数据流工作人员使用远程过程调用(RPC)来在它们之间对数据进行混洗,以确保给定密钥的记录全部都存储在同一台计算机上。
图5-1显示了数据流在示例5-1.3中为管道创建的改组。Count.perKey将每个用户的所有数据改组到给定的worker上,而Count.global将所有这些部分计数改组到单个worker进行计算 全球总和。
图5-1 管道中的随机播放
为了使Dataflow能够准确地处理数据,此重排过程必须确保每个记录都被重排一次。就像您稍后将看到的那样,随机播放的分布式特性使这成为一个具有挑战性的问题。
该管道还可以从外部世界读取数据或向外部世界写入数据,因此数据流必须确保这种交互不会引入任何不准确性。数据流始终支持此任务-只要技术可行,Apache Spark和Apache Flink就会一次端到端地调用源和接收器。
本章的重点将放在三件事上:
随机播放
Dataflow如何保证每条记录仅被随机洗一次。
资料来源
Dataflow如何保证每个源记录仅被处理一次。
水槽
Dataflow如何保证每个接收器产生准确的输出。
确保随机播放一次
如前所述,Dataflow的流式改组使用RPC。现在,只要您有两台通过RPC进行通信的机器,就应该认真思考数据完整性。首先,RPC可能由于多种原因而失败。网络可能中断,RPC可能在完成之前超时,或者接收服务器可能决定呼叫失败。为了确保记录不会在随机播放中丢失,Dataflow采用了上游备份。这仅表示发送方将重试RPC,直到收到肯定的接收确认为止。数据流还确保即使发件人崩溃,它也将继续重试这些RPC。这样可以保证每条记录至少发送一次。
现在,问题在于这些重试本身可能会创建重复项。大多数RPC框架,包括Dataflow所使用的框架,都向发送方提供指示成功或失败的状态。在分布式系统中,您需要注意,即使RPC似乎失败了,它们有时也可能成功。造成这种情况的原因有很多:RPC超时导致的争用情况,即使RPC成功也无法传输服务器的肯定确认等。发送者真正可以信任的唯一状态是成功。
RPC返回失败状态通常表示该调用可能成功或可能不成功。尽管特定的错误代码可以传达明确的故障信息,但许多常见的RPC故障(如Deadline Exceeded)仍然模棱两可。在流式传输洗牌4的情况下,重试真正成功的RPC意味着两次刷新记录!数据流需要某种方式来检测和删除这些重复项。
从总体上讲,用于此任务的算法非常简单(请参见图5-2):每个发送的消息都用唯一的标识符标记。每个接收器都存储一个目录,其中包含所有已被查看和处理的标识符。每次收到记录时,都会在此目录中查找其标识符。如果找到该记录,则将其作为重复删除。由于Dataflow是建立在可伸缩键/值存储之上的,因此该存储用于保存重复数据删除目录。
图5-2 检测随机播放中的重复项
解决确定性
但是,使这种策略在现实世界中起作用需要非常谨慎。一种直接的折衷是,波束模型允许用户代码产生非确定性的输出。这意味着ParDo可以在同一输入记录上执行两次(由于重试),但是在每次重试时却产生不同的输出。理想的行为是这些输出中只有一个会提交到管道中。但是,由于涉及不确定性,因此很难保证两个输出具有相同的确定性ID。更加棘手的是,ParDo可以输出多个记录,因此每个重试都可能产生不同数量的输出!
那么,为什么不简单地要求所有用户处理都具有确定性呢?我们的经验是,在实践中,许多流水线都需要不确定的转换,而且流水线作者常常没有意识到他们编写的代码是不确定的。例如,考虑一个转换,该转换在Cloud Bigtable中查找补充数据,以丰富其输入数据。这是一个不确定的任务,因为外部值可能在两次重试之间发生变化。同样,任何依赖当前时间的代码也是不确定的。我们还看到了需要依赖随机数生成器的转换。即使用户代码是纯粹确定性的,任何允许延迟数据的事件时间聚合也可能具有不确定性的输入。
数据流通过使用检查点来有效地确定性地处理不确定性,从而解决了这个问题。转换中的每个输出及其唯一ID都会被检查点到达稳定的存储,然后再交付到下一个阶段。5随机交付中的任何重试都只是重播已检查过的输出-用户的不确定代码不会运行再次重试。换句话说,用户的代码可以多次运行,但其中只有一次可以“获胜”。此外,Dataflow使用一致的存储,以防止重复将其写入稳定的存储。
性能
为了实现一次准确的随机传送,记录ID的目录存储在每个接收者密钥中。对于到达的每个记录,Dataflow都会查找已经看到的ID目录,以确定该记录是否重复。每个步骤的每个输出都将检查点检查到存储,以确保生成的记录ID稳定。
但是,除非仔细实施,否则此过程将通过大大增加读写次数来大大降低客户的管道性能。因此,为了使一次精确的处理对Dataflow用户而言可行,必须减少I / O,尤其是通过防止每个记录上的I / O来减少。
数据流通过两种关键技术实现了这一目标:图形优化和Bloom过滤器。
图优化
在执行之前,Dataflow服务会在管道图上运行一系列优化。一种这样的优化是融合,其中服务将许多逻辑步骤融合到一个执行阶段。图5-3显示了一些简单的示例。
图5-3 优化示例:融合
所有融合步骤都是作为处理单元运行的,因此无需为每个步骤存储一次精确的数据。在许多情况下,融合将整个图形缩减为几个物理步骤,从而大大减少了所需的数据传输量(并且还节省了状态使用量)。
数据流还通过在将数据发送到主分组操作之前在本地执行部分合并来优化关联和交换合并操作(例如Count和Sum),如图5-4所示。这种方法可以大大减少要传递的消息数,因此也可以减少读写数。
布隆过滤器
前面提到的优化是一般技术,可一次提高副产品的性能。对于严格旨在改善一次处理的优化,我们转向Bloom过滤器。
在正常情况下,大多数到达的记录都不会重复。我们可以利用这一事实通过Bloom过滤器极大地提高性能,Bloom过滤器是紧凑的数据结构,可以快速进行设置成员资格检查。布隆过滤器具有非常有趣的属性:它们可以返回假阳性,但永远不会返回假阴性。如果过滤器说“是,该元素在集合中”,则说明该元素可能在集合中(可以计算出概率)。但是,如果过滤器说元素不在集合中,则肯定不是。此功能非常适合手头的任务。
Dataflow中的实现是这样的:每个工作人员都会对其看到的每个ID都保留一个Bloom过滤器。每当出现新记录ID时,它都会在过滤器中查找它。如果过滤器返回false,则该记录不是重复记录,并且工作程序可以跳过稳定存储中更昂贵的查找。仅当Bloom过滤器返回true时,才需要进行第二次查找,但是只要过滤器的误报率很低,就很少需要执行此步骤。
布隆过滤器会随着时间的流逝而逐渐填满,但是,这种情况下,假阳性率会增加。每当工人重启时,我们还需要通过扫描存储在状态中的ID目录来重新构造此Bloom筛选器。很有帮助的是,Dataflow将系统时间戳记附加到每条记录。6因此,服务没有创建单个Bloom过滤器,而是为每10分钟范围创建一个单独的过滤器。记录到达时,Dataflow会根据系统时间戳查询适当的过滤器。7此步骤可防止Bloom过滤器饱和,因为过滤器会随时间进行垃圾收集,并且还会限制需要在以下位置扫描的数据量启动。
图5-5说明了此过程:记录到达系统并根据它们的到达时间委托给Bloom过滤器。命中第一个过滤器的记录都不是重复的,并且所有目录查找都被过滤。记录r1是第二次传递的,因此需要进行目录查找以确认它确实是重复的。记录r4和r6也是如此。记录r8不是重复的;但是,由于其Bloom过滤器中的错误肯定,将生成目录查找(它将确定r8不是重复的,应进行处理)。
图5-5 一次布隆过滤器
垃圾收集
每个Dataflow工作程序都会永久存储一个已看到的唯一记录ID的目录。由于Dataflow的状态和一致性模型是针对每个密钥的,因此实际上每个密钥都存储了已交付给该密钥的记录的目录。我们无法永远存储这些标识符,否则所有可用存储空间最终都将用完。为了避免该问题,您需要对确认的记录ID进行垃圾回收。
实现此目标的一种策略是,发送方使用严格增加的序列号标记每个记录,以便跟踪仍在飞行中的最早的序列号(对应于未确认的记录传递)。由于早先的所有记录都已被确认,因此可以更早地收集目录中具有较早序列号的所有标识符。
但是,还有更好的选择。如前所述,Dataflow已经使用系统时间戳标记每个记录,该系统时间戳用于存储一次准确的Bloom过滤器。因此,Dataflow不会使用序列号对一次准确的目录进行垃圾收集,而是根据这些系统时间戳计算垃圾收集水印(这是第3章中讨论的处理时间水印)。这种方法的一个好处是,由于该水印是基于在给定阶段等待的物理时间量(与数据水印不同,后者基于自定义事件时间),因此可以直观地了解水印的哪些部分。管道很慢。此元数据是Dataflow WebUI中显示的系统延迟指标的基础。
如果记录到达时带有旧时间戳,并且我们已经对该时间点进行了垃圾收集标识符,会发生什么情况?之所以会发生这种情况,是因为我们将其称为“网络残余”,在这种情况下,旧消息会在网络内部无限期地停留,然后突然出现。嗯,触发垃圾收集的低水位标记直到确认记录传递后才继续前进,因此我们知道该记录已被成功处理。这些网络残留物显然是重复的,将被忽略。
恰好一次在源中
Beam提供了用于将数据读取到Dataflow管道中的源API。9如果处理失败并且需要确保由源产生的每个唯一记录都仅被处理一次,则Dataflow可能会重试从源进行的读取。
对于大多数源,Dataflow透明地处理此过程。这些来源具有确定性。例如,考虑一个从文件中读取数据的源。文件中的记录将始终按确定性顺序排列,并位于确定性字节位置,而不管文件被读取多少次。10文件名和字节位置唯一地标识每条记录,因此服务可以自动为以下记录生成唯一的ID: 每条记录。
提供类似确定性保证的另一个来源是Apache Kafka。每个Kafka主题都分为一组静态分区,并且分区中的记录始终具有确定的顺序。这样的确定性源将在数据流中无缝运行,而不会重复。
但是,并非所有来源都这么简单。例如,数据流管道的一种常见来源是Google Cloud Pub / Sub。发布/订阅是不确定的来源:多个订阅者可以从发布/订阅主题中提取信息,但是哪个订阅者收到给定的消息是不可预测的。如果处理失败,Pub / Sub将重新传递消息,但是消息可能会以不同的顺序传递给与原始处理者不同的工作者。这种不确定性行为意味着数据流需要协助来检测重复项,因为该服务无法确定性地分配在重试后保持稳定的记录ID。(我们将在本章后面深入研究Pub / Sub的更详细的案例研究。)
由于Dataflow无法自动分配记录ID,因此需要非确定性来源来通知系统记录ID应该是什么。Beam的Source API提供了UnboundedReader.getCurrentRecordId11方法。如果源为每个记录提供唯一的ID,并通知Dataflow它需要重复数据删除,则将过滤掉具有相同ID的12条记录。
在某个时候,每个管道都需要将数据输出到外部世界,而接收器就是一个完全做到这一点的转换。请记住,从外部传递数据是一个副作用,并且我们已经提到Dataflow不能保证完全应用一次副作用。那么,接收器如何保证输出恰好交付一次?
最简单的答案是,Beam SDK提供了许多内置接收器。这些接收器经过精心设计,以确保即使多次执行也不会产生重复。尽可能鼓励管道作者使用这些内置接收器之一。
但是,有时内置功能不足,您需要编写自己的内置程序。最好的方法是确保副作用操作是幂等的,因此面对重放时,它是可靠的。但是,副作用DoFn的某些部分通常是不确定的,因此可能在重放时发生变化。例如,在窗口汇总中,窗口中的记录集也可以是不确定的!
具体来说,窗口可能会尝试使用元素e0,e1,e2触发,但是工作程序在提交窗口处理之前崩溃(但在发送这些元素作为副作用之前没有崩溃)。当工作程序重新启动时,该窗口将再次触发,但是现在显示一个较晚的元素e3。由于此元素在提交窗口之前显示,因此不算作最新数据,因此将使用元素e0,e1,e2,e3再次调用DoFn。然后将它们发送到副作用操作。幂等性在这里无济于事,因为每次都发送不同的逻辑记录集。
还有其他方法可以引入不确定性。解决此风险的标准方法是依靠以下事实:Dataflow当前保证DoFn输出的只有一个版本可以使其经过混洗边界。
使用此保证的一种简单方法是通过内置的Reshuffle转换。例5-2中显示的模式可确保副作用操作始终收到确定的记录以进行输出。
示例5-2改组示例
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1)))) .apply(GroupByKey.<..>.create())
.apply(new PrepareOutputData())
.apply(Reshuffle.<..>of())
.apply(WriteToSideEffect());
前面的管道将接收器分为两个步骤:PrepareOutputData和Write ToSideEffect。PrepareOutputData输出与幂等写入相对应的记录。如果我们简单地一个接一个地运行,则整个过程可能会在失败时重播,PrepareOutputData可能会产生不同的结果,并且两者都会产生副作用。当我们在两者之间添加Reshuffle时,Dataflow保证不会发生这种情况。
当然,Dataflow可能仍会多次运行WriteToSideEffect操作。副作用本身仍然需要是幂等的,否则接收器会出现重复。例如,设置或覆盖数据存储区中的值的操作是等价的,即使多次运行,该操作也会生成正确的输出。附加到列表的操作不是幂等的;如果该操作多次运行,则每次都会附加相同的值。
尽管Reshuffle提供了一种简单的方法来实现对DoFn的稳定输入,但是GroupBy Key也可以正常工作。但是,目前有一项提议消除了添加GroupByKey来实现向DoFn稳定输入的需求。相反,用户可以使用特殊注释@RequiresStableInput注释WriteToSideEffect,然后系统将确保对该转换的稳定输入。
用例
为了说明这一点,让我们检查一些内置的源和接收器,以了解它们如何实现上述模式。
示例来源:Cloud Pub / Sub
Cloud Pub / Sub是一个完全托管,可扩展,可靠且低延迟的系统,用于将消息从发布者分发到订阅者。发布者发布有关命名主题的数据,订阅者创建命名订阅以从这些主题中提取数据。可以为单个主题创建多个订阅,在这种情况下,每个订阅都会收到该订阅创建后在该主题上发布的所有数据的完整副本。发布/订阅保证记录将继续传递,直到被确认为止;但是,一条记录可能会多次发送。
发布/订阅旨在供分布式使用,因此许多发布过程可以发布到同一主题,许多订阅过程可以从同一订阅中提取。提取记录后,订户必须在一定时间内确认该记录,否则该提取将过期,Pub / Sub将把该记录重新传递给另一个订阅过程。
尽管这些特性使Pub / Sub具有高度的可伸缩性,但它们也使它成为诸如Dataflow之类的系统的具有挑战性的来源。不可能知道哪些记录将以什么顺序传递给哪个工人。此外,在失败的情况下,重新交付可能会将记录按不同的顺序发送给不同的工人!
发布/订阅会为每条消息提供一个稳定的消息ID,并且在重新交付时此ID相同。数据流发布/订阅源将默认使用此ID从发布/订阅中删除重复项。(根据ID的哈希值对记录进行混洗,因此重复的交付始终在同一工作人员上进行。)但是,在某些情况下,这还不够。用户的发布过程可能会重试发布,结果将重复项引入“发布/订阅”中。从该服务的角度来看,这些是唯一的记录,因此它们将获得唯一的记录ID。数据流的发布/订阅源允许用户提供自己的记录ID作为自定义属性。只要发布者在重试时发送相同的ID,Dataflow将能够检测到这些重复项。
Beam(以及因此的Dataflow)为Pub / Sub提供了参考源实现。但是,请记住,这不是Dataflow使用的,而是仅由非Dataflow运行程序(例如Apache Spark,Apache Flink和DirectRunner)使用的实现。由于各种原因,Dataflow在内部处理Pub / Sub,并且不使用公共Pub / Sub源。
接收器示例:文件
流媒体运行器可以使用Beam的文件接收器(TextIO,AvroIO以及任何其他实现FileBasedSink的接收器)将记录连续输出到文件。例5-3提供了一个示例用例。
示例5-3 窗口文件写入
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1)))) .apply(TextIO.writeStrings().to(new MyNamePolicy()).withWindowedWrites());
例5-3中的代码段每分钟写入10个新文件,其中包含该窗口中的数据。MyNamePolicy是一个用户编写的函数,它根据分片和窗口确定输出文件名。您还可以使用触发器,在这种情况下,每个触发器窗格都将作为新文件输出。
使用示例5-3中模式的变体实现此过程。文件被写到临时位置,这些临时文件名通过GroupByKey发送到后续转换。在GroupByKey之后是完成转换,该转换将原子临时文件移动到其最终位置。例5-4中的伪代码概述了如何在Beam中实现一致的流文件接收器。(有关更多详细信息,请参见Beam代码库中的FileBasedSink和WriteFiles。)
示例5-4 文件接收器
c
// Tag each record with a random shard id.
.apply("AttachShard", WithKeys.of(new RandomShardingKey(getNumShards())))
// Group all records with the same shard.
.apply("GroupByShard", GroupByKey.<..>())
// For each window, write per-shard elements to a temporary file. This is the
// non-deterministic side effect. If this DoFn is executed multiple times, it will // simply write multiple temporary files; only one of these will pass on through // to the Finalize stage.
.apply("WriteTempFile", ParDo.of(new DoFn<..> { @ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) { // Write the contents of c.element() to a temporary file.
// User-provided name policy used to generate a final filename.
c.output(new FileResult()). }
}))
// Group the list of files onto a singleton key.
.apply("AttachSingletonKey", WithKeys.<..>of((Void)null)) .apply("FinalizeGroupByKey", GroupByKey.<..>create())
// Finalize the files by atomically renaming them. This operation is idempotent. // Once this DoFn has executed once for a given FileResult, the temporary file
// is gone, so any further executions will have no effect.
.apply("Finalize", ParDo.of(new DoFn<..>, Void> { @ProcessElement
public void processElement(ProcessContext c) { for (FileResult result : c.element()) {
rename(result.getTemporaryFileName(), result.getFinalFilename()); }
}}));
您可以在WriteTempFile中查看非幂等工作的完成方式。Group ByKey完成后,“完成”步骤将始终在重试中看到相同的捆绑包。因为文件重命名是幂等的14,所以这给我们提供了一次精确的接收器。
接收器示例:Google BigQuery
Google BigQuery是完全托管的云原生数据仓库。Beam提供了一个BigQuery接收器,BigQuery提供了一种流插入插件,该API支持极低延迟的插入。此流插入API允许您使用唯一ID标记插入,BigQuery会尝试过滤具有相同ID的重复插入。15要使用此功能,BigQuery接收器必须为每个记录生成统计上唯一的ID。它通过使用java.util.UUID软件包来完成此操作,该软件包会生成统计上唯一的128位ID。
生成随机的通用唯一标识符(UUID)是不确定的操作,因此在插入BigQuery之前,必须添加Reshuffle。完成此操作后,Dataflow进行的任何重试将始终使用改组后的相同UUID。重复插入BigQuery的尝试将始终具有相同的插入ID,因此BigQuery可以对其进行过滤。例5-5中显示的伪代码说明了如何实现BigQuery接收器。
示例5-5 BigQuery接收器
c
.apply(new DoFn<> {
@ProcessElement
public void processElement(ProcessContext context) {
String uniqueId = UUID.randomUUID().toString(); context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
new RecordWithId(context.element(), uniqueId)));
}
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication. .apply(ParDo.of(new DoFn<..> {
@ProcessElement
public void processElement(ProcessContext context) { insertIntoBigQuery(context.element().record(), context.element.id());
} });
再次,我们将接收器分为非幂等步骤(生成随机数),然后是幂等步骤。
其他系统
现在,我们只详细解释了Dataflow,让我们将其与其他流行流系统的一些简要概述进行对比。每一个都以不同的方式实施一次精确的保证,并因此做出不同的权衡。
Apache Spark流
Spark Streaming使用微批处理架构进行连续数据处理。用户在逻辑上处理流对象;但是,在幕后,Spark将此流表示为连续的RDD系列。16每个RDD都作为批处理,Spark依赖于批处理的恰好一次性质来确保正确性。如前所述,正确批处理混洗的技术已经有一段时间了。这种方法可能会导致输出延迟增加(尤其是对于深管道和高输入量而言),并且通常需要仔细调整才能达到所需的延迟。
Spark确实假设操作都是幂等的,并且可能在图形的当前点重播操作链。但是,提供了一个检查点原语,该原语使RDD得以实现,从而保证了不会重播该RDD之前的历史记录。此检查点功能是出于性能方面的考虑(例如,防止重播昂贵的操作);但是,您也可以使用它来实现非幂等副作用。
Apache Flink
Apache Flink还为流管道提供一次精确的处理,但是这样做的方式不同于Dataflow或Spark。Flink流传输管道会定期计算一致的快照,每个快照代表整个管道的一致的时间点状态。Flink快照是逐步计算的,因此无需在计算快照时暂停所有处理。这允许记录在拍摄快照的同时继续流经系统,从而减轻了Spark Streaming方法的某些延迟问题。
Flink通过将特殊编号的快照标记插入到来自源的数据流中来实现这些快照。当每个操作员接收到快照标记时,它会执行特定的算法,使其能够将其状态复制到外部位置,并将快照标记传播给下游操作员。所有操作员执行完此快照算法后,便会提供完整的快照。任何工作程序故障都将导致整个管道从最后一个完整的快照回滚其状态。飞行中的消息不需要包含在快照中。Flink中的所有消息传递都是通过基于TCP的有序通道完成的。可以通过从最后一个正确的序列号恢复连接来处理任何连接失败;17与Dataflow不同,Flink任务是静态分配给工作程序的,因此可以假定该连接将从同一发送方恢复并重播相同的有效负载。
因为Flink可能随时回滚到以前的快照,所以尚未在快照中进行的任何状态修改都必须视为临时的。将数据发送到Flink管道之外的世界的接收器必须等待快照完成,然后才发送该快照中包含的数据。Flink提供了notifySnap shotComplete回调,它使接收器可以知道每个快照何时完成,并向前发送数据。即使这确实影响了Flink管道的输出延迟,18该延迟也仅在接收器处引入。实际上,对于深层流水线,这使得Flink的端到端等待时间比Spark短,因为Spark在流水线的每个阶段都引入了批处理延迟。
Flink的分布式快照是一种处理流传输管道一致性的绝妙方法。但是,对管道进行了许多假设。假定故障很少发生,19因为故障的影响(回滚到先前的快照)是巨大的。为了保持低延迟输出,还假定快照可以快速完成。这是否会在非常大的群集上引起问题,故障率可能会增加,以及完成快照所需的时间也会增加,尚待观察。
通过假定任务是静态分配给工作人员的(至少在单个快照时期内),还简化了实现。这种假设使Flink可以在工作程序之间提供一次简单的精确传输,因为它知道如果连接失败,可以从同一工作程序中按顺序提取相同的数据。相反,Dataflow中的任务在工作人员之间不断地进行负载平衡(并且工作人员组在不断增长和缩小),因此Dataflow无法做出这种假设。这迫使Dataflow实施更为复杂的传输层,以提供一次精确的处理。
概况
总之,曾经被认为与低延迟结果不兼容的一次精确数据处理很有可能— Dataflow在不牺牲延迟的情况下高效地进行了处理。这样可以实现更丰富的流处理用途。
尽管本章重点介绍特定于数据流的技术,但其他流系统也提供了一次保证。Apache Spark Streaming依赖一系列Spark批处理运行程序中的一次保证来将流水线管道作为一系列小批处理作业运行。Apache Flink对Chandy Lamport分布式快照使用变体以获取运行一致状态,并可以使用这些快照来确保一次处理。我们鼓励您也了解这些其他系统,以广泛了解不同的流处理系统如何工作!