1. 综述

结构化流是一个建立在Spark SQL引擎上的可扩展和容错的流处理引擎。你可以像表达静态数据的批处理计算一样表达你的流计算。Spark SQL引擎将负责增量地、持续地运行它,并在流式数据不断到达时更新最终结果。你可以使用Scala、Java、Python或R中的Dataset/DataFrame API来表达流式聚合、事件时间窗口、流式批量连接等。计算是在同一个优化的Spark SQL引擎上执行的。最后,该系统通过检查点和Write-Ahead Logs确保了端到端的精确容错保证。简而言之,结构化流提供了快速、可扩展、容错、端到端的完全一次性流处理,用户无需对流进行推理。
在内部,默认情况下,结构化流查询是使用微批处理引擎来处理的,该引擎将数据流作为一系列小批作业来处理,从而实现低至100毫秒的端到端延迟和精确的容错保证。然而,从Spark 2.3开始,我们引入了一种新的低延迟处理模式,称为连续处理,它可以实现低至1毫秒的端到端延迟,并有最少一次的保证。在不改变查询中的数据集/数据帧操作的情况下,你将能够根据你的应用需求来选择模式。
在本指南中,我们将引导你了解编程模型和API。我们将主要使用默认的微批处理模式来解释这些概念,然后再讨论连续处理模式。首先,让我们从一个结构化流查询的简单例子开始—流式字数统计。

2. 编程模型

结构化流的关键思想是将实时数据流视为一个正在不断追加的表。这导致了一种新的流处理模型,与批处理模型非常相似。你将把你的流计算作为标准的批处理查询来表达,就像在一个静态的表上一样,而Spark将其作为一个增量查询在无界的输入表中运行。让我们更详细地了解这个模型。

2.1 基础概念

将输入数据流视为 “输入表”。每一个到达数据流的数据项就像一个新的行被追加到输入表中。
image.png
对输入的查询将产生 “结果表”。每一个触发区间(例如,每1秒),新的行会被追加到输入表,最终更新结果表。每当结果表被更新时,我们希望将改变后的结果行写到一个外部数据源中。
image.png
输出 “被定义为被写入外部存储的内容”。输出可以用不同的模式来定义。

  • 完整模式:整个更新的结果表将被写到外部存储中。由存储连接器来决定如何处理整个表的写入。
  • 追加模式:自上次触发以来,只有在结果表中附加的新行将被写入外部存储。这只适用于结果表中的现有行不会发生变化的查询。
  • 更新模式:只有自上次触发后在结果表中被更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完整模式不同,该模式只输出自上次触发后发生变化的行。如果查询不包含聚合,它将等同于Append模式。

请注意,每种模式都适用于某些类型的查询。这一点将在后面详细讨论。

为了说明这个模式的使用,让我们结合上面的快速例子来理解这个模式。第一行DataFrame是输入表,而最后的wordCounts DataFrame是结果表。请注意,在流式DataFrame上生成wordCounts的查询与静态DataFrame完全相同。然而,当这个查询开始时,Spark将不断检查来自套接字连接的新数据。如果有新的数据,Spark将运行一个 “增量 “查询,将之前运行的计数与新的数据结合起来,计算出更新的计数,如下图所示。
image.png
请注意,结构化流媒体并没有将整个表物化。它从流媒体数据源中读取最新的可用数据,对其进行增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小的中间状态数据(例如前面例子中的中间计数)。
这种模式与许多其他的流处理引擎明显不同。许多流处理系统要求用户自己维护运行中的聚合,因此必须对容错性和数据一致性(至少一次,或最多一次,或完全一次)进行推理。在这个模型中,Spark负责在有新数据时更新结果表,从而将用户从推理中解脱出来。作为一个例子,让我们看看这个模型如何处理基于事件时间的处理和晚到的数据。请注意,结构化流媒体并没有将整个表物化。它从流媒体数据源中读取最新的可用数据,对其进行增量处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小的中间状态数据(例如前面例子中的中间计数)。

2.2 处理事件时间和延迟数据

事件时间 是嵌入在数据本身的时间。对于许多应用来说,你可能想对这个事件时间进行操作。例如,如果你想得到物联网设备每分钟产生的事件数量,那么你可能想使用数据产生的时间(也就是数据中的事件时间),而不是Spark接收它们的时间。这个事件时间在这个模型中被非常自然地表达出来—来自设备的每个事件是表中的一行,而事件时间是表中的一个列值。这使得基于窗口的聚合(例如每分钟的事件数)只是事件时间列上的一种特殊的分组和聚合—每个时间窗口是一个组,每一行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询可以在静态数据集(例如来自收集的设备事件日志)和数据流上一致地定义,使用户的生活变得更加容易。
此外,这个模型可以自然地处理那些根据事件时间晚于预期到达的数据。由于Spark正在更新结果表,所以当有晚到的数据时,它可以完全控制更新旧的聚合,以及清理旧的聚合以限制中间状态数据的大小。从Spark 2.1开始,我们支持 水印,允许用户指定晚期数据的 阈值,并允许引擎相应地清理旧状态。这些在后面的 窗口操作 部分会有更详细的解释。

2.3 容错语义

提供 端到端的完全一次性语义 是设计 结构化流 的关键目标之一。为了实现这一目标,我们设计了结构化流的 Source、Sink 和执行引擎,以可靠地跟踪处理的确切进度,这样它就可以通过重新启动和/或重新处理来处理任何类型的失败。每个流式 Source 都被认为有偏移量(类似于Kafka的偏移量,或Kinesis的序列号),以跟踪流中的读取位置。该引擎使用 检查点写前日志 来记录每个触发器中正在处理的数据的偏移范围。流式 Sink 的设计是为了处理再加工的瞬时性。通过使用可重放的 Source 和空闲的 Sink ,结构化流可以确保在任何故障情况下的端到端完全一次性语义。

3. DataSet 和 DataFrame API 的使用

自Spark 2.0以来,DataFrames和Datasets可以表示静态的、有界的数据,也可以表示流式的、无界的数据。与静态Datasets/DataFrames类似,你可以使用通用入口SparkSession(Scala/Java/Python/R文档)来从流式数据源创建流式DataFrames/Datasets,并对其应用与静态DataFrames/Datasets相同的操作。如果你不熟悉Datasets/DataFrames,强烈建议你使用《DataFrame/Dataset编程指南》熟悉它们。

3.1 创建流式 DataFrame 和流式 DataSet

流式 DataFrames 可以通过 SparkSession.readStream() 返回的 DataStreamReader 接口(Scala/Java/Python文档)来创建。在 R 中,用 read.stream() 方法。与创建静态 DataFrame 的 read 接口类似,你可以指定 Source 的细节:数据格式、模式、选项等。

3.1.1 输入 Source

  • 文本 数据源:读写在一个目录中的文件,作为数据流。支持的文件格式有text, csv, json, orc, parquet。参见DataStreamReader接口的文档以获得更多的最新列表,以及每种文件格式的支持选项。注意,文件必须被 原子化 地放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作实现。
  • Kafka 数据源:从 Kafka 中读取数据,支持 Kafka 0.10.0 以及更高版本
  • Socket 数据源(用于测试):从一个 Socket 连接中读取UTF8文本数据。监听的服务器 Socket 是在驱动程序。注意,这应该只用于测试,因为这不提供端到端的容错保证。
  • Rate 数据源(用于测试):以每秒指定的行数生成数据,每个输出行包含一个时间戳和值。其中timestamp是Timestamp类型,包含消息发送的时间,value是Long类型,包含消息计数,从0开始作为第一行。这个源是用于 测试 基准测试 的。

有些 Source 不是容错的,因为它们不能保证在故障后可以使用检查点的偏移量来重放数据。参见前面关于容错语义的部分。下面是Spark中所有源的细节。

  1. val spark: SparkSession = ...
  2. // socket 数据源
  3. val socketDF = spark
  4. .readStream
  5. .format("socket")
  6. .option("host", "localhost")
  7. .option("port", 9999)
  8. .load()
  9. // Read all the csv files written atomically in a directory
  10. val userSchema = new StructType().add("name", "string").add("age", "integer")
  11. val csvDF = spark
  12. .readStream
  13. .option("sep", ";")
  14. .schema(userSchema) // 指定 csv 文件的 schema
  15. .csv("/path/to/directory") // 指定 csv 文件的存储目录

这些例子生成的流式DataFrame是无类型的,这意味着DataFrame的模式在编译时不被检查,只在运行时提交查询时检查。一些操作,如map、flatMap等,需要在编译时知道其类型。要做到这些,你可以使用与静态DataFrame相同的方法将这些未类型化的流式DataFrames转换为类型化的流式数据集。更多细节请参见《SQL编程指南》。此外,关于支持的流式数据源的更多细节将在本文后面讨论。

3.1.2 模式推理和流式DataFrame和DataSet的分区

默认情况下,基于文件来源的结构化流需要你指定模式,而不是依靠Spark自动推断。这一限制确保了流式查询将使用一个一致的模式,即使在失败的情况下也是如此。对于特定的用例,你可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用模式推断。
当名为 /key=value/ 的子目录存在时,分区发现确实会发生,listing会自动递归到这些目录中。如果这些列出现在用户提供的模式中,它们将由Spark根据正在读取的文件的路径来填写。构成分区方案的目录必须在查询开始时存在,并且必须保持静态。例如,当/data/year=2015/存在时,添加/data/year=2016/是可以的,但改变分区列是无效的(即通过创建目录/data/date=2016-04-17/)。

3.2 流式 DataFrame 和 DataSet 的操作

你可以在流式DataFrame/DataSet上应用各种操作:从无类型的、类似SQL的操作(如select、where、groupBy),到类似RDD的类型化操作(如map、filter、flatMap)。更多细节见SQL编程指南。让我们来看看你可以使用的几个操作例子:

3.2.1 基础操作:选择、映射和聚合

DataFrame/Dataset上的大多数常见操作都支持流式处理。少数不支持的操作将在本节后面讨论。

  1. case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
  2. val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
  3. val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
  4. // Select the devices which have signal more than 10
  5. df.select("device").where("signal > 10") // using untyped APIs
  6. ds.filter(_.signal > 10).map(_.device) // using typed APIs
  7. // Running count of the number of updates for each device type
  8. df.groupBy("deviceType").count() // using untyped API
  9. // Running average signal for each device type
  10. import org.apache.spark.sql.expressions.scalalang.typed
  11. ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))

你也可以把一个流式的DataFrame/Dataset注册为一个 临时视图,然后在其上应用SQL命令。

  1. df.createOrReplaceTempView("updates")
  2. spark.sql("select count(*) from updates")

【注意】你可以通过使用 df.isStreaming 来识别一个 DataFrame/Dataset 是否有流数据。

3.2.2 对事件时间的窗口操作

在一个滑动的事件时间窗口上的聚合,可以很直接的使用结构化流,而且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每一个独特的值保留聚合值(例如计数)。在基于窗口的聚合中,聚合值是为某一行的事件时间所处的每个窗口而维护的。让我们通过一个插图来理解这一点。
想象一下,我们的例子被修改了,现在的流数据包含了行数据和行产生时的时间。我们想在10分钟内计算字数,而不是运行字数,每5分钟更新一次。也就是说,在10分钟窗口12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20之间收到的字数统计,等等。请注意,12:00 - 12:10意味着12:00之后但在12:10之前到达的数据。现在,考虑一个在12:07收到的字。这个词应该增加对应于12:00-12:10和12:05-12:15两个窗口的计数。因此,计数将由分组键(即词)和窗口(可以从事件时间计算出来)来索引。
结果表将看起来像下面这样:
image.png
由于这种 窗口化 与分组类似,在代码中,你可以使用 groupBy()window() 操作来表达窗口化聚合。你可以在Scala/Java/Python中看到下面例子的完整代码。

  1. import spark.implicits._
  2. val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
  3. val windowedCounts = words.groupBy(
  4. window($"timestamp", "10 minutes", "5 minutes"),
  5. $"word"
  6. ).count()

3.2.2.1 处理延时数据和水印

现在考虑一下,如果其中一个事件延时到达应用程序会发生什么。例如,比如说,一个在12:04(即事件时间)产生的词可能在12:11被应用程序收到。应用程序应该使用12:04而不是12:11的时间来更新12:00-12:10窗口的旧计数。这在我们基于窗口的分组中自然而然地发生了:结构化流可以在很长一段时间内维持部分聚合的中间状态,这样延时数据可以正确地更新旧窗口的聚合,如下图所示。
image.png
然而,要想连续几天运行这个查询,系统有必要限制其积累的中间内存状态的数量。这意味着系统需要知道何时可以将一个旧的聚合体从内存状态中删除,因为应用程序不会再收到该聚合体的后期数据。为了实现这一点,在Spark 2.1中,我们引入了 水印,让引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态。你可以通过指定事件时间列和预计数据在事件时间方面的晚期阈值来定义查询的水印。对于从时间T开始的特定窗口,引擎将保持状态,并允许迟到的数据更新状态,直到(引擎看到的最大事件时间-迟到阈值>T)。换句话说,在阈值内的晚期数据将被聚合,但晚于阈值的数据将开始被丢弃(具体保证见后面的章节)。让我们通过一个例子来理解这一点。我们可以使用withWatermark()在前面的例子中轻松地定义水印,如下图所示。

  1. import spark.implicits._
  2. val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
  3. // Group the data by window and word and compute the count of each group
  4. val windowedCounts = words
  5. .withWatermark("timestamp", "10 minutes")
  6. .groupBy(
  7. window($"timestamp", "10 minutes", "5 minutes"),
  8. $"word")
  9. .count()

在这个例子中,我们将查询的水印定义在列 “timestamp” 的值上,并将 “10分钟 “定义为 允许数据迟到的阈值。如果这个查询在 更新输出模式 下运行(在后面的输出模式部分讨论),引擎将持续更新结果表中的一个窗口的计数,直到该窗口时间比水印的大,即比列 “timestamp “中的当前事件时间晚10分钟。下面是一个例子。
image.png
如图所示,引擎跟踪的最大事件时间是蓝色虚线,而在每次触发开始时设置为(最大事件时间-‘10分钟’)的水印是红线。例如,当引擎观察到数据(12:14,狗)时,它将下一次触发的水印设置为12:04。这个水印让引擎在额外的10分钟内保持中间状态,以允许后期数据被计算。例如,数据(12:09,猫)是失序的,也是晚的,它落在12:00-12:10和12:05-12:15的窗口。由于,它仍然领先于触发器中的水印12:04,引擎仍然保持中间计数的状态,并正确更新相关窗口的计数。然而,当水印被更新到12:11时,窗口(12:00 - 12:10)的中间状态被清除,所有随后的数据(例如(12:04,驴))被认为是 “太晚”,因此被忽略。请注意,在每次触发后,更新的计数(即紫色行)作为触发输出被写入 Sink 中,这是由 更新模式 决定的。
一些 Sink(如文件)可能不支持更新模式所要求的细粒度更新。为了解决这个问题,我们也支持 追加模式,即只将最终的计数写入 Sink 中。如下图所示:
【注意】在一个 非流式 数据集上使用 withWatermark 是不可行的。由于水印不应该以任何方式影响任何批次的查询,我们将直接忽略它。
image.png
与前面的 更新模式 类似,引擎保持每个窗口的中间计数。然而,部分计数不会被更新到结果表,也不会被写入 Sink 中。引擎等待 “10分钟 “来计算迟到的日期,然后放弃窗口<水印>的中间状态,并将最终计数附加到结果表/汇中。例如,只有在水印更新为12:11后,窗口12:00-12:10的最终计数才被追加到结果表。

3.2.2.2 水印触发清除聚合状态的条件

需要注意的是,在聚合查询中,水印 必须满足以下条件才能清除状态(截止到Spark 2.1.1,将来可能会有变化)。

  • 输出模式必须是 AppendUpdate 。完整模式要求保留所有的聚合数据,因此不能使用水印来放弃中间状态。关于每种输出模式的语义的详细解释,请参见输出模式部分。
  • 聚合必须有事件时间列,或者事件时间列的窗口。
  • withWatermark 必须与聚合中使用的时间戳列在同一列上调用。例如,df.withWatermark(“time”, “1 min”).groupBy(“time2”).count() 在 Append 输出模式下是无效的,因为水印是在与聚合列不同的列上定义的。
  • withWatermark 必须在聚合之前被调用,才能触发水印。例如,df.groupBy(“time”).count().withWatermark(“time”, “1 min”) 在 Append 输出模式下是无效的。

    3.2.2.3 带水印的聚合的语义保证
  • 水印延迟(用withWatermark设置)为 “2小时”,保证引擎永远不会放弃任何延迟少于2小时的数据。换句话说,任何落后于(就事件时间而言)最新处理的数据少于2小时的数据,都保证会被汇总

  • 然而,这个保证只在一个方向上是严格的。延迟2小时以上的数据不保证会被丢弃;它可能会也可能不会被汇总。数据延迟的越多,引擎处理它的可能性就越小。

    3.2.3 聚合操作

    结构化流支持将一个流式 DataFrame/DataSet 与一个静态数据集/数据框以及另一个流式 DataFrame/DataSet 相连接。流式连接的结果是 增量生成 的,类似于上一节中的流式聚合的结果。在本节中,我们将探讨在上述情况下支持哪种类型的连接(即内部、外部等)。请注意,在所有支持的连接类型中,与流式 DataFrame/DataSet 的连接结果将与包含流中相同数据的静态 DataFrame/DataSet 完全相同。
    ……
    流式关联查询的关联矩阵:
    image.png
    关于支持的连接的其他细节:

  • 连接可以是 级联 的,也就是说,你可以做 df1.join(df2, …).join(df3, …).join(df4, ….)。

  • 从Spark 2.3开始,只有当查询处于 Append输出模式 时,你才能使用 join。还不支持其他的输出模式。
  • 从Spark 2.3开始,你不能在连接之前使用其他非map类操作。下面是一些不能使用的例子。
  • 不能在连接前使用流式聚合。
  • 在连接前不能使用 mapGroupsWithStateflatMapGroupsWithState 的更新模式。

    3.2.4 流式重复数据删除

    你可以使用事件中的唯一标识符来重复删除数据流中的记录。这与使用唯一标识符列的静态重复数据删除完全相同。查询将从以前的记录中存储必要的数据量,从而可以过滤重复的记录。与聚合类似,你可以在有或没有水印的情况下使用重复数据删除。

  • 有水印:如果对重复记录到达的时间有一个上限,那么你可以在事件时间列上定义一个水印,并使用指南和事件时间列进行重复计算。查询将使用水印从过去的记录中删除旧的状态数据,这些记录预计不会再有任何重复的记录。这就 限制**了查询需要维护的状态的数量**。

  • 没有水印:由于对重复记录可能到来的时间没有限制,查询将所有过去记录的数据存储为状态。 ```scala val streamingDf = spark.readStream. … // columns: guid, eventTime, …

// 无水印状态的重复数据删除 streamingDf.dropDuplicates(“guid”)

// 有水印状态的重复数据删除(限制了维护状态数据的量) streamingDf .withWatermark(“eventTime”, “10 seconds”) .dropDuplicates(“guid”, “eventTime”)

  1. <a name="lG5BR"></a>
  2. #### 3.2.5 处理多个水印的策略
  3. 一个流式查询可以有多个输入流,它们被联合或连接在一起。每个输入流都可以有一个不同的迟到数据阈值,需要容忍有状态的操作。你在每个输入流上使用withWatermarks("eventTime", delay)指定这些阈值。例如,考虑一个在inputStream1和inputStream2之间有流-流连接的查询:
  4. ```scala
  5. inputStream1.withWatermark("eventTime1", "1 hour")
  6. .join(inputStream2.withWatermark("eventTime2", "2 hours"), joinCondition)

在执行查询时,结构化流式数据单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并与它们一起选择一个全局水印,用于有状态操作。默认情况下,选择最小值作为全局水印,因为它可以确保在其中一个流落后于其他流的情况下(例如,其中一个流由于上游故障而停止接收数据),没有数据被意外地丢弃为太晚。换句话说,全局水印将以最慢的流的速度安全地移动,查询输出将相应地延迟。
然而,在某些情况下,你可能希望得到更快的结果,即使这意味着放弃最慢的流中的数据。从Spark 2.4开始,你可以通过设置SQL配置 spark.sql.streaming.multipleWatermarkPolicymax (默认为 min ),将多重水印策略设置为选择最大值作为全局水印。这让全局水印以最快的流的速度移动。然而,作为一个副作用,来自较慢数据流的数据将被积极地丢弃。因此,请谨慎地使用这个配置。

3.2.6 任意的有状态操作

许多用例需要比聚合更高级的 有状态操作。例如,在许多用例中,你必须从事件的数据流中跟踪会话。为了实现这样的会话化,你必须将任意类型的数据保存为状态,并在每个触发器中使用数据流事件对状态进行任意操作。从 Spark 2.2 开始,这可以通过操作 mapGroupsWithState 和更强大的操作flatMapGroupsWithState 来实现。这两个操作都允许你在分组的数据集上应用用户定义的代码来更新用户定义的状态。关于更具体的细节,请看API文档(Scala/Java)和例子(Scala/Java)。

3.2.7 不支持的操作

有一些 DataFrame/DataSet 操作不支持流式 DataFrame/DataSet。其中一些如下:

  • 流式数据集还不支持多重流式聚合(即流式DF上的聚合链)。
  • 流式数据集上不支持限制和取前N行。
  • 不支持流式数据集上的去重操作。
  • 流式数据集上的排序操作只有在聚合后和完整输出模式下才被支持。
  • 流式数据集上不支持少数类型的外部连接。更多细节请参见 “连接操作 “章节中的支持矩阵。

此外,有一些 DataSet 方法在流式数据集上不起作用。它们是会立即运行查询并返回结果的动作,这在流式数据集上是没有意义的。相反,这些功能可以通过明确地启动流式查询来完成,如下:

  • count() :不能从流式数据集中返回一个单一的计数。取而代之的是使用 ds.groupBy().count(),它返回一个包含运行计数的流式数据集。
  • foreach() : 取而代之的是使用 ds.writeStream.foreach(…) (见下一节)。
  • show() :取而代之的是使用 Console sink(见下一节)。

如果你尝试这些操作,你会看到一个 AnalysisException,如 “操作XYZ不支持流式数据帧/数据集”。虽然其中一些操作可能会在Spark的未来版本中得到支持,但还有一些操作从根本上说很难在流式数据上有效实现。例如,不支持对输入流进行排序,因为它需要跟踪流中收到的所有数据。因此,这从根本上来说是很难有效执行的。

3.3 开启流式查询

一旦你定义了最终结果的 DataFrame/Dataset,剩下的就是让你开始流式计算。要做到这一点,你必须使用通过Dataset.writeStream() 返回的 DataStreamWriter(Scala/Java/Python文档)。你必须在这个接口中指定以下一项或多项内容。

  • 输出 Sink 的细节:数据格式、位置等。
  • 输出模式:指定什么会被写入输出 Sink 中。
  • 查询名称:可选择指定一个唯一的查询名称,以便识别。
  • 触发区间:可选地,指定触发时间间隔。如果没有指定,系统将在前一个处理过程完成后立即检查新数据的可用性。如果因为之前的处理没有完成而错过了触发时间,那么系统将立即触发处理。
  • 检查点位置:对于一些可以保证端到端容错的输出汇,指定系统将写入所有检查点信息的位置。这应该是HDFS兼容的容错文件系统中的一个目录。检查点的语义将在下一节详细讨论。

    3.3.1 输出模式

    以下是一些类型的输出模式:

  • 附加模式(默认):这是默认模式,只有自上次触发后添加到结果表中的新行才会被输出到汇中。这只支持那些添加到结果表的行永远不会改变的查询。因此,这种模式保证每条记录只被输出一次(假设水槽是容错的)。例如,只有select、where、map、flatMap、filter、join等的查询才支持Append模式。

  • 完整模式:每次触发后,整个结果表将被输出到汇中。这对聚合查询来说是支持的。
  • 更新模式:(从Spark 2.1.1开始可用)只有结果表中自上次触发后更新的行才会被输出到汇中。更多信息将在未来版本中添加。

不同流式查询支持不同的输出模式,以下是支持矩阵:
image.png

3.3.2 输出 Sinks

3.3.3 触发器

流式查询的触发器设置定义了流式数据处理的时间,查询是作为具有固定批处理间隔的微批查询执行还是作为连续处理查询执行。以下是所支持的不同种类的触发器。

触发器类型 描述信息
未指定(默认) 如果没有明确指定触发器设置,那么默认情况下,查询将在微批模式下执行,只要前一个微批处理完毕,就会产生下一批次微批处理。
固定间隔的微批次 该查询将以微批模式执行,微批将在用户指定的时间间隔内被启动。
- 如果前一个微批处理在该间隔内完成,那么引擎将等待该间隔结束后再启动下一个微批。
- 如果前一个微批的完成时间超过了间隔时间(即如果错过了间隔边界),那么下一个微批将在前一个微批完成后立即启动(即不会等待下一个间隔边界)。
- 如果没有新的数据,那么就不会启动微批处理。
一次性的微型批次 该查询将只执行 ”仅一次“ 微批处理所有可用的数据,然后自行停止。这在你想定期启动一个集群,处理自上一期以来的所有可用数据,然后关闭集群的情况下是很有用的。在某些情况下,可以显著的节约资源成本。
连续的,有固定检查点间隔的
(实验特性)
该查询将在新的低延迟、连续处理模式下执行。请在下面的连续处理部分阅读更多相关内容。
import org.apache.spark.sql.streaming.Trigger

// Default trigger:默认微批模式启动
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval:固定时间间隔的触发器
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger:仅一次触发器
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval:检查点为1s的连续触发器
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

3.4 管理流式查询

当查询开始时创建的 StreamingQuery 对象可以用来监控和管理查询:

val query = df.writeStream.format("console").start()   // get the query object
// 从检查点数据中获取本次执行查询的唯一标识符,该标识符在重新启动时持续存在。
query.id     
// 得到本次查询运行的唯一ID,这个ID将在每次启动/重启时产生。
query.runId
// 得到自动生成的或用户指定的名称
query.name
// 打印查询的详细说明
query.explain()
// 停止查询动作
query.stop()
// 挂起流式查询直到其被终止,执行 stop() 方法或者触发 error 报错
query.awaitTermination()
// 如果查询因错误而被终止,则为异常。
query.exception
// 本次查询最新进展的数组(包含最近几个批次的状态信息)
query.recentProgress
// 本次流式查询的最新进展情况(仅包含最后一个批次的状态信息)
query.lastProgress

你可以在 一个 SparkSession 中启动任何数量的查询。它们都将 并发 地运行,共享 集群资源。你可以使用 sparkSession.streams() 来获得 StreamingQueryManager (Scala/Java/Python文档),它可以用来管理当前活动的查询。

val spark: SparkSession = ...
// 获取当前活动中的流式查询列表
spark.streams.active
// 根据唯一ID获取查询对象
spark.streams.get(id)
// 挂起所有流式查询,直到其中任意一个终止
spark.streams.awaitAnyTermination()

3.5 监控流式查询

有多种方法来监控活动的流式查询。你可以使用 Spark 的 Dropwizard Metrics 支持将指标推送到外部系统,或者以编程方式访问它们。

3.5.1 交互式读取指标参数

你可以使用 streamingQuery.lastProgress()streamingQuery.status() 直接获得活动查询的当前状态和指标。 lastProgress() 在Scala和Java中返回一个 StreamingQueryProgress 对象,在Python中返回一个有相同字段的字典。它有关于流的最后一次触发所取得的进展的所有信息:处理了什么数据,处理率是多少,延迟等。还有一个 streamingQuery.recentProgress ,它返回最后几个进度的数组。
此外, streamingQuery.status() 在Scala和Java中返回一个 StreamingQueryStatus 对象,在 Python 中返回一个具有相同字段的字典。它给出了关于查询正在立即进行的信息:触发器是否激活,数据是否正在处理,等等。

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.
{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/

println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/

3.5.2 使用异步API以编程方式报告指标

你也可以通过附加一个 StreamingQueryListener (Scala/Java文档)来异步监控与SparkSession相关的所有查询。一旦你用 sparkSession.streams.attachListener() 附加了你的自定义StreamingQueryListener对象,当查询开始和停止时,以及当活动查询有进展时,你会得到回调。下面是一个例子。

val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

3.5.3 使用Dropwizard报告指标

Spark 支持使用 Dropwizard库 报告指标。为了使结构化流查询的指标也能被报告,你必须在SparkSession中明确启用配置 spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")

启用此配置后,在 SparkSession 中启动的所有查询将通过 Dropwizard 向任何已配置的 Sinks(例如Ganglia、Graphite、JMX等)报告指标。

3.6 基于检查点进行故障恢复

在发生故障或故意关机的情况下,你可以恢复以前的查询进度和状态,并在其停止的地方继续。这是用 检查点 写头日志 完成的。你可以给查询配置一个检查点位置,查询将把所有的进度信息(即每个触发器中处理的偏移量范围)和运行的总量(如快速例子中的字数)保存到检查点位置。这个检查点位置必须是HDFS兼容文件系统中的一个路径,可以在启动查询时作为DataStreamWriter的一个选项来设置。

aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()

3.7 流式查询变更后的恢复语义

流式查询的哪些变化在同一检查点位置重启时是被允许的,这里有一些限制。这里有几种变化是不允许的,或者变化的效果没有明确的定义。对于以下所有的这些:

4. 持续处理过程

4.1 实验特性

连续处理是Spark 2.3中引入的一种新的、实验性的流式执行模式,可以实现低(约1毫秒)的端到端延迟和至少一次的容错保证。这与默认的微批处理引擎相比,后者可以实现完全一次的保证,但最多可以实现~100ms的延迟。对于某些类型的查询(在下面讨论),你可以选择在哪种模式下执行,而不需要修改应用逻辑(即不改变DataFrame/Dataset的操作)。
要在连续处理模式下运行支持的查询,你所需要做的就是指定一个连续触发器,并将所需的检查点间隔作为参数。比如说

import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("")

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()

1秒的检查点间隔意味着连续处理引擎将每秒记录查询的进展。产生的检查点的格式与微批处理引擎兼容,因此任何查询都可以用任何触发器重新启动。例如,一个用微批模式启动的支持的查询可以在连续模式下重新启动,反之亦然。请注意,任何时候你切换到连续模式,你将得到至少一次的容错保证。

4.2 支持的查询

从Spark 2.3开始,在连续处理模式下只支持以下类型的查询:
【Operations 算子】

  • 在连续模式下,只支持类似 map 的Dataset/DataFrame操作,也就是说,只支持映射(select、map、flatMap、mapPartitions等)和过滤(where、filter等)。
  • 除了聚合函数(因为还不支持聚合)、current_timestamp()和current_date()(使用时间的确定性计算具有挑战性),所有的SQL函数都支持。

【Sources 算子】

  • Kafka Source:所有选项都支持。
  • Rate Source:适合于测试。在连续模式下支持的选项只有numPartitions和rowsPerSecond。

【Sinks 算子】

  • Kafka Sink:所有选项都支持。
  • Memory Sink:有利于调试。
  • Console Sink:有利于调试。所有选项都支持。注意,控制台将打印你在连续触发中指定的每个检查点间隔。

关于它们的更多细节,请参见输入 Source 和输出 Sink 部分。虽然控制台汇很适合测试,但用Kafka作为 Source 和 Sink 可以最好地观察端到端的低延迟处理,因为这允许引擎处理数据,并在输入主题中的输入数据可用的几毫秒内使结果在输出主题中可用。

4.3 注意事项

连续处理引擎会启动多个长期运行的任务,不断从 source 读取数据,进行处理,并不断写入 sink 中。查询所需的任务数量取决于查询能从源头并行读取多少个分区。因此,在开始一个连续处理查询之前,你必须确保集群中有足够的核心来并行处理所有的任务。例如,如果你从一个有10个分区的Kafka主题中读取,那么集群必须有至少10个核心才能使查询取得进展。
停止一个连续的处理流可能会产生虚假的任务终止警告。这些可以被安全地忽略。
目前还没有自动重试失败的任务。任何失败都会导致查询被停止,需要从检查点手动重新启动。

5. 附加信息