1 第一章Spark 性能调优

1.2 算子调优

1.2.1 算子调优一:mapPartitions

普通的map 算子对RDD 中的每一个元素进行操作,而mapPartitions 算子对RDD中每一个分区进行操作。如果是普通的map 算子,假设一个partition 有1 万条数据,那么map 算子中的function 要执行1 万次,也就是对每个元素进行操作。
Spark性能调优-v2 - 图1
如果是mapPartition 算子,由于一个task 处理一个RDD 的partition,那么一个task 只会执行一次function,function 一次接收所有的partition 数据,效率比较高。
Spark性能调优-v2 - 图2
比如,当要把RDD 中的所有数据通过JDBC 写入数据,如果使用map 算子,那么需要对RDD 中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions 算子,那么针对一个分区的数据,只需要建立一个数据库连接。mapPartitions 算子也存在一些缺点:对于普通的map 操作,一次处理一条数据,如果在处理了2000 条数据后内存不足,那么可以将已经处理完的2000 条数据从内存中垃圾回收掉; 但是如果使用mapPartitions 算子, 但数据量非常大时, function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。
因此, mapPartitions 算子适用于数据量不是特别大的时候, 此时使用mapPartitions 算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions 算子,就会直接OOM)
在项目中,应该首先估算一下RDD 的数据量、每个partition 的数据量,以及分配给每个Executor 的内存资源,如果资源允许,可以考虑使用mapPartitions 算子代替map。

1.2.2 算子调优二:foreachPartition 优化数据库操作

在生产环境中, 通常使用foreachPartition 算子来完成数据库的写入, 通过foreachPartition 算子的特性,可以优化写数据库的性能。
如果使用foreach 算子完成数据库的操作, 由于foreach 算子是遍历RDD 的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition 算子。
与mapPartitions 算子非常相似,foreachPartition 是将RDD 的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,
foreachPartition算子:
Spark性能调优-v2 - 图3
使用了foreachPartition 算子后,可以获得以下的性能提升:
1. 对于我们写的function 函数,一次处理一整个分区的数据;
2. 对于一个分区内的数据,创建唯一的数据库连接;
3. 只需要向数据库发送一次SQL 语句和多组参数;
在生产环境中, 全部都会使用foreachPartition 算子完成数据库操作。
foreachPartition 算子存在一个问题,与mapPartitions 算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。

1.2.3 算子调优三:filter 与coalesce 的配合使用

在Spark 任务中我们经常会使用filter 算子完成RDD 中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter 过滤后,每个分区的数据量有可能会存在较大差异,如下图所示:
Spark性能调优-v2 - 图4
我们可以发现两个问题:
1. 每个partition 的数据量变小了,如果还按照之前与partition 相等的task 个数去处理当前数据,有点浪费task 的计算资源;
2. 每个partition 的数据量不一样,会导致后面的每个task 处理每个partition 数据的时候,每个task 要处理的数据量不同,这很有可能导致数据倾斜问题。
第二个分区的数据过滤后只剩100 条,而第三个分区的数据过滤后剩下800 条,在相同的处理逻辑下,第二个分区对应的task 处理的数据量与第三个分区对应的task 处理的数据量差距达到了8 倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。
针对上述的两个问题,我们分别进行分析:
1. 针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4 个分区的数据转化到2 个分区中,这样只需要用后面的两个task 进行处理即可,避免了资源的浪费。
2. 针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个partition 中的数据量差不多,这就避免了数据倾斜问题。
那么具体应该如何实现上面的解决思路?我们需要coalesce 算子。
repartition 与coalesce 都可以用来进行重分区,其中repartition 只是coalesce 接口中shuffle 为true 的简易实现,coalesce 默认情况下不进行shuffle,但是可以通过参数进行设置。
假设我们希望将原本的分区个数A 通过重新分区变为B,那么有以下几种情况:
1. A > B(多数分区合并为少数分区)
① A 与B 相差值不大
此时使用coalesce 即可,无需shuffle 过程。
② A 与B 相差值很大
此时可以使用coalesce 并且不启用shuffle 过程,但是会导致合并过程性能低下,所以推荐设置coalesce 的第二个参数为true,即启动shuffle 过程,所以推荐使用repartition。
2. A < B(少数分区分解为多数分区)
此时使用repartition 即可, 如果使用coalesce 需要将shuffle 设置为true, 否则coalesce 无效。
我们可以在filter 操作之后,使用coalesce 算子针对每个partition 的数据量各不相同的情况,压缩partition 的数量,而且让每个partition 的数据量尽量均匀紧凑,以便于后面的task 进行计算操作,在某种程度上能够在一定程度上提升性能。
注意:local 模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。

1.2.4 算子调优四:repartition 解决SparkSQL 低并行度问题

在第一节的常规性能调优中我们讲解了并行度的调节策略,但是,并行度的设置对于Spark SQL 是不生效的, 用户设置的并行度只对于Spark SQL 以外的所有Spark 的stage 生效。
Spark SQL 的并行度不允许用户自己指定, Spark SQL 自己会默认根据hive 表对应的HDFS 文件的split 个数自动设置Spark SQL 所在的那个stage 的并行度,用户自己通spark.default.parallelism 参数指定的并行度,只会在没Spark SQL 的stage中生效。
由于Spark SQL 所在stage 的并行度无法手动设置, 如果数据量较大,并且此stage 中后续的transformation 操作有着复杂的业务逻辑,而Spark SQL 自动设置的task 数量很少,这就意味着每个task 要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有Spark SQL 的stage 速度很慢,而后续的没有Spark SQL 的stage 运行速度非常快。
为了解决Spark SQL 无法设置并行度和task 数量的问题, 我们可以使用repartition 算子。
repartition算子使用前后对比图:
Spark性能调优-v2 - 图5
Spark SQL 这一步的并行度和task 数量肯定是没有办法去改变了,但是,对于Spark SQL 查询出来的RDD,立即使用repartition 算子,去重新进行分区,这样可以重新分区为多个partition, 从repartition 之后的RDD 操作, 由于不再涉及SparkSQL,因此stage 的并行度就会等于你手动设置的值,这样就避免了Spark SQL 所在的stage 只能用少量的task 去处理大量数据并执行复杂的算法逻辑。

1.2.5 算子调优五:reduceByKey 本地聚合

reduceByKey 相较于普通的shuffle 操作一个显著的特点就是会进行map 端的本地聚合,map 端会先对本地的数据进行combine 操作,然后将数据写入给下个stage的每个task 创建的文件中,也就是在map 端,对每一个key 对应的value,执行reduceByKey 算子函数。
reduceByKey 算子的执行过程:
Spark性能调优-v2 - 图6
使用reduceByKey 对性能的提升如下:
1. 本地聚合后,在map 端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
2. 本地聚合后,下一个stage 拉取的数据量变少,减少了网络传输的数据量;
3. 本地聚合后,在reduce 端进行数据缓存的内存占用减少;
4. 本地聚合后,在reduce 端进行聚合的数据量减少。
基于reduceByKey 的本地聚合特征,我们应该考虑使用reduceByKey 代替其他的shuffle 算子,例如groupByKey。
reduceByKey 与groupByKey 的运行原理Spark性能调优-v2 - 图7
Spark性能调优-v2 - 图8
根据上图可知,groupByKey 不会进行map 端的聚合,而是将所有map 端的数据shuffle 到reduce 端, 然后在reduce 端进行数据的聚合操作。由于reduceByKey有map 端聚合的特性, 使得网络传输的数据量减小, 因此效率要明显高于groupByKey。

1.1 常规性能调优

1.1.1 常规性能调优一:最优资源配置

Spark 性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
资源的分配在使用脚本提交Spark 任务时进行指定,标准的Spark 任务提交脚本:

spark-submit \
—class org.apache.spark.examples.SparkPi \
—master yarn
—deploy-mode cluster
—num-executors 80 \
—driver-memory 6g \
—executor-memory 6g \
—executor-cores 3 \
/root/apps/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \
100

可以进行分配的资源如下:
—num-executors 配置Executor 的数量
—driver-memory 配置Driver 内存
—executor-memory 配置每个Executor 的内存大小
—executor-cores 配置每个Executor 的CPU core 数量
调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。
对于具体资源的分配,我们分别讨论Spark 的两种Cluster 运行模式:
第一种是Spark Standalone 模式,你在提交任务前,一定知道或者可以从运维部
门获取到你可以使用的资源情况,在编写submit 脚本的时候,就根据可用的资源情
况进行资源的分配,比如说集群有15 台机器,每台机器为8G 内存,2 个CPU core,
那么就指定15 个Executor,每个Executor 分配8G 内存,2 个CPU core。
第二种是Spark Yarn 模式,由于Yarn 使用资源队列进行资源的分配和调度,在写submit 脚本的时候,就根据Spark 作业要提交到的资源队列,进行资源的分配,
比如资源队列有400G 内存,100 个CPU core,那么指定50个Executor,每个Executor
分配8G 内存,2 个CPU core。
对各项资源进行了调节后,得到的性能提升下:
增加Executor·个数
在资源允许的情况下, 增加Executor的个数可以提高执行task 的并行度。比如有4 个Executor , 每个Executor 有2个CPU core,那么可以并行执行8 个task,如果将Executor 的个数增加到8个(资源允许的情况下),那么可以并行执行16 个task,此时的并行能力提升了一倍。
增加每个Executor 的CPU core 个数
在资源允许的情况下, 增加每个Executor 的Cpu core 个数,可以提高执行task 的并行度。比如有4 个Executor,每个Executor 有2 个CPU core, 那么可以并行执行8 个task,如果将每个Executor的CPU core 个数增加到4 个(资源允许的情况下),那么可以并行执行16 个task,此时的并行能力提升了一倍。
增加每个Executor 的内存量
在资源允许的情况下, 增加每个Executor 的内存量以后,对性能的提升有三点:
1. 可以缓存更多的数据( 即对RDD 进行cache) , 写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
2. 可以为shuffle 操作提供更多内存,即有更多空间来存放reduce 端拉取的数据,写入磁盘的数据相应减少,甚至可以不写入磁盘,减少了可能的磁盘IO;
3. 可以为task 的执行提供更多内存,在task 的执行过程中可能创建很多对象, 内存较小时会引发频繁的GC,增加内存后, 可以避免频繁的GC,提升整体性能。
补充:生产环境Spark submit 脚本配置
spark-submit \
—class com.51doit.spark.WordCount \
—num-executors 80 \
—driver-memory 6g \
—executor-memory 6g \
—executor-cores 3 \
—master yarn-cluster \
—queue root.default \
—conf spark.yarn.executor.memoryOverhead=2048 \
—conf spark.core. connection.ack.wait.timeout =300 \
/root/wc.jar
参数配置参考值:
—num-executors:50~100
—driver-memory:1G~5G
—executor-memory:6G~10G
—executor-cores:3
—master:实际生产环境一定使用yarn-cluster

1.1.2 常规性能调优二:RDD 优化

RDD 复用

调用RDD算子时,要避免对同一个RDD 进行重复的计算,如下所示:
RDD2和RDD3是基于RDD1产生的同一个业务逻辑的rdd。
Spark性能调优-v2 - 图9
对上图中的RDD 计算架构进行修改,得到优化结果:
Spark性能调优-v2 - 图10

RDD 持久化

在Spark 中, 当多次对同一个RDD 执行算子操作时,每一次都会对这个RDD以之前的父RDD 重新计算一次,这种情况是必须要避免的,对同一个RDD 的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD 进行持久化,通过持久化将公共RDD 的数据缓存到内存/磁盘中,之后对于公共RDD 的计算都会从内存/磁盘中直接获取RDD 数据。
对于RDD 的持久化,有两点需要说明:
第一,RDD 的持久化是可以进行序列化的,当内存无法将RDD 的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。
第二,如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对RDD 数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。

减少频繁的filter 操作

获取到初始RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark 作业的运行效率。对rdd的filter操作,尽可能放在一个filter中执行,如果不是必须,不要连续调用多次filter算子。

1.1.3 常规性能调优三:并行度调节

Spark 作业中的并行度指各个stage 的task 的数量。
如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个Executor,每个Executor 分配3 个CPU core,而Spark 作业有40 个task,这样每个Executor 分配到的task 个数是2 个,这就使得每个Executor 有一个CPU core 空闲,导致资源的浪费。
理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark 作业的性能和运行速度。
Spark 官方推荐, task 数量应该设置为Spark 作业总CPU core 数量的2~3 倍。之所以没有推荐task 数量与CPU core 总数相等,是因为task 的执行时间不同,有的task 执行速度快而有的task 执行速度慢,如果task 数量与CPU core 总数相等,那么执行快的task 执行完成后,会出现CPU core 空闲的情况。如果task 数量设置为CPU core 总数的2~3 倍,那么一个task 执行完毕后,CPU core 会立刻执行下一个task,降低了资源的浪费,同时提升了Spark 作业运行的效率。
Spark 作业并行度的设置:

val conf = new SparkConf()
.set(“spark.default.parallelism”, “500”)

1.1.4 常规性能调优四:广播大变量

默认情况下,task 中的算子中如果使用了外部的变量,每个task 都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对RDD 进行持久化,可能就无法将RDD 数据存入内存,只能写入磁盘,磁盘IO 将会严重消耗性能;另一方面,task 在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC 会导致工作线程停止,进而导致Spark 暂停工作一段时间,严重影响Spark 性能。
假设当前任务配置了20 个Executor,指定500 个task,有一个20M 的变量被所有task 共用,此时会在500 个task 中产生500 个副本,耗费集群10G 的内存,如果使用了广播变量, 那么每个Executor 保存一个副本,一共消耗400M 内存,内存消耗减少了5 倍。
广播变量在每个Executor 保存一个副本,此Executor 的所有task 共用此广播变量,这让变量产生的副本数量大大减少。
在初始阶段,广播变量只在Driver 中有一份副本。task 在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor 对应的BlockManager 中尝试获取变量,如果本地没有,BlockManager 就会从Driver 或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager 进行管理;之后此Executor 的所有task 都会直接从本地的BlockManager 中获取变量。

1.1.5 常规性能调优五:Kryo 序列化

默认情况下,Spark 使用Java 的序列化机制。Java 的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable 接口即可,但是,Java 序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo 序列化机制比Java 序列化机制性能提高10 倍左右,Spark 之所以没有默认使用Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo 需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0 版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo 序列化方式了。
Kryo 序列化注册方式的实例代码:

public class MyKryoRegistrator implements KryoRegistrator
{
@Override
public void registerClasses(Kryo kryo)
{
kryo.register(StartupReportLogs.class);
}
}

配置Kryo 序列化方式的实例代码:

//创建SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo 序列化库,如果要使用Java 序列化库,需要把该行屏蔽掉
conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”);
//在Kryo 序列化库中注册自定义的类集合,如果要使用Java 序列化库,需要把该行屏蔽掉
conf.set(“spark.kryo.registrator”, “com.51doit.MyKryoRegistrator”);

1.1.6 常规性能调优六:调节本地化等待时长

Spark 作业运行过程中,Driver 会对每一个stage 的task 进行分配。根据Spark的task 分配算法,Spark 希望task 能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task 可能不会被分配到它处理的数据所在的节点, 因为这些节点可用的资源可能已经用尽, 此时, Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task 分配到比较差的本地化级别所对应的节点上,比如将task 分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。
当task 要处理的数据不在task 所在节点上时,会发生数据的传输。task 会通过所在节点的BlockManager 获取数据,BlockManager 发现数据不在本地时, 会通过网络传输组件从数据所在节点的BlockManager 处获取数据。
网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task 将有机会得到执行,这样就能够改善Spark作业的整体性能。
Spark 的本地化等级:
PROCESS_LOCAL 进程本地化, task 和数据在同一个Executor 中,性能最好。
NODE_LOCAL 节点本地化, task 和数据在同一个节点中,但是task 和数据不在同一个Executor中,数据需要在进程间进行传输。
RACK_LOCAL 机架本地化,task 和数据在同一个机架的两个节点上,数据需要通过网络在节点之间进行传输。
NO_PREF 对于task 来说, 从哪里获取都一样, 没有好坏之分。
ANY task 和数据可以在集群的任何地方,而且不在一个机架中,性能最差。
在Spark 项目开发阶段,可以使用client 模式对程序进行测试, 此时,可以在本地看到比较全的日志信息,日志信息中有明确的task 数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task 的本地化级别有没有提升,并观察Spark 作业的运行时间有没有缩短。
注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark 作业的运行时间反而增加了。
Spark 本地化等待时长的设置:

val conf = new SparkConf()
.set(“spark.locality.wait”, “6”)

1.3 Shuffle 调优

1.3.1 Shuffle 调优一:调节map 端缓冲区大小

在Spark 任务运行过程中, 如果shuffle 的map 端处理的数据量比较大, 但是map 端缓冲的大小是固定的,可能会出现map 端缓冲数据频繁spill 溢写到磁盘文件中的情况,使得性能非常低下,通过调节map 端缓冲的大小,可以避免频繁的磁盘IO 操作,进而提升Spark 任务的整体性能。
map 端缓冲的默认配置是32KB,如果每个task 处理640KB 的数据,那么会发生640/32 = 20 次溢写,如果每个task 处理64000KB 的数据,就会发生64000/32=2000次溢写,这对于性能的影响是非常严重的。
map端缓冲配置:

val conf = new SparkConf()
.set(“spark.shuffle.file.buffer”, “64”)

1.3.2 Shuffle 调优二:调节reduce 端拉取数据缓冲区大小

Spark Shuffle 过程中,shuffle reduce task 的buffer 缓冲区大小决定了reduce task
每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
reduce 端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight 参数进行设置,默认为48MB,
reduce端数据拉取缓冲区配置:

val conf = new SparkConf()
.set(“spark.reducer.maxSizeInFlight”, “96”)

1.3.3 Shuffle 调优三:调节reduce 端拉取数据重试次数

Spark Shuffle 过程中, reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle 操作的作业,建议增加重试最大次数(比如60 次),以避免由于JVM 的full gc 或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle 过程,调节该参数可以大幅度提升稳定性。
reduce 端拉取数据重试次数可以通过spark.shuffle.io.maxRetries 参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3,
reduce端拉取数据重试次数配置

val conf = new SparkConf()
.set(“spark.shuffle.io.maxRetries”, “6”)

1.3.4 Shuffle 调优四:调节reduce 端拉取数据等待间隔

Spark Shuffle 过程中,reduce task 拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大
间隔时长(比如60s),以增加shuffle 操作的稳定性。
reduce 端拉取数据等待间隔可以通过spark.shuffle.io.retryWait 参数进行设置,默认值为5s。
reduce端拉取数据等待间隔配置

val conf = new SparkConf()
.set(“spark.shuffle.io.retryWait”, “60s”)

1.3.5 Shuffle 调优五:调节SortShuffle 排序操作阈值1

对于SortShuffleManager,如果shuffle reduce task 的数量小于某一阈值则shuffle write 过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager 的方式去写数据,但是最后会将每个task 产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
当你使用SortShuffleManager 时, 如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task 的数量,那么此时map-side 就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write 性能有待提高。
SortShuffleManager 排序操作阈值的设置可以通过spark.shuffle.sort.bypassMergeThreshold 这一参数进行设置,默认值为200,
reduce端拉取数据等待间隔配置

val conf = new SparkConf()
.set(“spark.shuffle.sort.bypassMergeThreshold”, “400”)

1.4 JVM 调优

对于JVM 调优,首先应该明确,full gc/minor gc,都会导致JVM 的工作线程停止工作,即stop the world。

1.4.1 JVM 调优一:降低cache 操作的内存占比

2 静态内存管理机制

根据Spark 静态内存管理机制,堆内存被划分为了两块,Storage 和Execution。Storage 主要用于缓存RDD 数据和broadcast 数据,Execution 主要用于缓存在shuffle过程中产生的中间数据,Storage 占系统内存的60%,Execution 占系统内存的20%,并且两者完全独立。
在一般情况下,Storage 的内存都提供给了cache 操作,但是如果在某些情况下cache 操作内存不是很紧张,而task 的算子中创建的对象很多,Execution 内存又相对较小, 这回导致频繁的minor gc,甚至于频繁的full gc,进而导致Spark 频繁的停止工作,性能影响会很大。
在Spark UI 中可以查看每个stage 的运行情况, 包括每个task 的运行时间、gc时间等等,如果发现gc 太频繁,时间太长,就可以考虑调节Storage 的内存占比,
让task 执行算子函数式,有更多的内存可以使用。
Storage 内存区域可以通过spark.storage.memoryFraction 参数进行指定,默认为0.6,即60%,可以逐级向下递减。
Storage内存占比设置:

val conf = new SparkConf()
.set(“spark.storage.memoryFraction”, “0.4”)

3 统一内存管理机制

根据Spark 统一内存管理机制,堆内存被划分为了两块,Storage 和Execution。Storage 主要用于缓存数据,Execution 主要用于缓存在shuffle 过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage 和Execution 各占统一内存的50%,由于动态占用机制的实现,shuffle 过程需要的内存过大时, 会自动占用Storage 的内存区域,因此无需手动进行调节。

1.4.2 JVM 调优二:调节Executor 堆外内存

Executor 的堆外内存主要用于程序的共享库、Perm Space、线程Stack 和一些Memory mapping 等, 或者类C 方式allocate object。
有时,如果你的Spark 作业处理的数据量非常大,达到几亿的数据量,此时运行Spark 作业会时不时地报错,例如shuffle output file cannot find,executor lost,task lost,out of memory 等,这可能是Executor 的堆外内存不太够用,导致Executor 在运行的过程中内存溢出。
stage 的task在运行的时候,可能要从一些Executor 中去拉取shuffle map output文件,但是Executor 可能已经由于内存溢出挂掉了,其关联的BlockManager 也没有了, 这就可能会报出shuffle output file cannot find, executor lost, task lost, out of memory 等错误,此时,就可以考虑调节一下Executor 的堆外内存, 也就可以避免报错,与此同时,堆外内存调节的比较大的时候,对于性能来讲,也会带来一定的提升。
默认情况下,Executor 堆外内存上限大概为300多MB,在实际的生产环境下,对海量数据进行处理的时候,这里都会出现问题,导致Spark 作业反复崩溃,无法运行,此时就会去调节这个参数,到至少1G,甚至于2G、4G。
Executor 堆外内存的配置需要在spark-submit 脚本里配置
Executor堆外内存配置:

—conf spark.yarn.executor.memoryOverhead=2048

以上参数配置完成后,会避免掉某些JVM OOM 的异常问题,同时,可以提升整体Spark 作业的性能。

1.4.3 JVM 调优三:调节连接等待时长

在Spark 作业运行过程中,Executor 优先从自己本地关联的BlockManager 中获取某份数据,如果本地BlockManager 没有的话,会通过TransferService 远程连接其他节点上Executor 的BlockManager 来获取数据。
如果task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这会导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的Executor 进程就会停止工作,无法提供相应,此时, 由于没有响应,无法建立网络连接,会导致网络连接超时。
在生产环境下,有时会遇到file not found、file lost 这类错误,在这种情况下,很有可能是Executor 的BlockManager 在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长60s 后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark 作业的崩溃。这种情况也可能会导致DAGScheduler 反复提交几次stage,TaskScheduler 反复提交几次task,大大延长了我们的Spark 作业的运行时间。
此时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit 脚本中进行设置,
连接等待时长配置

—conf spark.core.connection.ack.wait.timeout=300

调节连接等待时长后, 通常可以避免部分的XX 文件拉取失败、XX 文件lost等报错。

4 第二章Spark 数据倾斜

4.1 数据倾斜综述

Spark 中的数据倾斜问题主要指shuffle 过程中出现的数据倾斜问题,是由于不同的key 对应的数据量不同导致的不同task 所处理的数据量不同的问题。
例如,reduce 点一共要处理100 万条数据,第一个和第二个task 分别被分配到了1 万条数据,计算5 分钟内完成, 第三个task 分配到了98 万数据,此时第三个task 可能需要10 个小时完成,这使得整个Spark 作业需要10 个小时才能运行完成,这就是数据倾斜所带来的后果。
注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数task 被分配了绝大多数的数据,因此少数task 运行缓慢; 数据过量是指所有task 被分配的数据量都很大,相差不多,所有task 都运行缓慢。

4.2 数据倾斜的表现:

  1. Spark 作业的大部分task 都执行迅速,只有有限的几个task 执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;
    2. Spark 作业的大部分task 都执行迅速,但是有的task 在运行过程中会突然报出OOM,反复执行几次都在某一个task 报出OOM 错误,此时可能出现了数据倾斜,作业无法正常运行。

    4.3 定位数据倾斜问题:

  2. 查阅代码中的shuffle 算子,例如reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜;
    2. 查看Spark 作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个;

    4.4 解决方案一:聚合原数据

    4.4.1 避免shuffle 过程

    绝大多数情况下,SparkSQL作业的数据来源都是Hive 表,这些Hive 表基本都是经过ETL 之后的昨天的数据。
    为了避免数据倾斜,我们可以考虑避免shuffle 过程,如果避免了shuffle 过程,那么从根本上就消除了发生数据倾斜问题的可能。
    如果Spark 作业的数据来源于Hive 表,那么可以先在Hive 表中对数据进行聚合,例如按照key 进行分组,将同一key 对应的所有value 用一种特殊的格式拼接到一个字符串里去,这样,一个key 就只有一条数据了;之后,对一个key 的所有value进行处理时,只需要进行map 操作即可,无需再进行任何的shuffle 操作。通过上述式就避免了执行shuffle 操作,也就不可能会发生任何的数据倾斜问题。
    对于Hive 表中数据的操作,不一定是拼接成一个字符串,也可以是直接对key的每一条数据进行累计计算。

    4.5 解决方案二:过滤导致倾斜的key

    如果在Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key 进行过滤,滤除可能导致数据倾斜的key 对应的数据,这样,在Spark 作业中就不会发生数据倾斜了。

    4.6 解决方案三:提高shuffle 操作中的reduce 并行度

    当方案一和方案二对于数据倾斜的处理没有很好的效果时, 可以考虑提高shuffle 过程中的reduce 端并行度, reduce 端并行度的提高就增加了reduce 端task的数量,那么每个task 分配到的数据量就会相应减少,由此缓解数据倾斜问题。

    4.6.1 reduce 端并行度的设置

    在大部分的shuffle 算子中, 都可以传入一个并行度的设置参数, 比如reduceByKey(+,500),这个参数会决定shuffle 过程中reduce 端的并行度,在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。对于Spark SQL 中的shuffle
    类语句,比如group by、join 等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task 的并行度,该值默认是200,对于很多场景来说都有点过小。
    增加shuffle read task 的数量,可以让原本分配给一个task 的多个key 分配给多个task,从而让每个task 处理比原来更少的数据。举例来说,如果原本有5 个key,每个key 对应10 条数据,这5 个key 都是分配给一个task 的, 那么这个task 就要处理50 条数据。而增加了shuffle read task 以后,每个task 就分配到一个key,即每个task 就处理10 条数据,那么自然每个task 的执行时间都会变短了。

    4.6.2 reduce 端并行度设置存在的缺陷

    提高reduce 端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻shuffle reduce task 的数据压力,以及数据倾斜的问题,适用于有较多key 对应的数据量都比较大的情况。
    该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100 万,那么无论你的task 数量增加到多少,这个对应着100 万数据的key 肯定还是会分配到一个task 中去处理,因此注定还是会发生数据倾斜的。
    所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
    在理想情况下,reduce 端并行度提升后,会在一定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的task 运行速度稍有提升,或者避免了某些task 的OOM 问题,但是,仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。

    4.7 解决方案四:使用随机key 实现双重聚合

    当使用了类似于groupByKey、reduceByKey 这样的算子时,可以考虑使用随机key 实现双重聚合
    随机key 实现双重聚合:
    Spark性能调优-v2 - 图11
    首先,通过map 算子给每个数据的key 添加随机数前缀,对key 进行打散,将原先一样的key 变成不一样的key,然后进行第一次聚合, 这样就可以让原本被一个task 处理的数据分散到多个task 上去做局部聚合;随后,去除掉每个key 的前缀,再次进行聚合。
    此方法对于由groupByKey、reduceByKey 这类算子造成的数据倾斜由比较好的效果,仅仅适用于聚合类的shuffle 操作,适用范围相对较窄。如果是join 类的shuffle操作,还得用其他的解决方案。
    此方法也是前几种方案没有比较好的效果时要尝试的解决方案。

    4.8 解决方案五:将reduce join 转换为map join

    正常情况下,join 操作都会执行shuffle 过程,并且执行的是reduce join,也就是先将所有相同的key 和对应的value 汇聚到一个reduce task 中,然后再进行join。
    普通join 的过程如下图所示:
    Spark性能调优-v2 - 图12
    普通的join 是会走shuffle 过程的,而一旦shuffle,就相当于会将相同key 的数据拉取到一个shuffle read task 中再进行join,此时就是reduce join。但是如果一个RDD 是比较小的,则可以采用广播小RDD 全量数据+map 算子来实现与join 同样的效果,也就是map join,此时就不会发生shuffle 操作,也就不会发生数据倾斜。
    (注意,RDD不能进行广播的,只能将RDD 内部的数据通过collect 拉取到Driver 内存然后再进行广播)

    1. 核心思路:

    不使用join 算子进行连接操作, 而使用Broadcast 变量与map 类算子实现join操作,进而完全规避掉shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小RDD 中的数据直接通过collect 算子拉取到Driver 端的内存中来,然后对其创建一个Broadcast 变量; 接着对另外一个RDD 执行map 类算子, 在算子函数内, 从Broadcast 变量中获取较小RDD 的全量数据, 与当前RDD 的每一条数据按照连接key 进行比对,如果连接key 相同的话,那么就将两个RDD 的数据用你需要的方式连接起来。
    根据上述思路,根本不会发生shuffle 操作,从根本上杜绝了join 操作可能导致的数据倾斜问题。
    当join 操作有数据倾斜问题并且其中一个RDD 的数据量较小时, 可以优先考虑这种方式,效果非常好。map join 的过程如图所示:
    Spark性能调优-v2 - 图13

    2. 不适用场景分析:

    由于Spark 的广播变量是在每个Executor 中保存一个副本,如果两个RDD 数据量都比较大,那么如果将一个数据量比较大的RDD 做成广播变量,那么很有可能会造成内存溢出。

    4.9 解决方案六:sample 采样对倾斜key 单独进行join

    在Spark 中, 如果某个RDD 只有一个key, 那么在shuffle 过程中会默认将此key 对应的数据打散,由不同的reduce 端task 进行处理。
    当由单个key 导致数据倾斜时,可有将发生数据倾斜的key 单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的key 组成的RDD 根其他RDD 单独join,此时,根据Spark 的运行机制,此RDD 中的数据会在shuffle 阶段被分散到多个task中去进行join 操作。
    倾斜key 单独join 流程:
    Spark性能调优-v2 - 图14

    1.适用场景分析:

    对于RDD 中的数据,可以将其转换为一个中间表,或者是直接使用countByKey()的方式,看一个这个RDD 中各个key 对应的数据量,此时如果你发现整个RDD 就一个key 的数据量特别多,那么就可以考虑使用这种方法。
    当数据量非常大时, 可以考虑使用sample 采样获取10%的数据, 然后分析这10%的数据中哪个key 可能会导致数据倾斜,然后将这个key 对应的数据单独提取出来。

    2. 不适用场景分析:

    如果一个RDD 中导致数据倾斜的key 很多,那么此方案不适用。

    4.10 解决方案七:使用随机数以及扩容进行join

    如果在进行join 操作时,RDD 中有大量的key 导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了,对于join 操作,我们可以考虑对其中一个RDD 数据进行扩容,另一个RDD 进行稀释后再join。
    我们会将原先一样的key 通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task 中去处理,而不是让一个task 处理大量的相同key。这一种方案是针对有大量倾斜key 的情况,没法将部分key 拆分出来进行单独处理,需要对整个RDD 进行数据扩容,对内存资源要求很高。

    1. 核心思想:

    选择一个RDD,使用flatMap 进行扩容,对每条数据的key 添加数值前缀(1~N的数值),将一条数据映射为多条数据;(扩容)
    选择另外一个RDD,进行map 映射操作,每条数据的key 都打上一个随机数作为前缀(1~N 的随机数);(稀释)
    将两个处理后的RDD,进行join 操作。
    使用随机数以及扩容进行join:
    Spark性能调优-v2 - 图15

    2. 局限性:

    如果两个RDD 都很大,那么将RDD 进行N 倍的扩容显然行不通;使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题。

    3. 使用方案七对方案六进一步优化分析:

    当RDD 中有几个key 导致数据倾斜时,方案六不再适用,而方案七又非常消耗资源,此时可以引入方案七的思想完善方案六:
    1. 对包含少数几个数据量过大的key 的那个RDD,通过sample 算子采样出一份样本来,然后统计一下每个key 的数量,计算出来数据量最大的是哪几个key。
    2. 然后将这几个key 对应的数据从原来的RDD 中拆分出来,形成一个单独的RDD,并给每个key 都打上n 以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
    3. 接着将需要join 的另一个RDD,也过滤出来那几个倾斜key 对应的数据并形成一个单独的RDD,将每条数据膨胀成n 条数据,这n 条数据都按顺序附加一个0~n 的前缀,不会导致倾斜的大部分key 也形成另外一个RDD。
    4. 再将附加了随机前缀的独立RDD 与另一个膨胀n 倍的独立RDD 进行join,此时就可以将原先相同的key 打散成n 份,分散到多个task 中去进行join 了。
    5. 而另外两个普通的RDD 就照常join 即可。
    6. 最后将两次join 的结果使用union 算子合并起来即可,就是最终的join 结果。

    5 第三章Spark Troubleshooting

    5.1 故障排除一:控制reduce 端缓冲大小以避免OOM

    在Shuffle 过程,reduce 端task 并不是等到map 端task 将其数据全部写入磁盘后再去拉取,而是map 端写一点数据,reduce 端task 就会拉取一小部分数据,然后立即进行后面的聚合、算子函数的使用等操作。
    reduce 端task 能够拉取多少数据,由reduce 拉取数据的缓冲区buffer 来决定,因为拉取过来的数据都是先放在buffer 中,然后再进行后续的处理,buffer 的默认大小为48MB。
    reduce 端task 会一边拉取一边计算,不一定每次都会拉满48MB 的数据,可能大多数时候拉取一部分数据就处理掉了。
    虽然说增大reduce 端缓冲区大小可以减少拉取次数, 提升Shuffle 性能,但是有时map 端的数据量非常大,写出的速度非常快,此时reduce 端的所有task 在拉取的时候, 有可能全部达到自己缓冲的最大极限值,即48MB,此时, 再加上reduce端执行的聚合函数的代码, 可能会创建大量的对象, 这可难会导致内存溢出, 即OOM。
    如果一旦出现reduce 端内存溢出的问题,我们可以考虑减小reduce 端拉取数据缓冲区的大小,例如减少为12MB。
    在实际生产环境中是出现过这种问题的, 这是典型的以性能换执行的原理。
    reduce 端拉取数据的缓冲区减小,不容易导致OOM,但是相应的,reudce 端的拉取次数增加,造成更多的网络传输开销,造成性能的下降。
    注意,要保证任务能够运行,再考虑性能的优化。

    5.2 故障排除二:JVM GC 导致的shuffle 文件拉取失败

    在Spark 作业中,有时会出现shuffle file not found 的错误,这是非常常见的一个报错,
    有时出现这种错误以后,选择重新执行一遍,就不再报出这种错误。
    出现上述问题可能的原因是Shuffle 操作中,后面stage 的task 想要去上一个stage 的task所在的Executor 拉取数据,结果对方正在执行GC,执行GC 会导致executor 内所有的工作现场全部停止,比如BlockManager、基于netty 的网络通信等,这就会导致后面的task 拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found 的错误,而第二次再次执行就不会再出现这种错误。
    可以通过调整reduce 端拉取数据重试次数和reduce 端拉取数据时间间隔这两个参数来对Shuffle 性能进行调整,增大参数值,使得reduce 端拉取数据的重试次数增加,并且每次失败后等待的时间间隔加长。
    JVM GC导致的shuffle文件拉取失败:
val conf = new SparkConf()
.set(“spark.shuffle.io.maxRetries”, “6”)
.set(“spark.shuffle.io.retryWait”, “6s”)

5.3 故障排除三:解决各种序列化导致的报错

当Spark 作业在运行过程中报错,而且报错信息中含有Serializable 等类似词汇,那么可能是序列化问题导致的报错。
序列化问题要注意以下三点:
1. 作为RDD 的元素类型的自定义类,必须是可以序列化的;
2. 算子函数里可以使用的外部的自定义变量,必须是可以序列化的;
3. 不可以在RDD 的元素类型、算子函数里使用第三方的不支持序列化的类型,例如Connection。

5.4 故障排除四:解决算子函数返回NULL 导致的问题

在一些算子函数里,需要我们有一个返回值,但是在一些情况下我们不希望有返回值,此时我们如果直接返回NULL,会报错,例如Scala.Math(NULL)异常。
如果你遇到某些情况,不希望有返回值,那么可以通过下述方式解决:
1. 返回特殊值,不返回NULL,例如“-1”;
2. 在通过算子获取到了一个RDD 之后,可以对这个RDD 执行filter 操作,进行数据过滤,将数值为-1 的数据给过滤掉;
3. 在使用完filter 算子后,继续调用coalesce 算子进行优化。

5.5 故障排除五:解决YARN-CLIENT 模式导致的网卡流量激增问题

YARN-client 模式的运行原理如下图所示:
Spark性能调优-v2 - 图16
在YARN-client 模式下, Driver 启动在本地机器上,而Driver 负责所有的任务调度,需要与YARN 集群上的多个Executor 进行频繁的通信。
假设有100 个Executor, 1000 个task,那么每个Executor 分配到10 个task之后,Driver 要频繁地跟Executor 上运行的1000 个task 进行通信,通信数据非常多,并且通信品类特别高。这就导致有可能在Spark 任务运行过程中,由于频繁大量的网络通讯,本地机器的网卡流量会激增。
注意,YARN-client 模式只会在测试环境中使用,而之所以使用YARN-client 模式,是由于可以看到详细全面的log 信息,通过查看log,可以锁定程序中存在的问题,避免在生产环境下发送故障。
在生产环境下,使用的主要是YARN-cluster 模式。在YARN-cluster 模式下,就不会造成本地机器网卡流量激增问题,如果YARN-cluster 模式下存在网络通信的问题,需要运维团队进行解决。

5.6 故障排除六:解决YARN-CLUSTER 模式的JVM 栈内存溢出无法执行问题

YARN-cluster 模式的运行原理如下图所示:
Spark性能调优-v2 - 图17
当Spark 作业中包含SparkSQL 的内容时,可能会碰到YARN-client 模式下可以运行,但是YARN-cluster 模式下无法提交运行(报出OOM 错误)的情况。
YARN-client 模式下,Driver 是运行在本地机器上的,Spark 使用的JVM 的PermGen 的配置(JDK1.8 之前),是本地机器上的spark-class 文件,JVM 永久代的大小是128MB,这个是没有问题的,但是在YARN-cluster 模式下,Driver 运行在YARN 集群的某个节点上,使用的是没有经过配置的默认设置,PermGen 永久代大小为82MB。
SparkSQL 的内部要进行很复杂的SQL 的语义解析、语法树转换等等,非常复杂,如果sql 语句本身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特别是对PermGen的占用会比较大。
所以, 此时如果PermGen 的占用好过了82MB , 但是又小于128MB , 就会出现YARN-client 模式下可以运行,YARN-cluster 模式下无法运行的情况。
解决上述问题的方法时增加PermGen 的容量,需要在spark-submit 脚本中对相关参数进
行设置。
配置如下:

—conf spark.driver.extraJavaOptions=”-XX:PermSize=128M -XX:MaxPermSize=256M”

通过上述方法就设置了Driver 永久代的大小,默认为128MB, 最大256MB,这样就可以避免上面所说的问题。

5.7 故障排除七:解决SparkSQL 导致的JVM 栈内存溢出

当SparkSQL 的sql 语句有成百上千的or 关键字时,就可能会出现Driver 端的JVM 栈内存溢出。
JVM 栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了JVM 栈深度限制的递归。(我们猜测SparkSQL 有大量or 语句的时候,在解析SQL 时,例如转换为语法树或者进行执行计划的生成的时候,对于or 的处理是递归,or 非常多时,会发生大量的递归)
此时,建议将一条sql 语句拆分为多条sql 语句来执行,每条sql 语句尽量保证100 个以内的子句。根据实际的生产环境试验,一条sql 语句的or 关键字控制在100个以内,通常不会导致JVM 栈内存溢出。

5.8 故障排除八:持久化与checkpoint 的使用

Spark 持久化在大部分情况下是没有问题的,但是有时数据可能会丢失,如果数据一旦丢失,就需要对丢失的数据重新进行计算,计算完后再缓存和使用,为了避免数据的丢失,可以选择对这个RDD 进行checkpoint,也就是将数据持久化一份到容错的文件系统上(比如HDFS)。
一个RDD 缓存并checkpoint 后, 如果一旦发现缓存丢失, 就会优先查看checkpoint 数据存不存在,如果有, 就会使用checkpoint 数据, 而不用重新计算。也即是说,checkpoint 可以视为cache 的保障机制,如果cache 失败,就使用checkpoint的数据。
使用checkpoint 的优点在于提高了Spark 作业的可靠性,一旦缓存出现问题,不需要重新计算数据,缺点在于,checkpoint 时需要将数据写入HDFS 等文件系统,对性能的消耗较大。