Spark 调优汇总 - 图1
    阅读文本大概需要 9 分钟。

    之前的两篇文章

    Spark 调优一瞥 | shuffle 调优
    Spark 的 数据本地化

    分别讲了 shuffle 的参数调优和数据本地化的调优。

    本文将从以下几个方面来介绍一下 Spark 的调优。

    1. 资源调优
    2. 并行度调优
    3. 代码调优
    4. 数据本地化
    5. 内存调优
    6. shuffle 参数
    7. 堆外内存
    8. 数据倾斜 (内容较多, 下篇文章见)

    1
    资源调优

    在部署集群的时候,指定资源分配的默认参数,需要在 conf 下面的 spark-env.sh 里面指定

    SPARK_WORK_CORES 指定每个 worker 分配的核数
    SPARK_WORK_MEMORY 指定每个 worker 分配的内存
    SPARK_WORK_INSTANCES 指定每台机器启动的 worker 数量

    在提交 APPLICATION 的时候,给当前的任务分配更多的资源,在 spark-submit 命令后面指定以下参数:
    —executor-cores
    —executor-memory
    —total-executor-cores

    当然也可以在配置文件 Spark-default.conf 里面配置
    spark.executor.cores
    spark.executor.memory
    spark.max.cores

    动态的分配资源

    spark.dynamicAllocation.enabled true 开启动态资源分配。

    2
    并行度优化

    如果读取的数据在 HDFS 上,降低 Block 的大小,相当于提高了 RDD 中 partition 的个数。

    也可以直接加上参数 numPartitions 来手动的给定分区数。

    也可以使用 repartitions / coalesce 算子来改变分区。

    在使用 reduceByKey / groupByKey / join 的时候,也可以加上 numPartitions 这个参数的。

    自定义分区器,Partitioner 。

    如果读取的数据是在 SparkStreaming 中,Receive 模式的话,并行度是由 batch Interval 和 block Interval 来决定的,默认分别是 5 秒和 200 ms。
    Direct 模式的话,kafka 的 topic 的分区数就是 RDD 的分区的并行度。

    3
    代码调优

    避免创建重复的 RDD, 可以复用同一个 RDD。

    对多次使用的 RDD 进行持久化,这里要思考如何选择一种最合适的持久化策略。

    默认情况下,性能最优的是 MEMORY_ONLY ,但是前提是我们的内存必须足够大,要不然很容易导致 OOM。资源受限的时候,可以降一级,使用 MEMORY_ONLY_SER 对数据序列化再保存到内存中,这时的 partition 仅仅是一个字节数组,大大减少了对象的数量,并且降低了内存占用,但是这种级别加大了性能的消耗。如果这时候,数据还是很大,还是很有可能导致 OOM 的,我们还可以再降级,使用 MEMORY_AND_DISK_SER 策略,不建议后面带上副本数,这种策略会优先把数据缓存到内存中,内存缓存不下去才写到磁盘。

    这里必须提到持久化算子:cache / persist / checkpoint

    cache 只有 MEMORY_ONLY 策略
    persist 有很多选择策略,上面出现的,都可以。
    checkpoint 如果一个 RDD 的计算时间比较长或者计算起来很复杂,一般都将这个 RDD 的计算结果保存到 HDFS 上,这样数据会更加安全。
    如果一个 RDD 的依赖关系非常长,也会使用 checkpoint, 会切断依赖关系,提高容错的效率。

    注意它们三个区别,前两个是为了提高性能,任务执行结束之后会把主动地把临时存数据的目录删掉。但是 checkpoint 是为了容错,需要显示的指定一个目录,可以是 HDFS 上的目录。

    避免使用 shuffle 类的算子,这种情况下:有一个 RDD 很小,可以使用广播变量来代替 join。join 算子 = 广播变量 + filter/map/flatMap。

    使用 map-side 预聚合的 shuffle 算子,即尽量使用有 combiner 的算子。
    好处:可以减少 shuffle write 写磁盘的数据量,还可以减少 shuffle read 拉取数据的大小,最后还可以减少 reduce 任务的聚合次数。

    下面这些算子是由有 combiner 的:
    reduceByKey, 所以在有些场景下可以代替 groupByKey 。
    aggregateByKey, 可以自定义在 map 和 reduce 端的逻辑。
    combineByKey

    尽可以能使用高性能的算子,比如像下面这样:
    使用 reduceByKey 代替 groupBykey。
    使用 mapPartition 代替 map。
    使用 foreachPartition 代替 foreach。
    filter 之后使用 coalesce 减少分区。
    使用 repartition 和 coalesce 来操作分区

    使用广播变量,同一个 Executor 中的 task 公用一份变量集合。可以避免多余的变量副本占用内存,避免导致频繁的 GC。广播变量的发送方式,Executor 一开始并没有广播变量,而是 task 运行的时候需要广播变量,会找到 BlockManager 要这个变量,BlockManager 会去找 Driver 的BlockManagerMaster 要。

    使用 kryo 优化序列化,在 spark 中使用的最多的三个地方是:广播变量、RDD 自定义类型,RDD 持久化的时候。

    Kryo 比 Java 默认的序列化速度要快,序列化之后的数据要小,大概是 1/10 。Spark 默认使用 Java 的 ObjectOutputStream 来序列化,但是 Spark 也支持 Kryo 来序列化。需要在代码里面指定:

    1. SparkConf.set("spark.serializer",
    2. "org.apache.spark.serializer.KryoSerializer")
    3. .registerKyroClasses(new Class[]{xxxx.class})

    优化数据结构,Java 中有三种类型比较消耗内存:
    对象,每个对象都有对象头,引用等额外的信息,比较占用内存。
    字符串,每一个字符串内部都有一个字符数组以及长度等额外的信息。
    集合类型,集合内部会使用内部类来封装集合元素。

    官方建议,推荐使用字符串来替代对象,用原始类型 Long, Int 等来替代字符串,使用数组来代替集合类型。这样可以尽可能的减少内存占用,从而降低 GC 频率,提升性能。

    使用高性能库 fashutil 库,扩展了 Java 的标准集合框架,能够占用更小的内存,更快的存取速度。JDK 要求 7 以及以上版本。

    4
    数据本地化

    Spark 调优汇总 - 图2
    往期的文章很详细的说明了什么是本地化,数据本地化是谁来负责,具体流程以及调优建议。可以戳 Spark 的 数据本地化,提供最佳的计算节点

    5
    内存优化

    这里放一张图,堆内存对象分配。**

    Spark 调优汇总 - 图3
    Spark JVM 调优主要是降低 GC 的时间,可以修改 Executor 内存的比例参数,可以参考这篇文章 Spark 的 shuffle 文件寻址流程,文章的最后提到了 Spark 的统一内存分配以及调优。比如我们可以根据业务来调整内存的大小以及分配。
    Spark 调优汇总 - 图4
    Spark 调优汇总 - 图5

    6
    shuffle 参数调优

    Spark 基于内存进行计算,擅长迭代计算,流式处理,但也会发生shuffle 过程。shuffle 的优化,以及避免产生 shuffle 会给程序提高更好的性能。因为
    shuffle 的性能优劣直接决定了整个计算引擎的性能和吞吐量。**

    从 shuffle write 和 shuffle read 的角度来分析。可以参考这篇文章:
    Spark 调优一瞥 | shuffle 调优

    7
    调节 Executor 堆外内存

    Spark 调优汇总 - 图6
    Spark 底层 shuffle 的传输方式使用 netty 传输,
    netty 在进行网络传输的过程中会申请堆外内存(netty 是零拷贝**),所以使用了堆外内存。拉取数据的过程,可以查看这篇文章:Spark 的 shuffle 文件寻址流程,默认情况下,堆外内存默认上限是每个 Executor 的 10%,建议这个内存给到 1 G,
    可以在提交内存的时候加上配置:

    1. # yarn 模式
    2. --conf spark.yarn.executor.memeoryOverhead=20148
    1. # standalone 模式
    2. --conf spark.executor.memeoryOverhead=20148

    8
    数据倾斜

    内容较多,本文篇幅就已经很多了,所以留到下篇文章见喽。

    **
    今天的你,好看吗?