6.1 Shuffle的意义及设计挑战

运行在不同stage、不同节点上的task间如何进行数据传递。这个数据传递过程通常被称为Shuffle机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游task之间进行传递和计算。

Shuffle的设计和实现需要面对多个挑战。

image.png
图6.1 包含ShuffleDependency的典型数据操作的逻辑处理流程
_
(1)计算的多样性

Shuffle机制分为Shuffle Write和Shuffle Read两个阶段,前者主要解决上游stage输出数据的分区问题,后者主要解决下游stage从上游stage获取数据、重新组织、并为后续操作提供数据的问题。

有些操作需要进行聚合,groupByKey()需要将Shuffle Read的 record聚合为 record。有些操作需要进行combine(),如reduceByKey()需要在Shuffle Write端进行combine()。有些操作需要进行排序,如sortByKey()需要对Shuffle Read的数据按照Key进行排序。

(2)计算的耦合性

有些操作包含用户自定义聚合函数,如aggregateByKey (seqOp,combOp)中的seqOp和combOp,以及reduceByKey(func)中的func,这些函数的计算过程和数据的Shuffle Write/Read过程耦合在一起。

(3)中间数据存储问题

在Shuffle机制中需要对数据进行重新组织(分区、聚合、排序等),也需要进行一些计算(执行聚合函数),那么在Shuffle Write/Read过程中的中间数据如何表示?如何组织?如何存放?如果Shuffle的数据量太大,那么内存无法存下怎么办?

6.2 Shuffle的设计思想

6.2.1 解决数据分区和数据聚合问题

(1)数据分区问题

数据分区问题解决方案:该问题包含两个子问题。第1个问题是如何确定分区个数?分区个数与下游stage的task个数一致。

分区个数可以由用户自定义,如groupByKey(numPartitions)中的numPartitions一般被定义为集群中可用CPU个数的1~2倍,即将每个map task的输出数据划分为numPartitions份,相应地,在reduce stage中启动numPartitions个task来获取并处理这些数据。如果用户没有定义,则默认分区个数是parent RDD的分区个数的最大值。如图6.2的左图所示,在没有定义join(numPartitions)中的分区个数numPartitions的情况下,取两个parent RDD的分区的最大值为2。

第2个问题是如何对map task输出数据进行分区?解决方法是对map task输出的每一个record,根据Key计算其partitionId,具有不同partitionId的record被输出到不同的分区(文件)中。如图6.2的右图所示,下游stage中只有两个task,分区个数为2。map task需要将其输出数据分为两份,方法是让map()操作计算每个输出record的partitionId=Hash(Key)%2,根据partitionId将record直接输出到不同分区中。不支持Shuffle Write端的combine()操作。

image.png
图6.2 join()的逻辑处理流程与Shuffle Write/Read过程

(2)数据聚合问题

数据聚合问题解决方案:数据聚合的本质是将相同Key的record放在一起,并进行必要的计算,这个过程可以利用C++/Java语言中的HashMap实现。方法是使用两步聚合(two-phase aggregation),先将不同tasks获取到的 record存放到HashMap中,HashMap中的Key是K,Value是list(V)。然后,对于HashMap中每一个 record,使用func计算得到 record。

image.png
图6.3 两步聚合和在线聚合的区别

优化方案:对于reduceByKey(func)等包含聚合函数的操作来说,我们可以采用一种在线聚合(Online aggregation)的方法来减少内存空间占用。如图6.3的右图所示,该方案在每个record加入HashMap时,同时进行func()聚合操作,并更新相应的聚合结果。

在线聚合将Shuffle Read和聚合函数计算耦合在一起,可以加速速计算。但是,对于不包含聚合函数的操作,如groupByKey()等,在线聚合和两步聚合没有差别,因为这些操作不包含聚合函数,无法减少中间数据规模。

6.2.2 解决map()端combine问题

需要进行combine操作:进行combine操作的目的是减少Shuffle的数据量,只有包含聚合函数的数据操作需要进行map()端的combine,具体包括reduceByKey()、foldByKey()、aggregateByKey()、combineByKey()、distinct()等。对于不包含聚合函数的操作,如groupByKey(),我们即使进行了combine操作,也不能减少中间数据的规模。

combine解决方案:从本质上讲,combine和Shuffle Read端的聚合过程没有区别,都是将 record聚合成,不同的是,Shuffle Read端聚合的是来自所有map task输出的数据,而combine聚合的是来自单一task输出的数据。因此仍然可以采用Shuffle Read端基于HashMap的解决方案。具体地,首先利用HashMap进行combine,然后对HashMap中每一个record进行分区,输出到对应的分区文件中。

6.2.3 解决sort问题

有些操作如sortByKey()、sortBy()需要将数据按照Key进行排序,那么如何在Shuffle机制中完成排序呢?该问题包含以下两个子问题。

(1)在哪里执行sort?

首先,在Shuffle Read端必须执行sort,因为从每个task获取的数据组合起来以后不是全局按Key进行排序的。其次,理论上,在Shuffle Write端不需要排序,但如果进行了排序,那么Shuffle Read获取到(来自不同task)的数据是已经部分有序的数据,可以减少Shuffle Read端排序的复杂度。

(2)何时进行排序,即如何确定排序和聚合的顺序?

第1种方案是先排序再聚合,这种方案需要先使用线性数据结构如Array,存储Shuffle Read的 record,然后对Key进行排序,排序后的数据可以直接从前到后进行扫描聚合,不需要再使用HashMap进行hash-based聚合。这种方案也是Hadoop MapReduce采用的方案,方案优点是既可以满足排序要求又可以满足聚合要求;缺点是需要较大内存空间来存储线性数据结构,同时排序和聚合过程不能同时进行,即不能使用在线聚合,效率较低。

第2种方案是排序和聚合同时进行,我们可以使用带有排序功能的Map,如TreeMap来对中间数据进行聚合,每次Shuffle Read获取到一个record,就将其放入TreeMap中与现有的record进行聚合,过程与HashMap类似,只是TreeMap自带排序功能。这种方案的优点是排序和聚合可以同时进行;缺点是相比HashMap,TreeMap的排序复杂度较高,TreeMap的插入时间复杂度是O(nlogn),而且需要不断调整树的结构,不适合数据规模非常大的情况。

第3种方案是先聚合再排序,即维持现有基于HashMap的聚合方案不变,将HashMap中的record或record的引用放入线性数据结构中进行排序。这种方案的优点是聚合和排序过程独立,灵活性较高,而且之前的在线聚合方案不需要改动;缺点是需要复制(copy)数据或引用,空间占用较大。Spark选择的是第3种方案,设计了特殊的HashMap来高效完成先聚合再排序的任务。

6.2.4 解决内存不足问题

Shuffle数据量过大导致内存放不下怎么办?由于我们使用HashMap对数据进行combine和聚合,在数据量大的时候,会出现内存溢出。这个问题既可能出现在Shuffle Write阶段,又可能出现在Shuffle Read阶段。

解决方案:使用内存+磁盘混合存储方案。先在内存(如HashMap)中进行数据聚合,如果内存空间不足,则将内存中的数据spill到磁盘上,此时空闲出来的内存可以继续处理新的数据。此过程可以不断重复,直到数据处理完成。然而,问题是spill到磁盘上的数据实际上是部分聚合的结果,并没有和后续的数据进行过聚合。因此,为了得到完整的聚合结果,我们需要在进行下一步数据操作之前对磁盘上和内存中的数据进行再次聚合,这个过程我们称为“全局聚合”。为了加速全局聚合,我们需要将数据spill到磁盘上时进行排序,这样全局聚合才能够按顺序读取spill到磁盘上的数据,并减少磁盘I/O。具体做法将在后面详细描述。

6.3 Spark中Shuffle框架的设计

image.png
图6.4 在Shuffle机制中Spark典型数据操作的计算需求
_
在Shuffle Write端,目前只支持combine功能,并不支持按Key排序功能。当然,未来有些数据操作可能同时需要这两个功能,所以,Shuffle框架还是需要支持全部的功能。

6.3.1 Shuffle Write框架设计和实现

在Shuffle Write阶段,数据操作需要分区、聚合和排序3个功能,但如图6.4所示,每个数据操作只需要其中的一个或两个功能。Spark为了支持所有的情况,设计了一个通用的Shuffle Write框架,框架的计算顺序为“map()输出→数据聚合→排序→分区”输出。

如图6.5所示,map task每计算出一个record及其partitionId,就将record放入类似HashMap的数据结构中进行聚合;聚合完成后,再将HashMap中的数据放入类似Array的数据结构中进行排序,既可按照partitionId,也可以按照partitionId+Key进行排序;最后根据partitionId将数据写入不同的数据分区中,存放到本地磁盘上。其中,聚合(aggregate,即combine)和排序(sort)过程是可选的,如果数据操作不需要聚合或者排序,那么可以去掉相应的聚合或排序过程。

image.png
图6.5 通用的Shuffle Write框架(包含“map()输出→数据聚合排序→分区输出”的过程)_

下面我们介绍在Shuffle Write框架下,Spark如何针对不同情况构建最适合的Shuffle Write方式。

(1)不需要map()端聚合(combine)和排序。

这种情况最简单,只需要实现分区功能。如图6.6所示,map()依次输出 record,并计算其partitionId(PID),Spark根据partitionId,将record依次输出到不同的buffer中,每当buffer填满就将record溢写到磁盘上的分区文件中。分配buffer的原因是map()输出record的速度很快,需要进行缓冲来减少磁盘I/O。在实现代码中,Spark将这种Shuffle Write方式称为BypassMergeSortShuffleWriter,即不需要进行排序的Shuffle Write方式。

image.png
图6.6 不需要map()端聚合(combine)和排序的Shuffle Write流程(BypassMergeSortShuffleWriter)

该模式的优缺点:优点是速度快,直接将record输出到不同的分区文件中。缺点是资源消耗过高,每个分区都需要一个buffer(大小由spark.Shuffle.file.buffer控制,默认为32KB),且同时需要建立多个分区文件进行溢写。当分区个数太大,如10 000时,每个map task需要约320MB的内存,会造成内存消耗过大,而且每个task需要同时建立和打开10 000个文件,造成资源不足。因此,该Shuffle方案适合分区个数较少的情况(<200)。

该模式适用的操作类型:map()端不需要聚合(combine)、Key不需要排序且分区个数较少(<=spark.Shuffle.sort.bypassMergeThreshold,默认值为200)。例如,groupByKey(100),partitionBy(100),sortByKey(100)等。

(2)不需要map()端聚合(combine),但需要排序。

在这种情况下需要按照partitionId+Key进行排序。如图6.7所示,Spark采用的实现方法是建立一个Array(图6.7中的PartitionedPairBuffer)来存放map()输出的record,并对Array中元素的Key进行精心设计,将每个 record转化为<(PID,K),V> record存储;然后按照partitionId+Key对record进行排序;最后将所有record写入一个文件中,通过建立索引来标示每个分区。

image.png
图6.7 不需要map()端聚合(combine),但需要排序的Shuffle Write流程设计(SortShuffleWriter)

如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上,等待map()输出完以后,再将Array中的record与磁盘上已排序的record进行全局排序,得到最终有序的record,并写入文件中。

该Shuffle模式被命名为SortShuffleWriter(KeyOrdering=true),使用的Array被命名为PartitionedPairBuffer。

该Shuffle模式的优缺点:优点是只需要一个Array结构就可以支持按照partitionId+Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,支持从小规模到大规模数据的排序。同时,输出的数据已经按照partitionId进行排序,因此只需要一个分区文件存储,即可标示不同的分区数据,克服了BypassMergeSortShuffleWriter中建立文件数过多的问题,适用于分区个数很大的情况。缺点是排序增加计算时延。

该Shuffle模式适用的操作:map()端不需要聚合(combine)、Key需要排序、分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来会提供这种类型的操作。sortByKey()操作虽然需要按Key进行排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进行排序。

将“按PartitionId+Key排序”改为“只按PartitionId排序”,就可以支持“不需要map()端combine、不需要按照Key进行排序,分区个数过大”的操作。例如,groupByKey(300)、partitionBy(300)、sortByKey(300)。

(3)需要map()端聚合(combine),需要或者不需要按Key进行排序。

在这种情况下,需要实现按Key进行聚合(combine)的功能。如图6.8的上图所示,Spark采用的实现方法是建立一个类似HashMap的数据结构对map()输出的record进行聚合。HashMap中的Key是“partitionId+Key”,HashMap中的Value是经过相同combine的聚合结果。
_
如果不需要按Key进行排序,如图6.8的上图所示,那么只按partitionId进行排序;如果需要按Key进行排序,如图6.8的下图所示,那么按partitionId+Key进行排序。最后,将排序后的record写入一个分区文件中。

image.png
图6.8 包含combine的Shuffle Write流程设计(SortShuffleWriterWithCombine)

如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对map()输出的record进行聚合,如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当map()输出完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合(merge),得到最终的record,输出到分区文件中。

该Shuffle模式的优缺点:优点是只需要一个HashMap结构就可以支持map()端的combine功能,HashMap具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合,也适用于分区个数很大的情况。在聚合后使用Array排序,可以灵活支持不同的排序需求。缺点是在内存中进行聚合,内存消耗较大,需要额外的数组进行排序,而且如果有数据spill到磁盘上,还需要再次进行聚合。在实现中,Spark在Shuffle Write端使用一个经过特殊设计和优化的HashMap,命名为PartitionedAppendOnlyMap,可以同时支持聚合和排序操作,相当于HashMap和Array的合体。

该Shuffle模式适用的操作:适合map()端聚合(combine)、需要或者不需要按Key进行排序、分区个数无限制的应用,如reduceByKey()、aggregateByKey()等。

Shuffle Write框架需要执行的3个步骤是“数据聚合→排序→分区”。如果应用中的数据操作不需要聚合,也不需要排序,而且分区个数很少,那么可以采用直接输出模式,即BypassMergeSortShuffleWriter。为了克服BypassMergeSortShuffleWriter打开文件过多、buffer分配过多的缺点,也为了支持需要按Key进行排序的操作,Spark提供了SortShuffleWriter,使用基于Array的方法来按partitionId或partitionId+Key进行排序,只输出单一的分区文件即可。最后,为了支持map()端combine操作,Spark提供了基于HashMap的SortShuffleWriter,将Array替换为类似HashMap的操作来支持聚合操作,在聚合后根据partitionId或partitionId+Key对record进行排序,并输出分区文件。因为SortShuffleWriter按partitionId进行了排序,所以被称为sort-based Shuffle Write。

6.3.2 Shuffle Read框架设计和实现

在Shuffle Read阶段,数据操作需要3个功能:跨节点数据获取、聚合和排序。Spark为了支持所有的情况,设计了一个通用的Shuffle Read框架,框架的计算顺序为“数据获取→聚合→排序”输出。

如图6.9所示,reduce task不断从各个map task的分区文件中获取数据(Fetch records),然后使用类似HashMap的结构来对数据进行聚合(aggregate),该过程是边获取数据边聚合。聚合完成后,将HashMap中的数据放入类似Array的数据结构中按照Key进行排序(sort by Key),最后将排序结果输出或者传递给下一个操作。如果不需要聚合或者排序,则可以去掉相应的聚合或排序过程。

image.png
图6.9 通用的Shuffle Read框架(包含“数据获取→聚合→排序输出”的过程)
_
(1)不需要聚合,不需要按Key进行排序。

这种情况最简单,只需要实现数据获取功能即可。如图6.10所示,等待所有的map task结束后,reduce task开始不断从各个map task获取 record,并将record输出到一个buffer中(大小为spark.reducer.maxSizeInFlight=48MB),下一个操作直接从buffer中获取数据即可。

image.png
图6.10 不需要聚合,不需要按Key进行排序的Shuffle Read流程设计
_
该Shuffle模式的优缺点:优点是逻辑和实现简单,内存消耗很小。缺点是不支持聚合、排序等复杂功能。

该Shuffle模式适用的操作:适合既不需要聚合也不需要排序的应用,如partitionBy()等。

(2)不需要聚合,需要按Key进行排序。

在这种情况下,需要实现数据获取和按Key排序的功能。

如图6.11所示,获取数据后,将buffer中的record依次输出到一个Array结构(PartitionedPairBuffer)中。由于这里采用了本来用于Shuffle Write端的PartitionedPairBuffer结构,所以还保留了每个record的partitionId。然后,对Array中的record按照Key进行排序,并将排序结果输出或者传递给下一步操作。

当内存无法存下所有的record时,PartitionedPairBuffer将record排序后spill到磁盘上,最后将内存中和磁盘上的record进行全局排序,得到最终排序后的record。

image.png
图6.11 不需要聚合、需要按Key进行排序的Shuffle Read流程设计
_
该Shuffle模式的优缺点:优点是只需要一个Array结构就可以支持按照Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,不受数据规模限制。缺点是排序增加计算时延。

该Shuffle模式适用的操作:适合reduce端不需要聚合,但需要按Key进行排序的操作,如sortByKey()、sortBy()等。

(3)需要聚合,不需要或需要按Key进行排序。

在这种情况下,需要实现按照Key进行聚合,根据需要按Key进行排序的功能。如图6.12的上图所示,获取record后,Spark建立一个类似HashMap的数据结构(ExternalAppendOnlyMap)对buffer中的record进行聚合,HashMap中的Key是record中的Key,HashMap中的Value是经过相同聚合函数(func())计算后的结果。在图6.12中,聚合函数是sum()函数,那么Value中存放的是多个record对应Value相加后的结果。之后,如果需要按照Key进行排序,如图6.12的下图所示,则建立一个Array结构,读取HashMap中的record,并对record按Key进行排序,排序完成后,将结果输出或者传递给下一步操作。

image.png
图6.12 需要聚合,不需要或需要按Key进行排序的Shuffle Read流程设计

如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对buffer中的record进行聚合。如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当聚合完成以后,将此时HashMap中的reocrd与磁盘上已排序的record进行再次聚合,得到最终的record,输出到分区文件中。

该Shuffle模式的优缺点:优点是只需要一个HashMap和一个Array结构就可以支持reduce端的聚合和排序功能,HashMap 具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合。边获取数据边聚合,效率较高。缺点是需要在内存中进行聚合,内存消耗较大,如果有数据spill到磁盘上,还需要进行再次聚合。另外,经过HashMap聚合后的数据仍然需要拷贝到Array中进行排序,内存消耗较大。在实现中,Spark使用的HashMap是一个经过特殊优化的HashMap,命名为ExternalAppendOnlyMap,可以同时支持聚合和排序操作,相当于HashMap和Array的合体,其实现细节将在6.4节中介绍。

该Shuffle模式适用的操作:适合reduce端需要聚合、不需要或需要按Key进行排序的操作,如reduceByKey()、aggregateByKey()等。

Shuffle Read框架需要执行的3个步骤是“数据获取→聚合→排序输出”。如果应用中的数据操作不需要聚合,也不需要排序,那么获取数据后直接输出。对于需要按Key进行排序的操作,Spark 使用基于Array的方法来对Key进行排序。对于需要聚合的操作,Spark提供了基于HashMap的聚合方法,同时可以再次使用Array来支持按照Key进行排序。总体来讲,Shuffle Read框架使用的技术和数据结构与Shuffle Write过程类似,而且由于不需要分区,过程比Shuffle Write更为简单。当然,还有一些可优化的地方,如聚合和排序如何进行统一来减少内存copy和磁盘I/O等,这部分内容将在6.4节中介绍。

6.4 支持高效聚合和排序的数据结构

为了提高聚合和排序性能,Spark为Shuffle Write/Read的聚合和排序过程设计了3种数据结构,如图6.13所示。这几种数据结构的基本思想是在内存中对record进行聚合和排序,如果存放不下,则进行扩容,如果还存放不下,就将数据排序后spill到磁盘上,最后将磁盘和内存中的数据进行聚合、排序,得到最终结果。

image.png
图6.13 支持高效聚合和排序的数据结构

我们会发现Shuffle机制中使用的数据结构的两个特征:一是只需要支持record的插入和更新操作,不需要支持删除操作,这样我们可以对数据结构进行优化,减少内存消耗;二是只有内存放不下时才需要spill到磁盘上,因此数据结构设计以内存为主,磁盘为辅。Spark中的PartitionedAppendOnlyMap和ExternalAppendOnlyMap都基于AppendOnlyMap实现。因此,我们先介绍AppendOnlyMap的原理。

6.4.1 AppendOnlyMap的原理

AppendOnlyMap实际上是一个只支持record添加和对Value进行更新的HashMap。与Java HashMap采用“数组+链表”实现不同,AppendOnlyMap只使用数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发生Hash值冲突,则使用二次地址探测法(Quadratic probing)来解决Hash值冲突。

image.png
图6.13使用数组和二次地址探测法来模拟HashMap(__(蓝色部分存储Key,白色部分存储Value)
_
扩容:AppendOnlyMap使用数组来实现的问题是,如果插入的record太多,则很快会被填满。Spark的解决方案是,如果AppendOnlyMap的利用率达到70%,那么就扩张一倍,扩张意味着原来的Hash()失效,因此对所有Key进行rehash,重新排列每个Key的位置。

排序:由于AppendOnlyMap采用了数组作为底层存储结构,可以支持快速排序等排序算法。实现方法,如图6.14所示,先将数组中所有的 record转移到数组的前端,用begin和end来标示起始位置,然后调用排序算法对[begin,end]中的record进行排序。对于需要按Key进行排序的操作,如sortByKey(),可以按照Key值进行排序;对于其他操作,只按照Key的Hash值进行排序即可。

输出:迭代AppendOnlyMap数组中的record,从前到后扫描输出即可。

image.png
图6.14 对AppendOnlyMap中的元素进行排序输出

6.4.2 ExternalAppendOnlyMap

AppendOnlyMap的优点是能够将聚合和排序功能很好地结合在一起,缺点是只能使用内存,难以适用于内存空间不足的情况。为了解决这个问题,Spark基于AppendOnlyMap设计实现了基于内存+磁盘的ExternalAppendOnlyMap,用于Shuffle Read端大规模数据聚合。同时,由于Shuffle Write端聚合需要考虑partitionId,Spark也设计了带有partitionId的ExternalAppendOnlyMap,名为PartitionedAppendOnlyHashMap。这两个数据结构功能类似,我们先介绍ExternalAppendOnlyMap。

ExternalAppendOnlyMap的工作原理是,先持有一个AppendOnlyMap来不断接收和聚合新来的record,AppendOnlyMap快被装满时检查一下内存剩余空间是否可以扩展,可直接在内存中扩展,不可对AppendOnlyMap中的record进行排序,然后将record都spill到磁盘上。因为record不断到来,可能会多次填满AppendOnlyMap,所以这个spill过程可以出现多次,最终形成多个spill文件。等record都处理完,此时AppendOnlyMap中可能还留存一些聚合后的record,磁盘上也有多个spill文件。因为这些数据都经过了部分聚合,还需要进行全局聚合(merge)。因此,ExternalAppendOnlyMap的最后一步是将内存中AppendOnlyMap的数据与磁盘上spill文件中的数据进行全局聚合,得到最终结果。

(1)AppendOnlyMap的大小估计

虽然我们知道AppendOnlyMap中持有的数组的长度和大小,但数组里面存放的是Key和Value的引用,并不是它们的实际对象(object)大小,而且Value会不断被更新,实际大小不断变化。因此,想准确得到AppendOnlyMap的大小比较困难。

Spark设计了一个增量式的高效估算算法,在每个record插入或更新时根据历史统计值和当前变化量直接估算当前AppendOnlyMap的大小,算法的复杂度是O(1),开销很小。在record插入和聚合过程中会定期对当前AppendOnlyMap中的record进行抽样,然后精确计算这些record的总大小、总个数、更新个数及平均值等,并作为历史统计值。进行抽样是因为AppendOnlyMap中的record可能有上万个,难以对每个都精确计算。之后,每当有record插入或更新时,会根据历史统计值和历史平均的变化值,增量估算AppendOnlyMap的总大小,详见Spark源码中的SizeTracker.estimateSize()方法。抽样也会定期进行,更新统计值以获得更高的精度。

(2)Spill过程与排序

当AppendOnlyMap达到内存限制时,会将record排序后写入磁盘中。排序是为了方便下一步全局聚合(聚合内存和磁盘上的record)时可以采用更高效的merge-sort(外部排序+聚合)。

问题是根据什么对record进行排序的?自然想到的是根据record的Key进行排序的,但是这就要求操作定义Key的排序方法,如sortByKey()等操作定义了按照Key进行的排序。大部分操作,如groupByKey(),并没有定义Key的排序方法,也不需要输出结果是按照Key进行排序的。

在这种情况下,Spark采用按照Key的Hash值进行排序的方法,这样既可以进行merge-sort,又不要求操作定义Key排序的方法。然而,这种方法的问题是会出现Hash值冲突,也就是不同的Key具有相同的Hash值。为了解决这个问题,Spark在merge-sort的同时会比较Key的Hash值是否相等,以及Key的实际值是否相等。

解决了spill时如何对record进行排序的问题后,每当AppendOnlyMap超过内存限制,就会将其内部的record排序后spill到磁盘上,如图6.15所示,AppendOnlyMap被填满了4次,也被spill到磁盘上4次。

image.png
图6.15 ExternalAppendOnlyMap中的record被spill到磁盘上并进行全局聚合

(3)全局聚合(merge-sort)

前面提到过,由于最终的spill文件和内存中的AppendOnlyMap都是经过部分聚合后的结果,其中可能存在相同Key的record,因此还需要一个全局聚合阶段将AppendOnlyMap中的record与spill文件中的record进行聚合,得到最终聚合后的结果。全局聚合的方法就是建立一个最小堆或最大堆,每次从各个spill文件中读取前几个具有相同Key(或者相同Key的Hash值)的record,然后与AppendOnlyMap中的record进行聚合,并输出聚合后的结果。

在图6.15中,在全局聚合时,Spark分别从4个spill文件中提取第1个 record,与还留在AppendOnlyMap中的第1个record组成最小堆,然后不断从最小堆中提取具有相同Key的record进行聚合(merge)。由于每个spill文件中的record是经过排序的,按顺序读取和聚合可以保证能够对每个record得到全局聚合的结果。

6.4.3 PartitionedAppendOnlyMap

PartitionedAppendOnlyMap用于在Shuffle Write端对record进行聚合(combine)。PartitionedAppendOnlyMap的功能和实现与ExternalAppendOnlyMap的功能和实现基本一样,唯一区别是PartitionedAppendOnlyMap中的Key是“PartitionId+Key”,这样既可以根据partitionId进行排序(面向不需要按Key进行排序的操作),也可以根据partitionId+Key进行排序(面向需要按Key进行排序的操作),从而在Shuffle Write阶段可以进行聚合、排序和分区。

6.4.4 PartitionedPairBuffer

PartitionedPairBuffer本质上是一个基于内存+磁盘的Array,随着数据添加,不断地扩容,当到达内存限制时,就将Array中的数据按照partitionId或partitionId+Key进行排序,然后spill到磁盘上,该过程可以进行多次,最后对内存中和磁盘上的数据进行全局排序,输出或者提供给下一个操作。

6.5 与Hadoop MapReduce的Shuffle机制对比

Hadoop MapReduce有明显的两个阶段,即map stage和reduce stage。如图6.16所示,在map stage中,每个map task首先执行map(K,V)函数,再读取每个record,并输出新的 record。这些record首先被输出到一个固定大小的spill buffer里(一般为100MB),spill buffer如果被填满就将spill buffer中的record按照Key排序后输出到磁盘上。这个过程类似Spark将map task输出的record放到一个排序数组(PartitionedPairBuffer)中,不同的是Hadoop MapReduce是严格按照Key进行排序的,而PartitionedPairBuffer排序更灵活(可以按照partitionId进行排序,也可以按照partitionId+Key进行排序)。另外,由于spill buffer中的record只进行排序,不能完成聚合(combine)功能,所以Hadoop MapReduce在完成map()、等待所有的record都spill到磁盘上后,启动一个专门的聚合阶段(图6.16中的merge phase),使用combine()将所有spill文件中的record进行全局聚合,得到最终聚合结果。注意,这里需要进行多次全局聚合,因为每次只针对某个分区的spill文件进行聚合

image.png
图6.16 Hadoop MapReduce的Shuffle机制

在Shuffle Read阶段,Hadoop MapReduce先将每个map task输出的相应分区文件通过网络获取,然后放入内存,如果内存放不下,就先对当前内存中的record进行聚合和排序,再spill到磁盘上,图6.16中的a,b,c,d,…代表从不同map task获取的分区文件,每个文件里面包含许多个record。由于每个分区文件中包含的record已经按Key进行了排序,聚合时只需要一个最小堆或者最大堆保存当前每个文件中的前几个record即可,聚合效率比较高,但需要占用大量内存空间来存储这些分区文件。等获取所有的分区文件时,此时可能存在多个spill文件及内存中剩余的分区文件,这时再启动一个专门的reduce阶段(图6.16中的reduce phase)来将这些内存和磁盘上的数据进行全局聚合,这个过程与Spark的全局聚合过程没有什么区别,最后得到聚合后的结果。

下面总结一下Hadoop MapReduce的Shuffle机制的优点和缺点。

优点:

  • Hadoop MapReduce的Shuffle流程固定,阶段分明,每个阶段读取什么数据、进行什么操作、输出什么数据都是确定性的。这种确定性使得实现起来比较容易。Hadoop MapReduce框架的内存消耗也是确定的,map阶段框架只需要一个大的spill buffer,reduce阶段框架只需要一个大的数组(MergeQueue)来存放获取的分区文件中的record。这样,什么时候将数据spill到磁盘上是确定的,也易于实现和内存管理。当然,用户定义的聚合函数,如combine()和reduce()的内存消耗是不确定的。
  • Hadoop MapReduce对Key进行了严格排序,使得可以使用最小堆或最大堆进行聚合,非常高效。而且可以原生支持sortByKey()。
  • Hadoop MapReduce按Key进行排序和spill到磁盘上的功能,可以在Shuffle大规模数据时仍然保证能够顺利进行。

缺点:

  • Hadoop MapReduce强制按Key进行排序,大多数应用其实不需要严格地按照Key进行排序,如groupByKey(),排序增加计算量。
  • Hadoop MapReduce不能在线聚合,不管是map()端还是reduce()端,都是先将数据存放到内存或者磁盘上后,再执行聚合操作的,存储这些数据需要消耗大量的内存和磁盘空间。如果能够一边获取record一边聚合,那么对于大多数聚合操作,可以有效地减少存储空间,并减少时延。
  • Hadoop MapReduce产生的临时文件过多,如果map task个数为M,reduce task个数为N,那么map阶段集群会产生M×N个分区文件,当M和N较大时,总的临时文件个数过多。

克服第1个缺点(强制排序)的方法是对操作类型进行分类,如Spark提供了按partitionId排序、按Key排序等多种方式来灵活应对不同操作的排序需求。克服第2个缺点(不能在线聚合)的方法是采用hash-based聚合,也就是利用HashMap的在线聚合特性,将record插入HashMap时自动完成聚合过程,即Spark为什么设计AppendOnlyMap等数据结构。克服第3个缺点(临时文件问题)的方法是将多个分区文件合并为一个文件,按照partitionId的顺序存储,这也是Spark为什么要按照partitionId进行排序的原因。总的来说,Spark采用的是hash+sort-based Shuffle的方法,融合了hash-based和sort-based Shuffle的优点,根据不同操作的需求,灵活选择最合适的Shuffle方法。

另外,由于Hadoop MapReduce采用独立阶段聚合,而Spark采用在线聚合的方法,两者的聚合函数还有一个大的区别。MapReduce的聚合函数reduce()接收的是一个 record,可以对每个record中的list(V)进行任意处理,而Spark中的聚合函数每当接收到一个 record时,就要立即进行处理,在流程上有一些受限。两者的区别类似下面的处理逻辑.在聚合过程中Spark需要对每个到来的record进行立即处理,而Hadoop MapReduce没有这个要求,所以更加灵活。

image.png