Spark 基于内存进行计算,擅长迭代计算,流式处理,但也会发生shuffle 过程。shuffle 的优化,以及避免产生 shuffle 会给程序提高更好的性能。因为 shuffle 的性能优劣直接决定了整个计算引擎的性能和吞吐量。
下图是官方的说明,1.2 版本之后默认是使用 sort shuffle 。这样会更加高效得利用内存。之前版本默认是 hash shuffle。
SortShuffleManager 的运行机制主要分成两种,一种是普通运行机制,另一种是 bypass 运行机制。当 shuffle reduce task 的数量小于等于bypassMergeThreshold 参数的值时(默认为200),就会启用bypass机制。
我们看下面,图 1-1 是 spark shuffle 过程的普通机制。
图 1-1 sort shuffle 普通机制
看图说话,map task 的计算结果会写入一个内存数据结构中,这个数据结构根据算子,如使用 reduceByKey 这类聚合算子的话,这个内存结构是 Map, 一边通过 Map聚合,一边写入内存;如是使用 join 这类普通算子的话,这个内存结构是 Array,直接写入内存。这个内存结构默认大小是 5 M。
在 shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过 5 M 时,比如现在内存结构中的数据是 5.01 M 那么它会申请 5.01*2-5 = 5.02 M 的内存给内存结构,如果成功不会发生溢写,不成功则会发生溢写。
在溢写之前,会根据 key 对内存结构的数据进行排序,然后分批写入磁盘。每一批默认是 10000 条数据。也就是排序好的数据会以每批1 w 条数据的形式写入磁盘。写入磁盘时,由 Java 的 BufferedOutputStream 来实现的,作为缓冲流,现将数据写入缓冲区,等待缓冲区满了再溢出到磁盘,这样减少了磁盘的 IO ,提高了写的性能。
task 完成会后,会将所有的磁盘文件进行一次 Merge 成为一个磁盘文件,所以一个 task 只对应一个磁盘文件,但是还要为下游的 stage 提供数据,所以还要有一个索引文件,其中标识了下游的各个 task 的数据在磁盘文件中的 start offset 和 end offset。
上面提到了,下游的 stage 需要去寻找上一个 stage 产生的数据,也就是所谓的 shuffle 文件寻址。可以看我这篇文章:Spark 的 shuffle 文件寻址流程
bypass 机制,先来看一张流程图 1-2,与上面的普通机制进行对比。
图 1-2 bypass 机制
前面也提到了,bypass 的触发条件是 shuffle reduce task 的数量小于我们设置的 bypassMergeThreshoold 参数。(默认是 200)
通过对比普通机制,可以看出 bypass 机制不会进行排序的过程。shuffle write 过程不会对数据进行排序,这样的话,就节省了这部分的性能开销。
大多数 Spark 作业的性能主要就是消耗在了 shuffle 环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。
因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。
注意,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。
后续的文章将会依次从代码开发、资源参数、数据倾斜方面展开。本文只讲解 shuffle 参数的调优。
spark.shuffle.file.buffer
参数:默认是 32 k。
说明:表示写入磁盘文件之前缓冲区的大小。
建议:如果资源充足,可以适当按倍数增加,比如 64 k, 从而减少 shuffle write 过程中溢写到磁盘文件的系数,减少磁盘 IO 次数,进而提升性能。1-5%
spark.reducer.maxSizeInFlight
参数:默认是 48 M。
说明:表示 shuffle read 过程拉取数据的 buffer 大小。
建议**:如果资源充足,可以适当按倍数增加,比如 96 M, 从而减少拉取次数,减少网络传输的次数,进而提升性能。1-5%
spark.shuffle.io.maxRetries
参数:默认是 3。
说明:表示拉取数据的时候,执行失败重试的时间间隔。
建议:如果一个作业的 shuffle 过程特别耗时,可以加大该参数,比如 60 次,以避免由于 JVM 的 full gc 或者网络原因造成数据拉取失败。
spark.shuffle.io.retryWait
参数:默认是 5 s。
说明:表示拉取数据的时候,重试的最大时长。如果超过这个次数还没有拉取成功,这个任务就会失败。
建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
spark.shuffle.manager
参数:默认是 sort。
说明:用于设置 ShuffleManager 的类型。
建议:如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。
spark.shuffle.sort.bypassMergeThreshold
参数:默认是 200。
说明:用于设置ShuffleManager的类型。
建议:如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。
spark.shuffle.consolidateFiles
参数:默认是 false。
说明:如果使用 HashShuffleManager,该参数有效。如果设置为true,那么就会开启 consolidate 机制,会大幅度合并 shuffle write 的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
**
注意,影响一个Spark作业性能的因素,主要还有代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。可以关注我后续的文章。
如果对您有帮助,欢迎点**好看**、关注、转发。
您觉得本文好看吗?