4.1 Spark物理执行计划概览

接下来讨论如何将应用的逻辑处理流程转化为物理执行计划,使得应用程序可以被分布执行。

如图4.1所示,我们构建了一个ComplexApplication应用,该应用包含map()、partitionBy()、union()和join()等多种数据操作。

image.png
图4.1 ComplexApplication应用的逻辑处理流程

4.2 Spark物理执行计划生成方法

执行步骤

Spark具体采用3个步骤来生成物理执行计划,具体如下所述。

image.png
图4.2 ComplexApplication应用生成的物理执行图

(1)根据action()操作顺序将应用划分为作业(job)。

这一步主要解决何时生成job,以及如何生成job逻辑处理流程的问题。当应用程序出现action()操作时,如resultRDD.action(),表示应用会生成一个job,该job的逻辑处理流程为从输入数据到resultRDD的逻辑处理流程。

(2)根据ShuffleDependency依赖关系将job划分为执行阶段(stage)。

对于每个job,从其最后的RDD(图4.2中连接results的MapPartitionsRDD)往前回溯整个逻辑处理流程,如果遇到NarrowDependency,则将当前RDD的parent RDD纳入,并继续往前回溯。当遇到ShuffleDependency时,停止回溯,将当前已经纳入的所有RDD按照其依赖关系建立一个执行阶段,命名为stage i。

(3)根据分区计算将各个stage划分为计算任务(task)。

执行第2步后,我们可以发现整个job被划分成了大小适中(相比想法2中的划分方法)、逻辑分明的执行阶段stage。接下来的问题是如何生成计算任务。我们之前的想法是每个分区上的计算逻辑相同,而且是独立的,因此每个分区上的计算可以独立成为一个task。Spark也采用了这个策略,根据每个stage中最后一个RDD的分区个数决定生成task的个数。

如图4.3中粗箭头所示,在stage 2中,每个task负责ShuffledRDD=>CoGroupedRDD=>MapPartitionsRDD=>MapPartitionsRDD中一个分区的计算。

相关问题

经过以上3个步骤,Spark可以将一个应用的逻辑处理流程划分为多个job,每个job又可以划分为多个stage,每个stage可以生成多个task,而同一个阶段中的task可以同时分发到不同的机器并行执行。看起来已经很完美了,但还有3个执行方面的问题:一个应用生成了多个job、stage和task,如何确定它们的计算顺序?task内部如何存储和计算中间数据?前后stage中的task间如何传递和计算数据?

(1)job、stage和task的计算顺序

job的提交时间与action()被调用的时间有关,当应用程序执行到rdd.action()时,就会立即将rdd.action()形成的job提交给Spark。job的逻辑处理流程实际上是一个DAG图,经过stage划分后,仍然是DAG图形状。每个stage的输入数据要么是job的输入数据,要么是上游stage的输出结果。因此,计算顺序从包含输入数据的stage开始,从前到后依次执行,仅当上游的stage都执行完成后,再执行下游的stage。在图4.3中,stage 0和stage 1由于都包含了job的输入数据,两者都可以先开始计算,仅当两者都完成后,stage 2才开始计算。stage中每个task因为是独立而且同构的,可以并行运行没有先后之分。

(2)task内部数据的存储与计算问题(流水线计算)

讨论完job、stage、task的执行顺序后,我们聚焦在task内部,讨论task如何计算每个RDD中的数据。

Spark采用“流水线”式计算来提高task的执行效率,减少内存使用量。这也是Spark可以在有限内存中处理大规模数据的原因。然而,对于某些需要聚合中间计算结果的操作,还是需要占用一定的内存空间,也会在一定程度上影响流水线计算的效率。

观察RDD分区之间的关系,发现上游分区包含的record和下游分区包含的record之间经常存在一对一的数据依赖关系。例如,在图4.3中的第1个计算模式(pattern)中,f ()和g()函数每次读取一个record,计算并生成一个新record,这类型的操作包括map()、filter()等。也就是说,f ()函数在计算record1的时候,并不需要知道record2是什么,同样,g()函数在计算record1′的时候也只需要f ()函数输出record1′,并不需要f ()函数此时就计算出record2′。因此,在第1个pattern中执行f ()和g()函数的时候,可以采用以下步骤进行“流水线”式的计算:

  • 读取record1=>f (record1)=>record1′=>g(record1′)=>输出record1′′
  • 读取record2=>f (record2)=>record2′=>g(record2′)=>输出record2′′
  • 读取record3=>f (record3)=>record3′=>g(record3′)=>输出record3′′

“流水线”式计算的好处是可以有效地减少内存使用空间,在task计算时只需要在内存中保留当前被处理的单个record即可,不需要保存其他record或者已经被处理完的record。例如,在第1个pattern中,没有必要在执行f (record1)之前将record2和record3提前算出来放入内存中。

image.png
图4.3 task内部中间数据的4种计算模式,即根据上游数据计算下游数据的不同方式

对于其他类型的操作,是否还可以采用“流水线”式计算呢?第2个pattern中的g()函数、第3个pattern中的f ()函数、第4个pattern中的f ()和g()函数都需要一次性读取上游分区中所有的record来计算。这样的函数主要出现在mapPartitions(func f),zipPartitions(func g)等操作中。

在第2个pattern中,由于f ()函数仍然是一一映射的,所以仍然可以采用“流水线”式计算,计算流程如下:

  • 读取record1=>f (record1)=>record1′=>g(record1′)=>record1′进入g()函数中的iter.next()进行计算=>g()函数将计算结果存入g()函数中的list。
  • 读取record2=>f (record2)=>record2′=>g(record2′)=>record2′进入g()函数中的iter.next()进行计算=>g()函数将计算结果存入g()函数中的list。
  • 读取record3=>f (record3)=>record3′=>g(record3′)=>record3′进入g()函数中的iter.next()进行计算=>g()函数将计算结果存入g()函数中的list。
  • g()函数一条条输出list中的record。

从计算流程可以看到,f ()函数每生成一条数据,都进入类似上面mapPartitions()的例子g()函数的iter.next()中进行计算,g()函数需要在内存中保存这些中间计算结果,并在输出时将中间结果依次输出。当然,有些g()函数逻辑简单,不需要使用数据结构来保存中间结果,如求record的max值,只需要保存当前最大的record即可。

在第3个pattern中,由于f ()函数需要将[record1,record2,record3]都算出后才能计算得到[record1′,record2′,record3′],因此会先执行f ()函数,完成后再计算g()函数。实际的执行过程:首先执行f ()函数算出[record1′,record2′,record3′],然后使用g()函数依次计算g(record1′)=>record1′′,g(record2′)=>record2′′,g(record3′)=>record3′′。也就是说,f ()函数的输出结果需要保存在内存中,而g()函数计算完每个record′并得到record′′后,可以对record′进行回收。

在第4个pattern中,计算顺序仍然是从前到后,但不能进行record的“流水线”式计算。与第3个pattern类似,f ()函数需要一次性读取[record1,record2,record3]后才能算出[record1′,record2′,record3′],同样,g()函数需要一次性读取[record1′,record2′,record3′]且计算后才能输出[record1′′,record2′′,record3′′]。这两个函数只是依次执行,“流水线”式计算退化到“计算-回收”模式:每执行完一个操作,回收之前的中间计算结果。

(3)task间的数据传递与计算问题

讨论了task内部数据计算后,还有一个问题是不同stage之间的task如何传递数据进行计算。

回顾一下,stage之间存在的依赖关系是ShuffleDependency,而ShuffleDependency是部分依赖的,也就是下游stage中的每个task需要从parent RDD的每个分区中获取部分数据。

ShuffleDependency的数据划分方法包括Hash划分、Range划分等,也就是要求上游stage预先将输出数据进行划分,按照分区存放,分区个数与下游task的个数一致,这个过程被称为“Shuffle Write”。

按照分区存放完成后,下游的task将属于自己分区的数据通过网络传输获取,然后将来自上游不同分区的数据聚合在一起进行处理,这个过程被称为“Shuffle Read”。

总的来说,不同stage的task之间通过Shuffle Write+Shuffle Read传递数据,至于如何具体进行Shuffle操作,分区数据是写内存还是磁盘,如何对这些数据进行聚合,这些问题将在后续章节中详细介绍。

stage和task命名方式

在MapReduce中,stage只包含两类:map stage和reduce stage,map stage中包含多个执行map()函数的任务,被称为map task;reduce stage中包含多个执行reduce()函数的任务,被称为reduce task。

在Spark中,stage可以有多个,有些stage既包含类似reduce()的聚合操作又包含map()操作,所以一般不区分是map stage还是reduce stage,而直接使用stage i来命名,只有当生成的逻辑处理流程类似MapReduce的两个执行阶段时,我们才会依据习惯区分map/reduce stage。

虽然在Spark中一般不区分map/reduce stage,但可以对stage中的task使用不同的命名,如果task的输出结果需要进行Shuffle Write,以便传输给下一个stage,那么这些task被称为ShuffleMapTasks;而如果task的输出结果被汇总到Driver端或者直接写入分布式文件系统,那么这些task被称为ResultTasks。如图4.2所示,stage 0和stage 1中的task是ShuffleMapTasks,stage 2中的task是ResultTasks,直接输出结果。

快速了解一个应用的物理执行计划

我们可以利用Spark UI界面提供的信息快速分析出Spark应用的物理执行图。

(1)查看job信息:例如,在ComplexApplication的用户代码中,只包含一个action()操作(foreach()操作),因此只生成一个job。我们可以在Spark job界面看到foreach()生成的job信息,如图4.5所示。

image.png
图4.4 ComplexApplication应用生成的job信息

(2)查看job包含的stage:从Details for job 0界面可以看到该job包含3个stage,如图4.6所示,其中stage 0包含3个task,共Shuffle Write了376.0B,stage 1包含4个task,共Shuffle Write了988.0B,而stage 2包含3个task,一共Shuffle Read了1364.0B=376.0B+988.0B。

image.png
图4.5 ComplexApplication应用生成的stage信息

单击“DAG Visualization”查看stage之间的数据依赖关系,如图4.6所示。

image.png
图4.6 ComplexApplication应用中stage之间的依赖关系

图4.6展示了ComplexApplication应用的job被划分为3个stage,每个stage包含一个或多个数据操作,每个黑色实心圆圈代表一个RDD。但这个图稍显混乱,stage 0中parallelize操作生成的RDD应该是被stage 2中的partitionBy处理的,与stage 1中的parallelize无关,也就是stage 0到stage 2的横箭头不应该贯穿stage 1。如果想进一步了解黑色实心圆圈代表哪些RDD,则可以进入stage的UI界面或者直接单击图4.6中的stage,最终可以看到图4.7这样的stage结构图,可以看到每个stage的信息与我们之前分析的一致。与图4.2不同的是,图4.7详细展示了每个操作会生成哪些RDD(如join()操作生成了CoGroupedRDD及两个MapPartitionsRDD),但没有展示stage之间的连接关系。

image.png
图4.7 ComplexApplication应用中每个stage包含的操作及生成的RDD信息

(3)查看每个stage包含的task:进入Details for stage i的界面可以看到每个stage包含的task信息。如图4.8所示,stage 0包含3个task,每个task都进行了Shuffle Write,写入了2~3个record,也就是说Spark UI中也会统计Shuffle Write/Read的record数目。

image.png
图4.8 ComplexApplication应用中stage 0包含的task信息

如图4.9所示,stage 1包含4个task,每个task都进行了Shuffle Write,写入了2个record。

image.png
图4.9 ComplexApplication应用中stage 1包含的task信息

如图4.10所示,stage 2包含3个task,每个task从上游的stage 0/1那里Shuffle Read了5~6个record。

image.png
图4.10 ComplexApplication应用中stage 2包含的task信息

4.3 常用数据操作生成的物理执行计划

宽依赖(ShuffleDependency)和窄依赖(NarraowDependency)的区别是child RDD的各个分区中的数据是否完全依赖其parent RDD的一个或者多个分区。完全依赖指parent RDD中的一个分区不需要进行划分就可以流入child RDD的分区中。如果是完全依赖,那么数据依赖关系是窄依赖。如果是不完全依赖,也就是parent RDD的一个分区中的数据需要经过划分(如HashPartition或者RangePartition)后才能流入child RDD的不同分区中,那么数据依赖关系是宽依赖。

对于NarrowDependency,parent RDD和child RDD的分区之间是完全依赖的,我们可以将parent RDD和child RDD直接合并为一个stage。在合成的stage中(图4.11中为stage 0),对于OneToOneDependency,每个task读取parent RDD中的一个分区,并计算child RDD中的一个分区。对于ManyToOneDependency或者ManyToManyDependency,每个task读取parent RDD中多个分区,并计算出child RDD中的一个分区。

image.png
图4.11 NarrowDependency的stage划分原则

对于ShuffleDependency,如图4.12所示,将parent RDD和child RDD进行划分,形成两个或多个stage,每个stage产生多个task,stage之间通过Shuffle Write和Shuffle Read来传递数据。

image.png
图4.12 ShuffleDependency的stage划分原则
_
接下来将常用数据操作归类到这几种依赖关系中,并举例详细介绍其物理执行计划。

OneToOneDependency类型的操作

image.png

image.png
图4.13 OneToOneDependency数据依赖划分举例

图4.13展示了flatMap()和mapPartitionsWithIndex()操作的stage和task划分图,这两个操作都生成了一个stage,stage中不同颜色的箭头表示不同的task。每个task负责处理一个分区,进行流水线计算,且计算逻辑清晰。这两个操作作唯一不同的是flatMap()每读入一条record就处理和输出一条,而mapPartitionsWithIndex()等到全部record都处理完后再输出record。图4.13右图中的mapPartitionsWithIndex()是计算每个分区中奇数的和及偶数的和。

RangeDependency类型的操作

image.png

image.png
图4.14 RangeDependency数据依赖划分举例

图4.14展示了在一般情况下union()操作的stage和task划分图,该操作将两个RDD合并为一个RDD,只生成了一个stage,stage中不同颜色的箭头表示不同的task,每个task负责处理一个分区。

ManyToOneDependency类型的操作

image.png

image.png
图4.15 ManyToOneDependency数据依赖划分举例
_
如图4.15所示,coalesce(shuffle=false)、特殊情况下的union()(见第3章的说明),以及zipPartitions()操作对应的数据依赖关系都是ManyToOneDependency,child RDD中的每个分区需要从parent RDD中获取所依赖的多个分区的全部数据。由于ManyToOneDependency是窄依赖,所以Spark将parent RDD和child RDD组合为一个stage,该stage生成的task个数与最后的RDD的分区个数相等。与OneToOneDependency形成的task相比,这里每个task需要同时在parent RDD中获取多个分区中的数据。

ManyToManyDependency类型的操作

image.png

image.png
图4.16 ManyToManyDependency数据依赖划分举例

如图4.16所示,cartesian()操作对应的数据依赖关系是ManyToManyDependency,child RDD中的每个分区需要从两个parent RDD中获取所依赖的分区的全部数据。虽然ManyToManyDependency形似ShuffleDependency,却属于NarrowDependency,因此Spark将parent RDD和child RDD组合为一个stage,该stage生成的task个数与最后的RDD的分区个数相等。与ManyToOneDependency形成的task相比,这里每个task需要同时在多个parent RDD中获取分区中的数据。

单一ShuffleDependency类型的操作

image.png

image.png
图4.17单一ShuffleDependency数据依赖划分举例

如图4.17所示,aggregateByKey()和sortByKey()操作形成的是单一的ShuffleDependency数据依赖关系,也就是只与一个parent RDD形成ShuffleDependency。根据划分原则,Spark将parent RDD和child RDD分开,分别形成一个stage,每个stage中的task个数与该stage中最后一个RDD中的分区个数相等。为了进行跨stage的数据传递,上游stage中的task将输出数据进行Shuffle Write,child stage中的task 通过Shuffle Read同时获取parent RDD中多个分区中的数据。与NarrowDependency不同,这里从parent RDD的分区中获取的数据是划分后的部分数据。

多ShuffleDependency类型的操作
**
image.png

image.png
图4.18 多ShuffleDependency数据依赖划分举例
_
如图4.18所示,join()操作在不同配置下会生成多种不同类型的数据依赖关系。在图4.18(d)中,由于rdd1、rdd2和CoGoupedRDD具有相同的partitioner,parent RDD和child RDD之间只存在窄依赖ManyToOneDependency,因此只形成一个stage。图4.18(b)、图4.18(c)都同时包含OneToOneDependency和ShuffleDependency,根据Spark的stage划分原则,只对ShuffleDependency进行划分,得到两个stage,stage 1中的task既需要读取上游stage中的多个分区中的数据,也需要处理通过OneToOneDependency连接的RDD中的数据。图4.18(a)最复杂,包含了多个ShuffleDependency,依据Spark的划分原则,需要对多个ShuffleDependency都进行划分,得到多个stage(这里划分出3个stage)。下游stage需要等待上游stage完成后再执行,Shuffle Read获取上游stage的输出数据。

4.4 扩展阅读

实际上,为了优化数据处理流程,尤其是SQL数据处理流程,数据库领域已经有很多相关的优化工作,包括基于规则的优化、基于性能模型的优化和基于自适应的优化等。Spark SQL引擎也将这些优化技术引入了用户代码的逻辑处理流程和物理执行计划转化过程中。Spark SQL使用基于规则的优化技术,如谓词下推、算子组合、常量折叠等技术来对逻辑处理流程进行优化;使用基于性能模型的优化技术选择最优的物理执行计划;使用基于自适应的优化执行技术,根据应用运行时的信息来动态调整执行计划(包括自动Shuffle分区个数的确定、数据倾斜的处理等),提高执行效率。读者可以进一步阅读Spark SQL论文和相关技术文档来了解更多的优化技术。