<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式下执行的作业可能会产生增量更新(类似于数据库中的upserts),而批处理作业只会在最后产生一个最终结果。如果正确解释,最终结果将是相同的,但达到最终结果的方式可能不同。
通过启用批处理执行,我们允许Flink应用一些仅在知道输入是有界的情况下才能进行的额外优化。例如,可以使用不同的连接/聚合(join/aggregation)策略,以及允许更高效的任务调度和故障恢复行为的不同洗牌(shuffle)实现。下面我们将介绍一些执行行为的细节。
什么时候使用批处理
<font style="color:rgb(51, 51, 51);">BATCH</font>
执行模式只能用于有界的作业/ Flink程序。有界性是数据源的一个属性,它告诉我们在执行之前是否已知该源的所有输入,或者新数据是否会出现,可能是无限的。如果所有作业的源都是有界的,那么该作业就是有界的;否则就是无界的。
<font style="color:rgb(51, 51, 51);">STREAMING</font>
执行模式可以用于有界和无界的作业。
作为经验法则,当程序是有界的时候,应该使用<font style="color:rgb(51, 51, 51);">BATCH</font>
执行模式,因为这样更高效。当程序是无界的时候,必须使用<font style="color:rgb(51, 51, 51);">STREAMING</font>
执行模式,因为只有这种模式足够通用,能够处理连续的数据流。
一个明显的例外是当你想要使用有界作业来引导某个作业的状态,然后在无界作业中使用该状态。例如,通过使用<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式运行有界作业,创建一个保存点,然后在无界作业中恢复该保存点。这是一个非常特定的用例,可能很快就会过时,因为我们将允许将保存点作为<font style="color:rgb(51, 51, 51);">BATCH</font>
执行作业的附加输出生成。
另一种情况下,可能会使用<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式来运行有界作业,是为了为最终将使用无界源的代码编写测试。在这些情况下,使用有界源可能更自然一些。
如何使用批处理
想要将执行模式设置为批处理有三种方式:配置文件、命令行、代码
可以修改配置文件中的execution.runtime-mode
这一项为STREAMING
/BATCH
/AUTOMATIC
来设置。其中AUTOMATIC
让系统根据数据是否有界来决定是流模式还是批模式。
可以通过命令行如下命令将程序以批处理的模式执行。
$ bin/flink run -Dexecution.runtime-mode=BATCH <jarFile>
也可以使用下面的代码。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
需要注意的是建议用户不要在程序中设置运行时模式,而是在提交应用程序时使用命令行进行设置。保持应用程序代码免配置可以提供更大的灵活性,因为相同的应用程序可以在任何执行模式下执行。
流批行为差异
任务调度 & 网络数据洗牌
Flink作业由不同的操作组成,这些操作在数据流图中连接在一起。系统决定如何在不同的进程/机器(TaskManagers)上调度这些操作的执行,以及如何在它们之间洗牌(发送)数据。 可以使用称为链式操作的特性将多个操作/算子链接在一起。Flink将一组一个或多个(链接的)算子视为调度的单位,称为任务。通常使用子任务一词来指称在多个TaskManagers上并行运行的任务的各个实例,但在这里我们只使用任务一词。
对于BATCH
和STREAMING
执行模式,任务调度和网络洗牌的工作方式不同。主要是因为我们知道在BATCH
执行模式下输入数据是有界的,这使得Flink可以使用更高效的数据结构和算法。
我们将使用这个示例来解释任务调度和网络传输之间的差异:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
操作(如map()
、flatMap()
或filter()
)之间存在1对1的连接模式,可以直接将数据传递给下一个操作,这使得这些操作可以被链式连接在一起。这意味着Flink通常不会在它们之间插入网络洗牌。
另一方面,像keyBy()或rebalance()这样的操作需要在不同的任务并行实例之间进行数据洗牌。这会引发网络洗牌的操作。
对于上述示例,Flink将操作分组为以下任务: 任务1:source、map1和map2 任务2:map3、map4 任务3:map5、map6和sink 在任务1和任务2之间以及任务2和任务3之间进行网络洗牌。这是该作业的可视化表示:
流处理
在STREAMING
执行模式下,所有的任务都需要始终在线/运行。这样可以使Flink能够立即通过整个流水线处理新记录,这对于连续和低延迟的流处理是必需的。这也意味着分配给作业的TaskManagers需要具有足够的资源来同时运行所有的任务。
网络洗牌是以管道方式进行的,也就是说记录会立即发送给下游任务,并在网络层进行一些缓冲。同样,这是必需的,因为在处理连续的数据流时,不存在可以在任务(或任务流水线)之间实例化数据的自然点(在时间上)。这与BATCH
执行模式形成对比,在BATCH
执行模式中可以实例化中间结果,如下所述。
批处理
在BATCH
执行模式下,作业的任务可以分为多个阶段,可以按顺序执行这些阶段。我们之所以能够这样做,是因为输入是有界的,因此Flink可以在继续下一个阶段之前完全处理完一个阶段的流水线。在上面的例子中,该作业将有三个阶段,对应于通过洗牌障碍分隔的三个任务。
与上述STREAMING
模式中立即发送记录给下游任务不同,阶段性处理要求Flink将任务的中间结果实例化到一些非临时存储中,使得上游任务已经离线后,下游任务可以读取这些中间结果。这会增加处理的延迟,但具有其他有趣的特性。首先,这使得Flink在发生故障时可以回溯到最新可用的结果,而不是重新启动整个作业。另一个副作用是,BATCH
作业可以使用更少的资源(以TaskManagers可用槽位为基准),因为系统可以按顺序一个接一个地执行任务。
TaskManagers会保留中间结果,至少在下游任务未消耗这些结果之前。(技术上讲,在消费的流水线区域产生其输出之前,它们将被保留。)之后,它们将被保留尽可能长的时间,以便在发生故障时可以回溯到较早的结果,具体取决于可用的空间。
State Backends / State
在STREAMING
模式下,Flink使用StateBackend
来控制状态的存储和检查点机制的工作方式。
在BATCH
模式下,配置的状态后端将被忽略。相反,针对有键操作的输入数据会按照键进行分组(使用排序),然后依次处理每个键的所有记录。这样可以同时仅保留一个键的状态。在处理下一个键时,给定键的状态将被丢弃。
数据处理顺序
在BATCH
和STREAMING
执行模式下,操作符或用户定义的函数(UDF)处理记录的顺序可能不同。
在STREAMING
模式下,用户定义的函数不应做任何关于传入记录顺序的假设。数据一到达就会立即进行处理。
在BATCH
执行模式下,有一些操作可以保证顺序。顺序可能是特定任务调度、网络洗牌和状态后端的副作用(参见上文),或者是系统的有意选择。
我们可以区分三种一般类型的输入:
- 广播输入:来自广播流的输入(也可参考广播状态)
- 常规输入:既不是广播输入也不是键控输入的输入
- 键控输入:来自键控流的输入
对于消费多种输入类型的函数或运算符,它们的处理顺序如下:
- 首先处理广播输入
- 其次处理常规输入
- 最后处理键控输入
对于从多个常规或广播输入中消费的函数(如CoProcessFunction
),Flink有权以任何顺序处理来自该类型的任何输入的数据。
对于从多个键控输入中消费的函数(如KeyedCoProcessFunction
),Flink会先处理所有来自所有键控输入的同一键的记录,然后再继续处理下一个键的记录。
事件时间/水位线
支持事件时间时,Flink的流处理运行时基于一种悲观的假设,即事件可能是无序的,即具有时间戳 t 的事件可能在具有时间戳 t+1 的事件之后出现。因此,系统无法确定在未来不会再出现具有t < T的时间戳 T 的元素。为了在保持系统实用性的同时分摊这种无序对最终结果的影响,在<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式下,Flink使用了一种称为Watermarks的启发式方法。具有时间戳T的水位线表示不会再出现具有t < T的元素。
在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下,输入数据集是事先已知的,因此不需要使用这种启发式方法,至少可以通过时间戳对元素进行排序,以便按时间顺序处理它们。对于熟悉流处理的读者来说,在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下我们可以假设存在“完美的水位线”。
基于上述情况,在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下,我们只需要在每个键的输入结束时或者在输入流没有键时,在输入的末尾设置一个<font style="color:rgb(51, 51, 51);">MAX_WATERMARK</font>
。根据这个方案,所有注册的计时器将在时间的末尾触发,并且用户定义的WatermarkAssigners或WatermarkGenerators将被忽略。然而,仍然很重要指定WatermarkStrategy,因为它的TimestampAssigner仍然用于为记录分配时间戳。
处理时间(Processing Time)
处理时间(Processing Time)是记录被处理的机器上的挂钟时间,即在记录被处理的具体时刻。根据这个定义,基于处理时间进行的计算结果是不可重现的。这是因为同一条记录被处理两次将具有两个不同的时间戳。 尽管如上所述,在<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式下使用处理时间是有用的。原因在于流水线通常以实时方式获取其无界输入,因此事件时间和处理时间之间存在相关性。此外,由于前面提到的原因,在<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式下1小时的事件时间通常几乎等于1小时的处理时间或挂钟时间。因此,使用处理时间可以用于提前(不完全)触发,从而提供关于预期结果的提示。
然而,在批处理世界中不存在这种相关性,其中输入数据集是静态的且事先已知。鉴于此,在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下,我们允许用户请求当前处理时间并注册处理时间计时器,但是与事件时间一样,所有计时器都将在输入的末尾触发。
从概念上讲,我们可以想象,在作业执行过程中,处理时间不会推进,而是在整个输入被处理时快进到时间的末尾。
故障恢复(Failure Recovery)
在<font style="color:rgb(51, 51, 51);">STREAMING</font>
执行模式下,Flink使用检查点(checkpoint)来进行故障恢复。可以查看检查点文档以了解如何配置和使用检查点。还有一节关于通过状态快照实现容错性的入门部分,以更高层次的概念解释故障恢复的原理。
检查点故障恢复的一个特性是,Flink会从检查点重新启动所有正在运行的任务,以防发生故障。这可能比在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下所需的操作更加耗费资源(如下面所述),这也是在可能的情况下应该使用<font style="color:rgb(51, 51, 51);">BATCH</font>
执行模式的原因之一。
在<font style="color:rgb(51, 51, 51);">BATCH</font>
执行模式下,Flink会尝试回溯到之前仍有中间结果可用的处理阶段。潜在地,只有发生故障的任务(或它们在图中的前置任务)需要重新启动,这可以改善处理效率和作业的整体处理时间,与从检查点重新启动所有任务相比。
重要因素
在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下,与经典的<font style="color:rgb(51, 51, 51);">STREAMING</font>
执行模式相比,某些功能可能无法按预期工作。某些功能的工作方式可能稍有不同,而其他功能则不受支持。
<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下的行为变化: 在<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式下,例如<font style="color:rgb(51, 51, 51);">reduce()</font>
或<font style="color:rgb(51, 51, 51);">sum()</font>
等“滚动”操作会为每个新记录发出增量更新。在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下,这些操作不会滚动。它们只会发出最终结果。
- 检查点(Checkpointing)和任何依赖检查点的操作都无法工作。
- Iterations
检查点(Checkpointing)
如上所述,批处理程序的故障恢复不使用检查点(Checkpointing)。 重要的是要记住,由于没有检查点,某些功能(例如CheckpointListener)以及由此导致的Kafka的EXACTLY_ONCE模式或File Sink的OnCheckpointRollingPolicy等功能将无法使用。如果您需要在BATCH模式下使用工作的事务性sink。 仍然可以使用所有状态原语,只是用于故障恢复的机制将不同。编写自定义操作符
:::info
注意:自定义操作符是Apache Flink的高级使用模式。对于大多数用例,考虑使用(有键的)ProcessFunction。:::
在编写自定义操作符时,重要的是记住<font style="color:rgb(51, 51, 51);">BATCH</font>
执行模式的假设。否则,一个在<font style="color:rgb(51, 51, 51);">STREAMING</font>
模式下正常工作的操作符可能会在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下产生错误的结果。操作符从未局限于特定的键,这意味着它们可以看到Flink尝试利用的一些BATCH处理属性。
首先,您不应该在操作符中缓存最后一个水位线。在<font style="color:rgb(51, 51, 51);">BATCH</font>
模式下,我们按键逐个处理记录。因此,水位线在每个键之间从<font style="color:rgb(51, 51, 51);">MAX_VALUE</font>
切换到<font style="color:rgb(51, 51, 51);">MIN_VALUE</font>
。您不应该假设水位线在操作符中始终是递增的。出于同样的原因,计时器将首先按键顺序触发,然后在每个键内按时间戳顺序触发。此外,不支持手动更改键的操作。