让我们谈谈SQL。在本章中,我们将以最简单的方式开始,在时间上稍作回溯以建立更多的上下文,最后回到未来,用优美的弓箭将一切包裹起来。想象一下昆汀·塔伦蒂诺(Quentin Tarantino)拥有计算机科学学位,并且极力向世人介绍流SQL的优点,因此他愿意与我一起撰写本章。有点像这样减去暴力。

什么是流式SQL?

我认为,这个问题的答案已经使我们的行业迷惑了数十年。公平地说,很长一段时间以来,数据库社区已经了解了99%的答案。但是我还没有看到流式SQL的真正有说服力和全面的定义,它涵盖了健壮的流式语义的全部范围。这就是我们将尝试在此处提出的内容,尽管假设我们现在已经100%做到这一点是一种骄傲。也许是99.1%?宝贝的步骤。

无论如何,我想在前面指出,在撰写本文时,我们将在本章中讨论的大多数内容仍纯粹是假设的。本章及其后的一章(介绍流式连接)都描述了关于流式SQL可能是一种理想主义的观点。一些部件已经在诸如Apache Calcite,Apache Flink和Apache Beam的系统中实现。许多其他地方都没有实现。在此过程中,我将尝试列举出确实存在的一些事物,但是鉴于目标是不断变化的,所以最好的选择就是简单地咨询您所关注的特定系统的文档。

需要注意的是,这里提出的流式SQL的愿景是Calcite,Flink和Beam社区之间进行协作讨论的结果,这也是值得一提的。Calcite的首席开发人员朱利安·海德(Julian Hyde)长期以来对流式SQL的外观提出了自己的见解。在2016年,Flink社区的成员将Calcite SQL支持集成到Flink本身,并开始向Calcite SQL方言添加特定于流的功能,例如窗口构造。然后,在2017年,所有三个社区开始进行讨论,以试图就Calcite SQL中的健壮流处理的语言扩展和语义看起来应该达成一致。本章试图将讨论中的思想精炼为一个清晰而有凝聚力的叙述,内容涉及将流概念集成到SQL中,而不管它是方解石还是其他方言。

关系代数

在讨论流对SQL意味着什么时,请务必牢记SQL的理论基础:关系代数。关系代数只是描述由命名,类型化的元组组成的数据之间的关系的一种数学方法。关系代数的核心是关系本身,它是这些元组的集合。用经典的数据库术语来说,关系类似于表,它可以是物理数据库表,SQL查询的结果,视图(实例化或其他),等等。它是一组包含命名和类型的数据列的行。

关系代数的更关键方面之一是它的闭包特性:将任何运算符从关系代数应用于任何有效的关系1都会产生另一个关系。换句话说,关系是关系代数的通用货币,所有运算符都将它们当作输入使用,并将它们作为输出生成。

从历史上看,许多在SQL中支持流式传输的尝试都未能满足闭包属性。它们将流与经典关系分开对待,提供新的运算符在两者之间进行转换,并限制可以应用于一个或另一个的运算符。这极大地提高了任何此类流式SQL系统的采用标准:潜在的用户必须学习新的运算符,并了解他们适用的地方,不适用的地方,并类似地重新学习该规则中的适用性规则。任何老运营商的新世界。更糟糕的是,这些系统中的大多数仍然不能提供我们想要的全套流语义,例如,对健壮的无序处理的支持和对强大的时间联接的支持(我们将在第9章中介绍后者)。因此,我想指出,基本上不可能列举任何已经被广泛采用的现有流SQL实现。这种流式SQL系统的额外认知开销和受限功能确保了它们仍然是利基企业。

为了改变这一点,以真正将流式SQL推到最前沿,我们需要的是一种使流成为关系代数本身内的一流公民的方法,从而使整个标准关系代数可以自然地应用于流式和非流式使用中。案件。这并不是说流和表应该被视为完全相同的事物; 它们绝对是不一样的,并且认识到事实有助于清晰地理解和推动流/表关系的导航,我们将很快看到。但是核心代数应该干净自然地适用于两个世界,只有在绝对必要的情况下,才能以最小的扩展超出标准的关系代数。

时变关系

顺带一提,我在本章开头提到的重点是:将流自然集成到SQL中的关键是扩展关系(关系代数的核心数据对象)来表示一段时间内的一组数据,而不是而不是特定时间点的一组数据。更简洁地说,我们需要时变关系,而不是时间点关系。

但是什么是时变关系?让我们首先根据经典的关系代数来定义它们,然后再考虑它们与流理论和表理论的关系。
就关系代数而言,时变关系实际上只是经典关系随时间的演变。要理解我的意思,请想象一个由用户事件组成的原始数据集。随着时间的流逝,随着用户生成新事件,数据集将继续增长和发展。如果您在特定的时间点观察到该集合,那就是经典的关系。但是,如果您观察场景随着时间的整体发展,那是一个时变的关系。

换句话说,如果经典关系就像是二维表,由x轴上的已命名,键入的列和y轴上的记录的行组成,则时变关系就像具有x和y轴的三维表像以前一样,但是随着时间的推移,另外一个z轴捕获了二维表的不同版本。随着关系的更改,关系的新快照将添加到z维度中。

让我们看一个例子。想象一下我们的原始数据集是用户和分数。例如,与整本书中其他大多数示例一样,移动游戏的每用户得分。并假设此处的示例数据集最终在特定时间点(在这种情况下为12:07)观察时最终看起来像这样:

12:07> SELECT * FROM UserScores;
—————————
| Name | Score | Time |
—————————
| Julie | 7 | 12:01 |
| Frank | 3 | 12:03 |
| Julie | 1 | 12:03 |
| Julie | 4 | 12:07 |
—————————

换句话说,它记录了四个分数随时间推移的到来:12:01时朱莉的分数为7,12:03时弗兰克的分数为3,朱莉的第二分数为1,最后12点朱莉的第三分数为4。:07(请注意,“时间”列此处包含表示系统中记录到达时间的处理时间时间戳;稍后我们将介绍事件时间时间戳)。假设这些是为此关系获得的唯一数据,那么我们在12:07之后的任何时间观察到的数据都看起来像上表。但是,如果相反,我们在12:01观察到了这种关系,那么它将看起来像下面的样子,因为到那时为止,只有朱莉的第一个分数会到达:

12:01> SELECT * FROM UserScores;
—————————
| Name | Score | Time |
—————————
| Julie | 7 | 12:01 |
—————————

如果我们在12:03再次观察到它,弗兰克的分数和朱莉的第二个分数也将到达,那么这种关系将演变为:

12:01> SELECT * FROM UserScores;
—————————
| Name | Score | Time |
—————————
| Julie | 7 | 12:01 |
| Frank | 3 | 12:03 |
| Julie | 1 | 12:03 |
—————————

从这个例子中,我们可以开始理解该数据集的时变关系必须是什么样子:它将捕获该关系随时间的整个演变。因此,如果我们在12:07或之后观察到时变关系(或TVR),则它看起来像以下内容(请注意,使用假设的TVR关键字来表示我们希望查询返回完整的时变关系,而不是经典关系的标准时间点快照):

12:07> SELECT TVR * FROM UserScores;
——————————————————————————-
| [-inf, 12:01) | [12:01, 12:03) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time | |
| ————————————- | ————————————- |
| | | | | | | Julie | 7 | 12:01 | |
| | | | | | | | | | |
| | | | | | | | | | |
| | | | | | | | | | |
——————————————————————————-
| [12:03, 12:07) | [12:07, now) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time |
| ————————————- | ————————————- |
| | Julie | 7 | 12:01 | | | Julie | 7 | 12:01 | |
| | Frank | 3 | 12:03 | | | Frank | 3 | 12:03 | |
| | Julie | 1 | 12:03 | | | Julie | 1 | 12:03 | |
| | | | | Julie | 4 | 12:07 | |
| ————————————- | ————————————- |
———————————————————————————

由于印刷/数字页面仍然限于二维,因此我自由地将第三维展平为二维关系的网格。但是您可以看到时变关系本质上是如何由一系列经典关系(从左到右,从上到下排序)组成的,每个经典关系在特定的时间范围内捕获了关系的完整状态(根据定义,所有这些关系,是连续的)。

以这种方式定义时变关系的重要之处在于,就所有意图和目的而言,它们实际上只是一系列经典关系,每个经典关系在各自独立的(但相邻)时间范围内独立存在,每个范围捕获一个周期。关系没有改变的时间。这很重要,因为这意味着将关系运算符应用于随时间变化的关系等同于将该运算符分别应用于对应序列中的每个经典关系。再往前走一步,将关系运算符单独应用于一系列关系的结果(每个关系都与一个时间间隔相关联)将始终产生具有相同时间间隔的对应关系序列。换句话说,结果是相应的时变关系。此定义为我们提供了两个非常重要的属性:

  • 经典关系代数中的所有运算符集在应用于时变关系时仍然有效,并且继续按照您期望的那样运行。
  • 关系代数的闭合性质在应用于时变关系时仍保持不变。

或更简洁地说,当应用于时变关系时,经典关系代数的所有规则都将继续存在。这是巨大的,因为这意味着我们将时变关系替换为经典关系并没有以任何方式改变游戏的参数。一切继续以其在经典关系领域的工作方式进行,只是在经典关系的序列上而不是在单例上。回到我们的示例,考虑原始数据集上另外两个随时间变化的关系,它们都在12:07之后的某个时间观察到。首先使用WHERE子句进行简单的过滤关系:

12:07> SELECT TVR * FROM UserScores WHERE name = “Julie”;
——————————————————————————-
| [-inf, 12:01) | [12:01, 12:03) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time | |
| ————————————- | ————————————- |
| | | | | | | Julie | 7 | 12:01 | |
| | | | | | | | | | |
| | | | | | | | | | |
——————————————————————————-
| [12:03, 12:07) | [12:07, now) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time |
| ————————————- | ————————————- |
| | Julie | 7 | 12:01 | | | Julie | 7 | 12:01 | |
| | Julie | 1 | 12:03 | | | Julie | 1 | 12:03 | |
| | | | | Julie | 4 | 12:07 | |
| ————————————- | ————————————- |
———————————————————————————

如您所料,这种关系看起来很像前面的关系,但是弗兰克的分数被过滤掉了。即使时变关系捕获了记录该数据集随时间变化所必需的额外时间维度,但鉴于您对SQL的理解,查询的行为与您期望的完全一样。
对于稍微复杂一点的事情,让我们考虑一个分组关系,在该关系中,我们对所有每位用户得分进行汇总,以生成每个用户的总总体得分:

12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name;
——————————————————————————-
| [-inf, 12:01) | [12:01, 12:03) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time | |
| ————————————- | ————————————- |
| | | | | | | Julie | 7 | 12:01 | |
| | | | | | | | | | |
——————————————————————————-
| [12:03, 12:07) | [12:07, now) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time |
| ————————————- | ————————————- |
| | Julie | 8 | 12:03 | | | Julie | 12 | 12:07 | |
| | Frank | 3 | 12:03 | | | Frank | 3 | 12:03 | |
| ————————————- | ————————————- |
———————————————————————————

同样,此查询的时变版本的行为与您期望的完全一样,序列中的每个经典关系仅包含每个用户的分数总和。实际上,无论我们选择何种查询复杂,结果始终与将查询独立应用于构成输入时变关系的公关经典关系相同。我不能太强调这有多重要!

好的,这很好,但是时变关系本身更多是理论构造,而不是数据的实际物理表现。很容易看出它们如何变得庞大而笨拙,以应对频繁变化的大型数据集。为了了解它们实际上如何与现实世界中的流处理相关联,现在让我们探索时变关系与流和表理论之间的关系。

流和表

为了进行比较,让我们再次考虑一下我们前面讨论的分组时变关系:

12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name;
——————————————————————————-
| [-inf, 12:01) | [12:01, 12:03) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time | |
| ————————————- | ————————————- |
| | | | | | | Julie | 7 | 12:01 | |
| | | | | | | | | | |
——————————————————————————-
| [12:03, 12:07) | [12:07, now) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time |
| ————————————- | ————————————- |
| | Julie | 8 | 12:03 | | | Julie | 12 | 12:07 | |
| | Frank | 3 | 12:03 | | | Frank | 3 | 12:03 | |
| ————————————- | ————————————- |
———————————————————————————

我们知道,此序列可以记录一段时间内关系的完整历史记录。鉴于我们对第6章中的表和流有理解,因此了解时变关系与流和表理论之间的关系并不难。

表非常简单:因为时变关系本质上是经典关系的序列(每个关系都在特定的时间点捕获关系的快照),并且经典关系类似于表,将时变关系视为 该表仅生成观察时间的时间点关系快照。

例如,如果我们在12:01的表中观察到以前的分组时变关系,则会得到以下内容(请注意,使用另一个假设关键字TABLE明确表明了我们对查询的期望 返回表格):

12:01> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name;
————————-
| Name | Total | Time |
————————-
| Julie | 7 | 12:01 |
————————-

而在12:07观察将产生预期的结果:

12:07> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name;
————————-
| Name | Total | Time |
————————-
| Julie | 12 | 12:07 |
| Frank | 3 | 12:03 |
————————-

在这里特别有趣的是,即使在今天,实际上在SQL中仍然支持时变关系的想法。SQL 2011标准提供了“时间表”,该表存储了表随时间的版本历史(本质上是随时间变化的关系),以及允许您显式查询和接收时间快照的AS OF SYSTEM TIME构造。您指定的任何时间点的表/时变关系。例如,即使我们在12:07执行查询,我们仍然可以看到12:03时的关系:

12:07> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name AS OF SYSTEM TIME ‘12:03’;
————————-
| Name | Total | Time |
————————-
| Julie | 8 | 12:03 |
| Frank | 3 | 12:03 |
————————-

因此,SQL中存在一些时变关系的先例。但是我离题了。这里的要点是,表捕获了特定时间点的时变关系的快照。在我们观察到的大多数现实表实现中,它们只是简单地跟踪实时。其他人则保留了一些额外的历史信息,在一定程度上,这相当于捕获了一段时间内整个关系的全保真时变关系。

溪流是略有不同的野兽。我们在第6章了解到,它们也捕获了表随时间的变化。但是它们的作用与迄今为止我们观察到的时变关系有所不同。它们不是捕获每次更改的整个关系的快照,而是捕获随时间变化的关系中导致这些快照的更改序列。举个例子,这里的细微差别变得更加明显。

作为回顾,请再次回顾我们的基线示例TVR查询:

12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name;
——————————————————————————-
| [-inf, 12:01) | [12:01, 12:03) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time | |
| ————————————- | ————————————- |
| | | | | | | Julie | 7 | 12:01 | |
| | | | | | | | | | |
——————————————————————————-
| [12:03, 12:07) | [12:07, now) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time |
| ————————————- | ————————————- |
| | Julie | 8 | 12:03 | | | Julie | 12 | 12:07 | |
| | Frank | 3 | 12:03 | | | Frank | 3 | 12:03 | |
| ————————————- | ————————————- |
———————————————————————————

现在,让我们观察一下随时间变化的关系,因为它存在于几个不同的时间点。在此过程的每个步骤中,我们都将比较TVR在该时间点的原始表格渲染与该点之前的流的演变。要查看时变关系的流呈现效果,我们需要引入两个新的假设关键字:

  • STREAM关键字,类似于我已经介绍的TABLE关键字,它表示我们希望查询返回一个逐事件流,以捕获随时间变化的关系随时间的变化。您可以认为这是随着时间的推移将每条记录触发器应用于该关系。
  • 可以从STREAM查询中引用的特殊Sys.Undo3列,以便标识缩进的行。稍后对此进行更多讨论。

因此,从12:01开始,我们将获得以下内容:

12:07> SELECT TABLE Name, SUM(Score) as Total, MAX(Time) as Time FROM UserScores GROUP BY Name;
————————-
| Name | Total | Time |
————————-
| Julie | 7 | 12:01 |
————————-
12:07> SELECT STREAM Name, SUM(Score) as Total, MAX(Time) as Time, Sys.Undo as Undo FROM UserScores GROUP BY Name;
————————————
| Name | Total | Time | Undo|
————————————
| Julie | 7 | 12:01 | |
……………. [12:01, 12:01] …………

此时,表和流的渲染看起来几乎相同。修改“撤消”列(在下一个示例中进行更详细的讨论),只有一个区别:而表格版本是从12:01开始完成的(由虚线的最后一行表示,关闭了关系的最底端), 流版本仍然不完整,如最后一个类似省略号的句点所示,它标志着关系的尾巴(将来可能会出现其他数据)以及迄今为止观察到的数据的处理时间范围。确实,如果在实际实现中执行,STREAM查询将无限期地等待其他数据到达。因此,如果我们等到12:03,将为STREAM查询显示三个新行。将其与12:03处TVR的新TABLE渲染进行比较:

12:07> SELECT TABLE Name,
SUM(Score) as Total,
MAX(Time) as Time
FROM UserScores GROUP BY Name;
————————-
| Name | Total | Time |
————————-
| Julie | 8 | 12:03 |
| Frank | 3 | 12:03 |
————————-

12:01> SELECT STREAM Name,
SUM(Score) as Total,
MAX(Time) as Time,
Sys.Undo as Undo
FROM UserScores GROUP BY Name;
————————————
| Name | Total | Time | Undo|
————————————
| Julie | 7 | 12:01 | |
| Frank | 3 | 12:03 | |
| Julie | 7 | 12:03 | undo |
| Julie | 8 | 12:03 | |
……………. [12:01, 12:03] …………

这里有一个值得解决的有趣问题:当原始数据集在该时间段内仅包含两行(弗兰克的3和朱莉的1)时,为什么流中会有三个新行(弗兰克的3和朱莉的undo-7和8)? 答案在于,在这里,我们正在观察变化的流向原始输入的汇总; 特别是在12:01到12:03的时间段内,信息流需要捕获有关由于新的1值到来而引起的Julie总得分变化的两条重要信息:

  • 先前报告的总数为7个不正确。
  • 新的总数是8。

这就是特殊的Sys.Undo列允许我们做的:区分正常行和撤回先前报告的值的行。

STREAM查询的一个特别好的功能是,您可以开始了解所有这些与经典在线事务处理(OLTP)表的关系:该查询的STREAM渲染本质上是捕获您所执行的一系列INSERT和DELETE操作的 可以用来在OLTP世界中随着时间的推移实现这种关系(实际上,当您考虑它时,OLTP表本身实际上是通过INSERT,UPDATE和DELETE的流随时间变化的时变关系)。

现在,如果我们不在乎信息流中的缩回,也可以不要求它们也很好。在这种情况下,我们的STREAM查询将如下所示:
12:07> SELECT TABLE Name,
SUM(Score) as Total,
MAX(Time) as Time
FROM UserScores GROUP BY Name;
————————————-
| Name | Total | Time |
————————————-
| Julie | 12 | 12:07 |
| Frank | 3 | 12:03 |
————————————-

12:01> SELECT STREAM Name,
SUM(Score) as Total,
MAX(Time) as Time,
Sys.Undo as Undo
FROM UserScores GROUP BY Name;
————————————
| Name | Total | Time | Undo|
————————————
| Julie | 7 | 12:01 | |
| Frank | 3 | 12:03 | |
| Julie | 7 | 12:03 | undo |
| Julie | 8 | 12:03 | |
| Julie | 8 | 12:07 | undo |
| Julie | 12 | 12:07 | |
……………. [12:01, 12:07] …………

到了此时,我们很清楚,时变关系的STREAM版本与表版本完全不同:表捕获特定时间点整个关系的快照,而流捕获视图 5有趣的是,这意味着STREAM渲染与我们原始的基于表格的TVR渲染有更多共同点:

12:07> SELECT TVR Name, SUM(Score) as Total, MAX(Time) as Time
FROM UserScores GROUP BY Name;
——————————————————————————-
| [-inf, 12:01) | [12:01, 12:03) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time | |
| ————————————- | ————————————- |
| | | | | | | Julie | 7 | 12:01 | |
| | | | | | | | | | |
——————————————————————————-
| [12:03, 12:07) | [12:07, now) |
| ————————————- | ————————————- |
| | Name | Score | Time | | | Name | Score | Time |
| ————————————- | ————————————- |
| | Julie | 8 | 12:03 | | | Julie | 12 | 12:07 | |
| | Frank | 3 | 12:03 | | | Frank | 3 | 12:03 | |
| ————————————- | ————————————- |
———————————————————————————

确实,可以肯定地说,STREAM查询只是提供了相应的基于表的TVR查询中存在的整个数据历史的替代呈现。STREAM渲染的价值在于其简洁性:它仅捕获TVR中每个时间点关系快照之间的变化增量。表格序列TVR呈现的价值在于它提供的清晰度:它以突出其与经典关系的自然关系的格式捕获了该关系随时间的演变,并且这样做提供了简单明了的定义。流上下文中的关系语义,以及流带来的额外时间维度。

STREAM和基于表的TVR渲染之间相似性的另一个重要方面是,它们在其编码的总体数据中基本上是相同的。这已经成为支持者长期宣扬的流/表二元性的核心:流和表6实际上只是同一枚硬币的两个不同方面。或为了重现第6章中的不良物理比喻,流和表是随时间变化的关系,而波和粒子是要照亮的:7完整的时变关系既是表又是流。表和流只是上下文的不同物理体现,具体取决于上下文。

现在,请务必记住,只有两个版本都编码相同的信息,此流/表对偶才是正确的;也就是说,当您拥有全保真表或流时。但是,在许多情况下,完全保真是不切实际的。正如我之前提到的,对时变关系的完整历史进行编码(无论是流形式还是表形式)对于大型数据源而言可能都是相当昂贵的。TVR的流和表格表现在某种程度上是有损的,这很常见。表格通常仅编码TVR的最新版本;支持时间或版本访问的应用程序通常将编码的历史记录压缩到特定的时间点快照和/或早于某个阈值的垃圾收集版本。类似地,流通常仅编码TVR演化的有限持续时间,通常是该历史的相对较新的部分。像Kafka这样的持久流提供了对整个TVR进行编码的能力,但这又是相对不常见的,因为早于某个阈值的数据通常会通过垃圾收集过程被丢弃。

这里的要点是流和表绝对是彼此的对偶,每一个都是编码时变关系的有效方式。但是实际上,TVR的物理流/表格表现在某种程度上是有损的,这很常见。这些部分逼真度的流和表会权衡总编码信息的减少,以获得某些好处,通常是减少资源成本。这些类型的权衡非常重要,因为它们通常使我们能够构建可在真正大规模数据源上运行的管道。但是它们也会使事情变得复杂,并且需要更深入的了解才能正确使用。稍后,当我们使用SQL语言扩展时,我们将更详细地讨论该主题。但是,在我们尝试对SQL扩展进行推理之前,有一点更具体地理解当今常见的SQL和非SQL数据处理方法中存在的偏差将很有用。

向后看:流和表偏向

在许多方面,为SQL添加强大的流支持的行为很大程度上是尝试将Beam Model的位置,时间和方式语义与经典SQL模型的what语义合并的一种做法。但是要干净地做到这一点,并且要保持经典SQL的外观和真实感不变,就需要了解这两种模型之间的相互关系。因此,在第6章中探讨了Beam模型与流和表理论的关系时,我们现在将使用流和表理论作为比较的基础框架,探索Beam模型与经典SQL模型的关系。。这样做时,我们将揭示每个模型中存在的内在偏差,这将为我们提供一些见识,使他们可以更好地以一种干净自然的方式将两者完美结合。

梁模型:基于流的方法

让我们从第6章的讨论开始,建立Beam模型。首先,我要讨论Beam模型中相对于流和表存在的固有流偏差。

如果回想一下图6-11和6-12,它们显示了相同的分数求和管道的两个不同视图,我们在整本书中都使用了它们作为示例:在图6-11中,是一个逻辑的Beam-Model视图,在图6-12中,是一个物理,面向流和表的视图。将两者进行比较有助于突出梁模型与流和表的关系。但是,如图8-1所示,通过将一个叠加在另一个之上,我们可以看到关系的另一个有趣方面:Beam Model固有的流偏差。
stsy_0801.png
图8-1 梁模型方法中的流偏

在此图中,我绘制了红色虚线,将逻辑视图中的转换与物理视图中的相应组件连接起来。当以这种方式观察时,突出的是,所有逻辑转换都是通过流连接的,甚至是涉及分组的操作(我们从第6章了解的结果是在某个地方创建了一个表)。用Beam的话来说,这些转换是PTransforms,它们始终应用于PCollections以产生新的PCollections。这里重要的一点是,Beam中的PCollections始终是流。结果,Beam模型是一种固有的流偏向数据处理方法:流是Beam管道(甚至是批处理管道)中的通用货币,并且始终对表进行特殊处理,将其抽象到源和接收器的边缘管道或隐藏在管道中某个位置的分组和触发操作之下。

因为Beam是根据流操作的,所以在涉及表的任何位置(源,接收器以及任何中间分组/取消分组),都需要进行某种转换以隐藏基础表。Beam中的转换看起来像这样:

  • 消耗表的源通常会对这些表的触发方式进行硬编码。用户无法指定他们要使用的表的自定义触发。可以编写源来触发对表的每个新更新作为记录,也可以将更新组批处理在一起,或者可以在某个时间点为表中的数据提供一个有限的快照。实际上,这仅取决于给定源的实用性以及源作者试图解决的用例。

  • 写表的接收器通常对它们输入流的分组方式进行硬编码。有时,这样做的方式是给用户一定程度的控制权。例如,只需将用户分配的密钥分组即可。在其他情况下,可能会隐式定义分组。例如,在将没有自然键的输入数据写入分片输出源时,通过对随机物理分区号进行分组。与来源一样,它实际上仅取决于给定接收器的实用性以及接收器的作者试图解决的用例。

  • 对于分组/取消分组操作,与源和接收器相比,Beam为用户提供了在将数据分组到表中以及将其重新分组到流中时的完全灵活性。这是设计使然。分组操作中的灵活性是必要的,因为分组数据的方式是定义管道的算法的关键组成部分。而且,取消分组的灵活性很重要,因此应用程序可以采用适合当前用例的方式来调整生成的流。

但是,这里有皱纹。请记住,从图8-1可以看出,波束模型固有地偏向流。因此,尽管可以将分组操作直接应用于流(这是Beam的GroupByKey操作),但该模型从未提供可直接应用触发器的一流表对象。结果,触发器必须应用于其他地方。这里基本上有两个选择:

触发器的预先声明
这是在实际应用触发器的表之前的管道中的某个点处指定触发器的位置。在这种情况下,您实质上是在遇到分组操作后预先指定要在管道中稍后看到的行为。以这种方式声明时,触发器将向前传播。

触发后声明
这是在触发器所应用的表之后的管道中的一点上指定触发器的位置。在这种情况下,您要指定要在声明触发器时看到的行为。以这种方式声明时,触发器将向后传播。

由于触发器的后声明使您可以在实际要观察的地方指定所需的行为,因此更加直观。不幸的是,Beam现有的(2.x和更早版本)使用了触发器的预声明(类似于也预定义了窗口的方式)。

尽管Beam提供了多种方法来解决表被隐藏的事实,但我们仍然面临这样的事实,即即使该表的内容确实是最终数据,也必须始终先触发表才能对其进行观察。您想消费的东西。这是当今存在的Beam模型的缺点,可以通过从以流为中心的模型转向将流和表都视为一流实体的模型来解决。

现在,让我们看一下Beam模型的概念反转:经典SQL。

SQL模型:基于表的方法

与Beam Model的流偏向方法相反,SQL在历史上一直采用表偏向方法:查询应用于表,并始终生成新表。这类似于我们在第6章中使用Map‐Reduce9查看的批处理模型,但是考虑一个具体的示例(如我们刚刚针对Beam模型的示例)将非常有用。

考虑下面的非规范化SQL表:

  1. UserScores (user, team, score, timestamp)

它包含用户分数,每个分数都标有相应用户及其相应团队的ID。没有主键,因此您可以假定这是一个仅追加表,每行均由其唯一的物理偏移量隐式标识。如果要从此表计算团队得分,可以使用类似以下查询的查询:

  1. SELECT team, SUM(score) as total
  2. FROM UserScores
  3. GROUP BY team;

当由查询引擎执行时,优化器可能会将查询分解为大约三个步骤:

  1. 扫描输入表(即触发它的快照)
  2. 将该表中的字段投影到团队并得分
  3. 按团队分组并汇总分数

如果我们使用类似于图8-1的图表来查看它,它将看起来像图8-2。

SCAN操作获取输入表并将其触发到有界流中,该流包含在查询执行时该表内容的快照。该流由SELECT操作消耗,该操作将四列输入行向下投影到两列输出行。作为非分组操作,它会产生另一个流。最后,两列团队和用户得分流进入GROUP BY,并按团队分组到一个表中,同一队SUM的得分相加在一起,得出我们的团队输出表及其相应的团队得分总数。

stsy_0802.png
图8-2 简单SQL查询中的表偏差

这是一个相对简单的示例,它自然地以表结尾,因此在经典SQL中突出显示表偏向确实不够。但是我们可以通过简单地将此查询的主要部分(投影和分组)分为两个单独的查询来弄清楚一些证据:

  1. SELECT team, score
  2. INTO TeamAndScore
  3. FROM UserScores;
  4. SELECT team, SUM(score) as total
  5. INTO TeamTotals
  6. FROM TeamAndScore
  7. GROUP BY team;

在这些查询中,我们首先将UserScores表投影到我们关心的两列,然后将结果存储在临时的TeamAndScore表中。然后,我们按团队将该表分组,并在此过程中汇总分数。将事情分解成两个查询的管道之后,我们的图看起来如图8-3所示。
stsy_0803.png
图8-3 将查询分为两部分,以显示更多表偏倚的证据

如果经典SQL将流公开为一流对象,则您希望第一个查询TeamAndScore的结果为流,因为SELECT操作消耗了一个流并产生了一个流。但是因为SQL的通用货币是表格,所以它必须首先将投影流转换成表格。而且,由于用户尚未指定用于分组的任何显式键,因此用户必须简单地按其身份对键进行分组(即附加语义,通常通过对每行的物理存储偏移量进行分组来实现)。

由于TeamAndScore现在是一个表,因此第二个查询必须在前面附加一个SCAN操作,以将该表扫描回到流中,以允许GROUP BY然后再次将其重新组合到表中,这一次是按team和与他们的个人分数相加。因此,由于中间表的显式实现,我们看到了两个隐式转换(从流再返回)。

也就是说,SQL中的表并不总是显式的。隐式表也可以存在。例如,如果我们要使用GROUP BY语句在查询的末尾添加HAVING子句,以过滤出分数低于特定阈值的团队,该图将变为图8-4所示。

stsy_0804.png
图8-4 带有最终HAVING子句的表偏差

加上HAVING子句,以前对用户可见的TeamTotals表现在是一个隐式的中间表。要根据HAVING子句中的规则过滤表的结果,必须将该表触发到可以过滤的流中,然后必须将该流隐式组合回到表中以产生新的输出表LargeTeamTotals。

这里重要的一点是经典SQL中存在明显的表偏差。流始终是隐式的,因此,对于任何实例化流,都需要从表到表的转换。此类转换的规则大致可分为以下几类:

输入表(即,光束模型中的源)
这些总是在特定的时间点整体隐式触发的10
(通常是查询执行时间)以生成包含表快照的有界流。这也与经典批处理所获得的完全相同。例如,我们在第6章中讨论过的MapReduce案例。

输出表(即,以“梁模型”为单位的汇)
这些表要么是查询中最终分组操作创建的表的直接体现,要么是对不以查询结尾的查询应用到查询的终端流的隐式分组(通过该行的某些唯一标识符)的结果。分组操作(例如,先前示例中的投影查询,或GROUP BY后跟HAVING子句)。与输入一样,这与经典批处理中看到的行为相匹配。

分组/取消分组操作
与Beam不同,这些操作仅在一个维度上提供了完全的灵活性:分组。传统的SQL查询提供了一整套的分组操作(GROUP BY,JOIN,CUBE等),而它们仅提供一种类型的隐式取消分组操作:在所有上游数据对其进行贡献之后,整体触发一个中间表已合并(同样,MapReduce中提供的完全相同的隐式触发器是随机操作的一部分)。结果,SQL在通过分组来整形算法方面提供了极大的灵活性,但是在整形查询执行过程中存在于幕后的隐式流时,灵活性基本上为零。

物化视图

考虑到经典SQL查询与经典批处理的相似程度,可能很想注销SQL固有的表偏差,因为它只是SQL工件不支持任何方式的流处理。但是这样做会忽略这样一个事实,即数据库在相当长的一段时间内都支持一种特定类型的流处理:实例化视图。物化视图是一种物理上已被表化为表的视图,并且随着源表的发展,数据库会随着时间的推移保持最新状态。请注意,这听起来与我们对时变关系的定义非常相似。物化视图的魅力在于,它们在不显着改变其操作方式(包括其固有的表偏差)的情况下,为SQL添加了一种非常有用的流处理形式。

例如,让我们考虑一下图8-4中的查询。我们可以将这些查询更改为CREATE MATERIALIZED VIEW11语句:

  1. CREATE MATERIALIZED VIEW TeamAndScoreView AS
  2. SELECT team, score
  3. FROM UserScores;
  4. CREATE MATERIALIZED VIEW LargeTeamTotalsView AS
  5. SELECT team, SUM(score) as total
  6. FROM TeamAndScoreView
  7. GROUP BY team
  8. HAVING SUM(score) > 100;

这样,我们将它们转换为连续的常设查询,以流方式连续处理对UserScores表的更新。即使这样,所得到的视图物理执行图看起来也几乎与一次性查询完全相同。为了支持这种流化物化视图的思想,没有将流变成显式的一流对象。物理执行计划中唯一值得注意的变化是替换了另一个触发器:SCAN-AND-STREAM而不是SCAN,如图8-5所示。
stsy_0805.png
图8-5 物化视图中的表偏差

这是“扫描并流”触发信号是什么? SCAN-AND-STREAM就像SCAN触发器一样开始,在某个时间点将表的全部内容发送到流中。但是,它并没有停止在那里并声明要完成的流(即有界),而是继续触发了对输入表的所有后续修改,从而产生了一个无界的流,该流捕获了表随时间的变化。在一般情况下,这些修改不仅包括对新值的INSERT,而且还包括对先前值的DELETE和对现有值的UPDATE(实际上,它们被视为同时的DELETE / INSERT对,或者按原样撤消/重做值)在Flink中调用)。

此外,如果我们考虑实例化视图的表/流转换规则,则唯一真正的区别是所使用的触发器:

  • 输入表是通过SCAN-AND-STREAM触发器而不是SCAN触发器隐式触发的。其他所有内容均与经典批处理查询相同。
  • 输出表的处理方式与经典批处理查询相同。
  • 分组/取消分组操作的功能与经典批处理查询相同,唯一的区别是对隐式取消分组操作使用了SCAN-AND-STREAM触发器而不是SNAP SHOT触发器。

给出此示例后,可以清楚地看到SQL固有的表偏差不仅限于限于批处理的SQL工件:物化视图使SQL能够执行特定类型的流处理而无需对方法进行任何重大更改,包括固有的偏向表。不论您将它用于批处理还是流处理,Classic SQL都是表偏向模型。

展望:迈向健壮的SQL流

现在,我们研究了时变关系,表和流提供时变关系的不同表示的方式,以及Beam和SQL模型在流和表理论方面的固有偏差。那么,所有这些都离开了我们呢?也许更重要的是,我们需要在SQL中更改或添加哪些内容以支持强大的流处理?令人惊讶的答案是:如果我们有良好的默认设置,那么数量不多。

我们知道,关键的概念变化是用时变关系代替经典的时间点关系。较早前我们看到,这是一种非常无缝的替换,这要归功于保持关系代数的关键闭包特性,从而可以将其应用于已经存在的关系运算符的全部范围。但是我们也看到直接处理时变关系通常是不切实际的。我们需要根据两种更常见的物理表现形式进行操作的能力:表格和流。这是一些具有良好默认值的简单扩展的地方。

我们还需要一些工具来可靠地推断时间,尤其是事件时间。这是时间戳,窗口和触发之类的东西发挥作用的地方。但是同样,明智地选择默认值对于减少实践中扩展这些扩展的频率非常重要。

很棒的是,我们实际上只需要这些。现在,让我们最后花一些时间详细研究这两类扩展:流/表选择和时间运算符。

流和表选择

在研究时变关系示例时,我们已经遇到了与流和表选择有关的两个关键扩展。它们是我们在SELECT关键字之后放置的TABLE和STREAM关键字,用于指示我们对给定时变关系的所需物理视图:

12:07> SELECT TABLE Name,
SUM(Score) as Total,
MAX(Time)
FROM UserScores GROUP BY Name;
————————-
| Name | Total | Time |
————————-
| Julie | 12 | 12:07 |
| Frank | 3 | 12:03 |
————————-

12:01> SELECT STREAM Name,
SUM(Score) as Total,
MAX(Time),
Sys.Undo as Undo
FROM UserScores GROUP BY Name;
—————————-
| Name | Total | Time |
—————————-
| Julie | 7 | 12:01 |
| Frank | 3 | 12:03 |
| Julie | 8 | 12:03 |
| Julie | 12 | 12:07 |
…….. [12:01, 12:07] ……

这些扩展相对简单明了,并在必要时易于使用。但是,有关流和表选择的真正重要的事情是在未明确提供默认值的情况下选择良好的默认值。这样的默认值应该尊重每个人都习惯的经典的,表偏向的SQL行为,同时还可以在包含流的世界中直观地操作。他们也应该很容易记住。此处的目标是帮助保持系统的自然感觉,同时还大大降低了我们必须使用显式扩展的频率。满足所有这些要求的默认值的一个很好的选择是:

  • 如果所有输入均为表,则输出为TABLE。
  • 如果任何输入是流,则输出是STREAM。

需要特别注意的是,这些时变关系的物理渲染实际上仅在您想要以某种方式实现TVR(直接查看或将其写入某些输出表或流)时才需要。给定一个在全保时变关系下运行的SQL系统,中间结果(例如WITH AS或SELECT INTO语句)可以作为全保TVR以系统自然处理的任何格式保留,而没有 需要将它们呈现为其他一些更有限的具体表现形式。

这实际上是用于流和表选择的。除了要直接处理流和表的能力之外,如果我们要在SQL中支持健壮的,无序的流处理,我们还需要一些更好的工具来进行时间推理。现在,让我们更详细地了解其中的含义。

时间运算符

健壮的,无序的处理的基础是事件时间时间戳:事件记录的一小部分元数据,用于捕获事件发生的时间而不是事件的观察时间。在SQL世界中,事件时间通常只是给定TVR的另一列数据,它本身就存在于源数据本身中。从这个意义上讲,在记录本身内实现记录的事件时间的想法是SQL已经通过将时间戳记放在常规列中而自然处理的事情。

在继续之前,我们来看一个例子。为了将所有这些SQL知识与我们先前在书中探讨的概念联系在一起,我们复活了一个正在运行的示例,该示例总结了团队中各个成员的9个得分,得出了该团队的总得分。如果您还记得的话,这些分数在X =事件时间/ Y =处理时间轴上绘制时,如图8-6所示。

stsy_0806.png
图8-6 我们正在运行的示例中的数据点

如果我们将这些数据想象成一个经典的SQL表,它们可能看起来像这样,按事件时间排序(图8-6中从左到右):

image.png
回想一下,当我第一次介绍该数据集时,我们已经在第二章中看到了该表。与我们通常显示的相比,此渲染提供了有关数据的更多细节,明确强调了以下事实:九个得分本身属于七个不同的用户,每个用户都是同一团队的成员。在我们开始研究示例之前,SQL提供了一种很好的,简洁的方法来查看数据的完整布局。

这种数据视图的另一个好处是,它可以完全捕获每个记录的事件时间和处理时间。您可以想象事件时间列只是原始数据的另一部分,而处理时间列则是系统提供的东西(在这种情况下,使用假设的Sys.MTime列来记录处理过程,给定行的时间修改时间戳;即该行到达源表的时间),捕获记录本身进入系统的时间。

SQL的有趣之处在于,以不同的方式查看数据非常容易。例如,如果我们想按处理时间顺序查看数据(图8-6中从下到上),则可以简单地更新ORDER BY子句:
image.png
image.png

正如我们先前所了解的,这些数据的表格渲染实际上是完整基础TVR的部分保真度视图。如果我们改为查询整个面向表的TVR(为了简洁起见,仅查询三个最重要的列),它将扩展为以下内容:
image.png
image.png

大量的数据。另外,在这种情况下,STREAM版本会更加紧凑。由于该关系中没有显式分组,因此它看起来与之前的时间点TABLE呈现基本相同,并且尾部页脚的添加描述了迄今为止在流中捕获的处理时间范围,以及 请注意,系统仍在等待流中的更多数据(假设我们将流视为无界的;不久我们将看到流的有界版本):
image.png
但这只是看原始输入记录而没有任何形式的转换。当我们开始改变关系时,更有趣的是。过去,当我们探索这个示例时,我们总是从经典的批处理开始,以总结整个数据集的得分,所以在这里做同样的事情。第一个示例管道(以前作为示例6-1提供)类似于Beam中的示例8-1。

示例8-1 求和管道

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals =
  3. input.apply(Sum.integersPerKey());

并在世界的流和表视图中进行渲染,该管道的执行类似于图8-7。

stsy_0807.png
图8-7 经典批处理的流和表视图

既然我们已经将数据放置在适当的架构中,那么我们将不会在SQL中进行任何解析; 相反,我们专注于解析转换后管道中的所有内容。而且由于我们采用经典的批处理模型,即仅在处理完所有输入数据之后才检索单个答案,因此求和关系的TABLE和STREAM视图看起来基本上是相同的(回想一下,我们正在处理有界版本 这些初始的批处理样式示例的数据集;因此,此STREAM查询实际上以短划线和END-OF-STREAM标记终止):
image.png
更有趣的是,当我们开始将窗口添加到混合中时。这将使我们有机会开始更仔细地研究需要添加到SQL以支持可靠的流处理的临时操作。

哪里:开窗

正如我们在第6章中了解到的那样,窗口化是对按键分组的一种修改,其中窗口成为层次结构键的次要部分。与经典的程序化批处理一样,只需将时间作为GROUP BY参数的一部分,您就可以在SQL中轻松地将数据窗口化为现在存在的SQL中更简单的窗口。或者,如果所涉及的系统提供了该功能,则可以使用内置的拖带操作。我们先看一下两者的SQL示例,但首先让我们重新看一下第3章中的编程版本。回到示例6-2,窗口化的Beam管道看起来像示例8-2所示。

示例8-2 求和管道

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(FixedWindows.of(TWO_MINUTES))) .apply(Sum.integersPerKey());

该管道的执行(在图6-5中呈现的流和表中)看起来像图8-8中所示的图。
stsy_0808.png
图8-8 批处理引擎上窗口加总的流和表视图

正如我们之前看到的,从图8-7到8-8的唯一实质变化是,现在将SUM操作创建的表划分为固定的,两分钟的时间窗口,最后产生四个窗口化的答案,而不是 我们之前拥有的单个全球总和。

为了在SQL中做同样的事情,我们有两个选择:通过在GROUP BY语句中包含窗口的某些独特功能(例如结束时间戳)来隐式地进行窗口显示,或者使用内置的窗口操作。让我们看看两者。

首先,临时窗口化。在这种情况下,我们将在SQL语句中自行执行计算窗口的数学运算:
image.png
我们还可以使用显式的窗口语句(例如Apache Calcite支持的语句)获得相同的结果:
image.png
这就引出了一个问题:如果我们可以使用现有的SQL构造来隐式地进行窗口化,为什么还要费心地支持显式的窗口化构造呢?原因有两个,在本示例中只有第一个显而易见(我们将在本章的后面看到另一个在起作用):

  1. 窗口化为您处理了窗口计算数学。如果您直接指定宽度和滑动等基本参数,而不是自己计算窗口数学,则始终可以正确解决问题要容易得多。

  2. 窗口允许简洁表达更复杂的动态分组,例如会话。即使SQL在技术上能够表达定义会话窗口的每个元素之间的时间间隔内的每个元素,但相应的咒语是分析函数,自我联接和数组的混乱杂乱不能单凭任何理由相信凡人会自生自灭。

除了已经存在的临时窗口功能之外,这两者都是在SQL中提供一流窗口构造的引人注目的论点。

至此,从经典批处理/经典关系的角度来看,当我们将数据作为表使用时,我们看到了窗口的外观。但是,如果我们想将数据以流的形式使用,那么我们可以回到Beam模型的第三个问题:在处理时间何时实现输出?

时间:触发

与以前一样,该问题的答案是触发器和水印。但是,在SQL的上下文中,有一个强有力的论点是要有与我们在第3章中使用Beam模型引入的默认集不同的默认集,而不是默认使用单个水印触发器,这是更类似于SQL的默认设置 将是从物化视图中获取线索并触发每个元素。换句话说,只要有新输入到达,我们就会产生相应的新输出。

类似于SQL的默认设置:按记录触发。使用每次记录触发作为默认设置有两个引人注目的好处:

简单
每次记录更新的语义很容易理解;物化视图已经以这种方式运行了多年。

保真度
与变更数据捕获系统一样,每条记录触发可产生给定时变关系的全保真流呈现。转换过程中不会丢失任何信息。

缺点主要是成本:触发器总是在分组操作之后应用,并且分组的性质通常提供了减少流经系统的数据基数的机会,从而相应地降低了进一步处理这些汇总结果的成本下游。即便如此,对于用例来说,在成本不高的用例中,其清晰性和简便性所带来的好处还是可以超过预先默认为非全保真触发器的认知复杂性。

因此,对于我们第一次将团队总得分作为数据流使用时,让我们看看使用每条记录触发器会是什么样子。Beam本身没有精确的每条记录触发器,因此,如示例8-3中所示,我们改为使用重复的AfterCount(1)触发器,该触发器将在新记录到达时立即触发。

示例8-3 每条记录触发

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(AfterCount(1)))
  4. .apply(Sum.integersPerKey());

然后,该管道的流和表呈现将类似于图8-9中所示。
stsy_0809.png
图8-9 具有按记录触发的流引擎上的窗口求和的流和表视图

使用基于记录的触发器的一个有趣的副作用是,它如何在某种程度上掩盖了静止数据的影响,因为随后触发器立即将它们重新放回了运动。即使这样,由于未分组的值流从表中流走,因此分组中的合计工件仍然停留在表中。

回到SQL,现在我们可以看到将相应的时间-值关系呈现为流的效果。(毫不奇怪)它看起来很像图8-9中的动画中的值流:
image.png
image.png

但是,即使是这个简单的用例,它也很健谈。如果我们正在构建用于大型移动应用程序处理数据的管道,则我们可能不希望为处理每个上游用户评分而支付处理下游更新的费用。这是自定义触发器出现的地方。

水印触发器。例如,如果要切换Beam管道以使用水印触发器,则在TVR的流版本中,每个窗口只能获得一个输出,如示例8-4和图8-10所示。

示例8-4 水印触发

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark())
  4. .apply(Sum.integersPerKey());

stsy_0810.png
图8-10 带水印触发的窗口求和

为了在SQL中获得相同的效果,我们需要语言支持来指定自定义触发器。类似于EMIT 语句,例如EMIT WHEN WATERMARK PAST 。这将向系统发出信号,当表的输入水印超过指定列中的时间戳记值时(在这种情况下, 在窗口的尽头)。

让我们看一下呈现为流的这种关系。从了解何时触发触发的角度来看,停止依赖原始输入的MTime值,而是捕获流中的行发射的当前时间戳也是很方便的:
image.png
正如我们在前几章中所遇到的那样,这里的主要缺点是由于使用启发式水印而导致的后期数据问题。考虑到最新数据,更好的选择是在出现延迟记录的任何时间也立即输出更新,使用支持重复延迟触发的水印触发器上的变体,如示例8-5和图8-11所示 。

示例8-5 水印触发后触发

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark()
  4. .withLateFirings(AfterCount(1)))) .apply(Sum.integersPerKey());

stsy_0811.png
图8-11 带时间/延迟触发的窗口求和

通过允许指定两个触发器,我们可以在SQL中做同样的事情:

  • 一个水印触发器,为我们提供一个初始值:WHEN WATERMARK PAST ,窗口的结尾用作时间戳记
  • Arepeateddelaytriggerforlatedata:ANDTHENAFTER ,且为0,以提供每条记录的语义。

现在我们每个窗口获取多行,另外有两个可用的系统列也很有用:给定窗口相对于水印(Sys.EmitTiming)的每一行/窗格的计时,以及索引 给定窗口的窗格/行的数量(Sys.EmitIndex,以标识给定行/窗口的修订顺序):

image.png
image.png

借助我们的启发式水印,使用此触发器,对于每个窗格,我们都能获得一个可能是正确的准时答案。对于迟到的任何数据,我们都可以获取该行的更新版本,以修改我们之前的结果。

重复的延迟触发器。您可能想要的另一个主要的临时触发器用例是重复的延迟更新。也就是说,在任何新数据到达后一分钟(在处理时间内)触发一个窗口。请注意,这与在微分批处理系统中获得的对齐边界上的触发不同。如例8-6所示,通过相对于到达窗口/行的最新记录的延迟触发,有助于使触发负载的分布比突发,对齐的触发更均匀。它还不需要任何种类的水印支持。图8-12给出了结果。

示例8-6 重复触发,延迟一分钟

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(Repeatedly(UnalignedDelay(ONE_MINUTE)))
  4. .apply(Sum.integersPerKey());

stsy_0812.png
图8-12 窗口化求和,重复一分钟延迟触发

使用这种触发器的效果与我们开始时的每条记录触发非常相似,但是由于触发中引入了额外的延迟,因此闲聊略少,这使系统可以消除正在产生的某些行。通过调整延迟,我们可以调整所生成的数据量,从而在成本和及时性之间保持平衡,以适应用例。

呈现为SQL流,它看起来像这样:
image.png
image.png

数据驱动的触发器。在继续探讨Beam模型中的最后一个问题之前,有必要简要讨论一下数据驱动触发器的概念。由于SQL处理类型的动态方式,似乎数据驱动触发器将对提议的EMIT 子句进行非常自然的添加。例如,如果总分超过10,我们想触发总和怎么办? EMIT WHEN得分> 10不会很自然地起作用吗?
好吧,是的,不是的。是的,这样的结构很自然。但是,当您考虑到这种构造实际发生的情况时,您实际上将在每条记录上触发,然后执行Score> 10谓词来确定是否应将触发的行传播到下游。您可能还记得,这听起来很像HAVING子句所发生的情况。而且,的确,您只需在查询末尾添加HAVING Score> 10,即可获得完全相同的效果。在这一点上,它提出了一个问题:是否值得添加显式的数据驱动触发器?可能不是。即便如此,仍然令人鼓舞的是,使用标准SQL和精心选择的默认值可以很容易地获得数据驱动的触发器的预期效果。

如何:积累

到目前为止,在本节中,我们一直忽略了我在本章开始时介绍的Sys.Undo列。因此,我们默认使用累积模式来回答关于窗口/行的细化如何相互关联的问题。换句话说,无论何时我们观察到一个聚合行的多个修订,后来的修订都是在先前的修订的基础上构建的,将新的输入与旧的输入一起累积。我之所以选择这种方法,是因为它与上一章中使用的方法相匹配,并且相对于表世界中事物的工作方式而言,它是相对简单的翻译。

也就是说,累积模式具有一些主要缺点。实际上,正如我们在第2章中讨论过的那样,由于计数过多,对于具有两个或多个分组操作序列的任何查询/管道来说,它都是显而易见的。允许在一个查询中包含多个串行分组操作的系统中消耗一行的多个修订版的唯一明智的方法是,如果默认情况下它以累积和缩回模式进行操作。否则,您会遇到这样的问题:由于对单个行的多个修订版本的盲目合并,给定的输入记录在一次聚合中被多次包含。

因此,当涉及到将累积模式语义合并到SQL世界中的问题时,最适合我们提供直观自然体验的目标的选项是系统默认情况下是否在封面下使用缩进。15前面介绍了Sys.Undo列,如果您不关心撤消(直到现在为止在本节的示例中),则无需索取。但是,如果您要他们,他们应该在那里。

SQL世界中的撤销。要了解我的意思,我们来看另一个示例。为了适当地解决问题,让我们看一下一个没有撤消就相对不切实际的用例:构建会话窗口并将其增量写入到HBase之类的键/值存储中。在这种情况下,我们将在汇总过程中通过汇总产生增量会话。但是在许多情况下,给定的会话只是一个或多个先前会话的演变而已。在这种情况下,您确实要删除以前的会话并将其替换为新的会话。但是,你是怎么做的?判断给定会话是否替换另一会话的唯一方法是比较它们,以查看新会话是否与旧会话重叠。但这意味着在管道的单独部分中复制一些会话构建逻辑。而且,更重要的是,这意味着您不再具有幂等的输出,因此,如果您想保持端对端的一次语义,就需要跳过很多额外的工作。更好的方法是让管道简单地告诉您哪些会话已删除,哪些会话已添加。这就是退缩给你的。

为了了解这一点(以及在SQL中)的实际效果,让我们修改示例管道,以计算间隔时间为一分钟的会话窗口。为了简化和清楚起见,我们返回使用默认的每条记录触发器。请注意,在这些会话示例的处理时间内,我还转移了一些数据点,以使图表更清晰;事件时间时间戳保持不变。更新后的数据集如下所示(转换后的处理时间时间戳以黄色突出显示):
image.png
image.png

首先,让我们看一下没有缩回的管道。在明确了为什么在将增量会话写入键/值存储的用例中该管道有问题之后,我们将着眼于撤回版本。

非伸缩管道的Beam代码看起来像例8-7。结果如图8-13所示。

示例8-7 具有按记录触发和累积但没有撤回的会话窗口

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(Sessions.withGapDuration(ONE_MINUTE)) .triggering(Repeatedly(AfterCount(1))
  4. .accumulatingFiredPanes()) .apply(Sum.integersPerKey());

stsy_0813.png
图8-13 会话窗口总计,但有累积但没有撤消

最后,以SQL呈现,输出流如下所示:
image.png
在此处(在动画以及SQL渲染中)需要注意的重要事项是增量会话流的外观。从我们的整体角度来看,很容易在动画中直观地识别出以后的会话取代了之前的会话。但是,想象一下在流中一个接一个地接收元素(如SQL清单中所示),并且需要以一种最终将其包含在HBase表中的方式将它们写入HBase,该表仅包含两个最后的会话(值分别为36和12)。你会怎么做?好吧,您需要执行一系列的读-修改-写操作,以读取密钥的所有现有会话,将它们与新会话进行比较,确定哪些重叠,为过时的会话发出删除操作,最后为新的会话发出写入文件-全部花费大量的额外成本,并且失去了幂等性,最终将使您无法提供端到端的,一次精确的语义。只是不切实际。

然后将其与相同的管道进行对比,但启用缩进,如示例8-8所示,如图8-14所示。

示例8-8具有按记录触发,累积和缩回的会话窗口

  1. PCollection<String> raw = IO.read(...);
  2. PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn()); PCollection<KV<Team, Integer>> totals = input
  3. .apply(Window.into(Sessions.withGapDuration(ONE_MINUTE)) .triggering(Repeatedly(AfterCount(1))
  4. .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());

stsy_0814.png
图8-14 会话窗口求和与累积和缩回

最后,以SQL形式。对于SQL版本,我们假设系统默认情况下是在后台使用缩进,并且每次我们请求特殊的Sys.Undo列时,都会在流中对单独的缩回行进行材料化。16正如我最初描述的那样, 该列的价值在于它使我们能够将撤回行(在Sys.Undo列中标记为undo)与正常行(在Sys.Undo列中未标记)进行区分,尽管它们可以很容易地被标记 重做,代替):
image.png
image.png
包括撤回在内,会话流不仅包含新的会话,而且还包含已替换的旧会话的撤回。使用此流,随着时间的推移在HBase中正确建立会话集很简单:您只需在新会话到达时编写新会话(未标记重做行),而在新会话撤回时删除旧会话(撤消行)。好多了!

丢弃模式或缺少该模式。在此示例中,我们展示了如何简单且自然地将撤回合并到SQL中,以提供累加模式以及累加和撤消模式语义。但是丢弃模式呢?

对于特定的用例,例如非常简单的管道,这些管道通过单个分组操作部分聚合大量输入数据,然后将其写入本身支持聚合的存储系统(例如,类似数据库的系统)中,丢弃模式可能非常极端。作为节省资源的选择很有价值。但是,在那些相对狭窄的用例之外,丢弃模式令人困惑并且容易出错。因此,可能不值得直接将其合并到SQL中。需要它的系统可以在SQL语言本身之外提供它作为选项。那些不能简单地提供更自然的累积和缩回模式默认值,并且可以选择在不需要时忽略缩回。

概括

这是一段漫长的旅程,但令人着迷。在本章中,我们介绍了很多信息,因此,让我们花点时间来思考一下这一切。
首先,我们认为流数据处理和非流数据处理之间的关键区别在于时间的增加。我们观察到关系(关系代数的基础数据对象,它本身是SQL的基础)本身随时间发展,并从中推导了TVR的概念,TVR捕获了关系的演化序列随时间变化的经典快照关系。从这个定义中,我们可以看到关系代数的闭包属性在TVR世界中仍然完好无损,这意味着随着我们从时间点快照关系的世界变成了TVR的流兼容世界。

其次,我们研究了Beam模型和经典SQL模型中固有的偏差,得出的结论是Beam具有面向流的方法,而SQL采用了面向表的方法。

最后,我们研究了为SQL提供强大的流处理支持所需的假设语言扩展18,以及一些精心选择的默认设置,这些默认设置可以大大减少使用这些扩展的需求:

表格/流选择
鉴于可以使用两种不同的方式(表或流)来呈现任何时变关系,因此我们需要能够在对查询结果进行处理时选择所需的呈现方式。我们引入了TABLE,STREAM和TVR关键字,以提供一种很好的显式方式来选择所需的呈现方式。
更好的是不需要显式指定一个选择,这就是好的默认值的地方。如果所有输入都是表,那么好的默认值就是输出也应该是表。这为您提供了每个人都习惯的经典关系查询行为。相反,如果任何输入是流,则合理的默认值是输出也将是流。

加窗
尽管您可以使用现有的SQL构造来声明性地声明某些类型的简单窗口,但是使用显式窗口运算符仍然有其价值:

  • 窗口运算符封装了窗口计算数学。
  • 窗口化允许简洁,复杂的动态分组(如会话)的表达。

因此,添加用于分组的简单窗口构造可以帮助使查询减少出错的可能性,同时还提供当今在声明式SQL中无法表达的功能(如会话)。

水印
这不是SQL扩展,而是系统级别的功能。如果所讨论的系统在封面下集成了水印,则只有在确信该行的输入完成后,才可以将其与触发器结合使用以生成包含该行的单个权威版本的流。这对于用例无法轮询物化视图表以获取结果的情况非常重要,取而代之的是,必须直接将流的输出作为流使用。例如通知和异常检测。

扳机
触发器定义从TVR创建的流的形状。如果未指定,则默认值应为每条记录触发,这将提供与物化视图的语义相匹配的简单自然的语义。除了默认值之外,有用的触发器基本上有两种主要类型:

  • 水印触发器,用于在每个窗口的输入被认为完成时为每个窗口产生单个输出。
  • 重复的延迟触发器,用于提供定期更新。

这两个的组合也很有用,特别是在启发式水印的情况下,以提供我们之前看到的早期/按时间/晚期模式。

特殊系统栏
当将TVR用作流时,有一些有趣的元数据可能有用,并且最容易显示为系统级列。我们看了四个:

系统时间
在TVR中最后修改给定行的处理时间。

Sys.EmitTiming
该行的计时相对于水印发出(早,准时,晚)。

Sys.EmitIndex
此行的emit版本的从零开始的索引。

系统撤消
该行是普通行还是撤回(撤消)。默认情况下,系统应该在隐藏状态下进行缩回操作,这在任何时候可能需要进行一系列不止一次分组操作时都是必要的。如果在将TVR渲染为流时未投影Sys.Undo列,则仅返回正常行,从而提供了一种在累积模式与累积模式和缩回模式之间切换的简单方法。

使用SQL进行流处理不需要很困难。实际上,SQL中的流处理已经以物化视图的形式非常普遍。重要的部分实际上归结为捕获数据集/关系随时间的变化(通过时变关系),提供了在这些时变关系的物理表或流表示之间进行选择的方法,并提供了进行时间推理的工具。(窗口,水印和触发器),我们在整本书中一直在讨论。而且,至关重要的是,您需要良好的默认设置,以最大程度地减少在实践中需要使用这些扩展的频率。