一、Explain 查看执行计划

要优化 SparkSQL 应用时,一定是要了解 SparkSQL 执行计划的。发现 SQL 执行慢的根 本原因,才能知道应该在哪儿进行优化,是调整 SQL 的编写方式、还是用 Hint、还是调参, 而不是把优化方案拿来试一遍。

1.1、基本语法

.explain(mode=”xxx”)
从 3.0 开始,explain 方法有一个新的参数 mode,该参数可以指定执行计划展示格式:
➢ explain(mode=”simple”):只展示物理执行计划。
➢ explain(mode=”extended”):展示物理执行计划和逻辑执行计划。
➢ explain(mode=”codegen”) :展示要 Codegen 生成的可执行 Java 代码。
➢ explain(mode=”cost”):展示优化后的逻辑执行计划以及相关的统计。
➢ explain(mode=”formatted”):以分隔的方式输出,它会输出更易读的物理执行计划, 并展示每个节点的详细信息。

1.2、执行计划处理流程

image.png
核心的执行过程一共有5个步骤:
image.png
这些操作和计划都是 Spark SQL 自动处理的,会生成以下计划:

  • Unresolved 逻辑执行计划:== Parsed Logical Plan ==
    • Parser 组件检查 SQL 语法上是否有问题,然后生成 Unresolved(未决断)的逻辑计划, 不检查表名、不检查列名。
  • Resolved 逻辑执行计划:== Analyzed Logical Plan ==
    • 通过访问 Spark 中的 Catalog 存储库来解析验证语义、列名、类型、表名等。
  • 优化后的逻辑执行计划:== Optimized Logical Plan ==
    • Catalyst 优化器根据各种规则进行优化。
  • 物理执行计划:== Physical Plan ==
    • 1)HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个 HashAggregate 是将执行节点本地的数据进行全部聚合,另一个 HashAggregate 是将各个分区的数据进一步进行聚合计算。
    • 2)Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候 HashAggregate 会以 Exchange 分隔开来。
    • 3)Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)。
    • 4)BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin。
    • 5)LocalTableScan 运算符就是全表扫描本地的表

1.3、解析样例

  1. import com.atguigu.sparktuning.utils.InitUtil
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.SparkSession
  4. object ExplainDemo {
  5. def main( args: Array[String] ): Unit = {
  6. val sparkConf = new SparkConf().setAppName("ExplainDemo")
  7. // .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
  8. val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
  9. val sqlstr =
  10. """
  11. |select
  12. | sc.courseid,
  13. | sc.coursename,
  14. | sum(sellmoney) as totalsell
  15. |from sale_course sc join course_shopping_cart csc
  16. | on sc.courseid=csc.courseid and sc.dt=csc.dt and sc.dn=csc.dn
  17. |group by sc.courseid,sc.coursename
  18. """.stripMargin
  19. sparkSession.sql("use sparktuning;")
  20. // sparkSession.sql(sqlstr).show()
  21. // while(true){}
  22. println("=====================================explain()-只展示物理执行计划============================================")
  23. sparkSession.sql(sqlstr).explain()
  24. println("===============================explain(mode = \"simple\")-只展示物理执行计划=================================")
  25. sparkSession.sql(sqlstr).explain(mode = "simple")
  26. println("============================explain(mode = \"extended\")-展示逻辑和物理执行计划==============================")
  27. sparkSession.sql(sqlstr).explain(mode = "extended")
  28. println("============================explain(mode = \"codegen\")-展示可执行java代码===================================")
  29. sparkSession.sql(sqlstr).explain(mode = "codegen")
  30. println("============================explain(mode = \"formatted\")-展示格式化的物理执行计划=============================")
  31. sparkSession.sql(sqlstr).explain(mode = "formatted")
  32. }
  33. }

image.png

1.4、使用Spark UI

也可以在UI上怎么看执行计划,不管是正在运行的还是运行结束的。在浏览器打开Spark UI。但是请注意你的Spark UI是运行在哪的,如果是运行在本地,并且正在运行,Spark UI的默认端口是4040,你可以通过(http://localhost:4040)去访问。在SQL标签下,可以找到你的执行计划。
image.png
点击description去看Spark有向无环图的图形界面去看执行计划。在这里你需要从上往下读信息,你可以在底部展开详细信息去看执行计划。这个信息和使用explain API是非常相似的。
image.png

二、资源调优

2.1、资源配置调优原理

为什么资源配置对于性能的提升这么明显呢?这里涉及到了应用程序对资源的申请和分配了。首先我们知道,Task从生成到执行并返回结果的流程1如下(如下图及说明):
image.png

  1. DAGScheduler将一个Job划分为多个Stage,又将Stage划分生成TaskSet。
  2. DAGScheduler将TaskSet交给TaskScheduler去提交。
  3. TaskScheduler将TaskSet中的Task逐个提交到它对应的Executor上去执行。一个Task提交给一个Executor。
  4. Executor从线程池取出一个线程执行,并返回执行结果。(可见,只要线程数(即CPU数)充足,多个Task可以提交给同一个Executor的)

由此,我们可以知道,Task是Spark任务的核心,而Task是在Executor上执行的,因此,对资源的配置,主要是为Executor配置资源。

2.2、资源配置说明

2.2.1、资源配置种类

在资源配置时,我我们主要配置以下种类的资源:Task执行并行度 = Executor数量 每个Executor的CPU数量 (当然了,每个Executor的CPU数量可能不同)
*RDD 与 Task 的关系说明
:RDD在计算的时候,每个分区都会起一个Task,所以RDD的分区数目决定了总的的Task数目。每个Task执行的结果就是生成了目标RDD的一个Partiton。

类别 说明
Executor数量 从上面的公式可知,如果Executor数量比较少,那么,能够并行执行的Task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。
  比如有10个Executor,每个Executor有10个CPU core,那么同时能够并行执行的Task就是100个。100个执行完以后,再换下一批100个task。增加了Executor数量意味着增加了能够并行执行的Task数量。比如原先是100个,现在可能可以并行执行200个,2000个,甚至20000个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~数十倍。
Executor CPU 同理,增加每个Executor的CPU core,也是增加了执行的并行能力。
Executor Memory 增加每个Executor的内存量。增加了内存量以后,对性能的提升,有三点:
1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。
2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给Executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。
Driver Memory 增加Driver的内存量。增加Driver内存量对性能的提升主要体现在:
1、DAGscheduler在Stage划分过程中,产生大量的Task对象,会占用大量的内存,如果内存不足也会导致频繁GC。SparkContext本身维护了大量的对象,也占用很多内存。
2、Driver在待计算数据分发和计算数据接收过程中,一方面需要保存这些数据,另外一方面在发送和接收时需要进行序列化和反序列化,同样是非常耗费内存。

2.2.2、资源配置方式

在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数:

  1. /usr/local/spark/bin/spark-submit \
  2. --class cn.spark.sparktest.core.WordCountCluster \
  3. --master yarn \ //使用yarn
  4. --deploy-mode cluster \ //部署模式
  5. --num-executors 3 \ //配置executor的数量
  6. --driver-memory 100m \ //配置driver的内存(影响不大)
  7. --executor-memory 100m \ //配置每个executor的内存大小
  8. --executor-cores 3 \ //配置每个executor的cpu core数量
  9. /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

2.2.3、调优原则

为任务分配,可以获取到并使用的最大资源

  • Spark Standalone模式:了解集群中可使用的资源,依据实际情况,去计算这些参数应该设置的数值。
  • Spark Yarn模式:Yarn的是资源队列进行资源的分配和调度,因此,在写编写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源分配。(比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。)

    2.2.4、动态分配资源

    1. spark.shuffle.service.enabled true //启用External shuffle Service服务
    2. spark.shuffle.service.port 7337 //Shuffle Service默认服务端口,必须和yarn-site中的一致
    3. spark.dynamicAllocation.enabled true //开启动态资源分配
    4. spark.dynamicAllocation.minExecutors 1 //每个Application最小分配的executor数
    5. spark.dynamicAllocation.maxExecutors 30 //每个Application最大并发分配的executor数
    6. spark.dynamicAllocation.schedulerBacklogTimeout 1s
    7. spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s

2.3、内存估算

https://www.cnblogs.com/wzj4858/p/8204282.html
统一内存管理图示——堆内
image.png
统一内存管理图示——堆外
image.png

➢ 估算 Other 内存 = 自定义数据结构 每个 Executor 核数
➢ 估算 Storage 内存 = 广播变量 + cache/Executor 数量
➢ 估算 Executor 内存 = 每个 Executor 核数
(数据集大小/并行度)

调整内存配置项
一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分 配,可以按照如下思路:
spark.memory.fraction=(估算 storage 内存+估算 Execution 内存)/(估算 storage 内存 +估算 Execution 内存+估算 Other 内存)得到
spark.memory.storageFraction =(估算 storage 内存)/(估算 storage 内存+估算 Execution 内存)
代入公式计算:
Storage 堆内内存=(spark.executor.memory–300MB)spark.memory.fractionspark.memory.storageFraction
Execution 堆内内存= (spark.executor.memory–300MB)spark.memory.fraction(1-spark.memory.storageFraction)

三、并行度调优

并行度的合理调整,可以降低资源浪费,提高spark任务的运行效率
task的数量应该设置为spark cores的2~3倍

  1. 如果读取的数据在HDFS中,降低block的大小,相当于提高了RDD中partition个数sc.textFile(xx,numPartitions)
  2. sc.paralleize(xx,numPartitions)
  3. sc.makeRDD(xx,numPartitions)
  4. sc.parallelizePairs(xx,numPartitions)
  5. repartitions/coalesce
  6. reduceByKey/groupByKey/join — (xx,numPartitions)
  7. spark.default.parallelism=500
  8. spark.sql.shuffle.partitions=200
    1. spark.default.parallelism并行度设置对sparksql无效
  9. 自定义分区函数
  10. 如果读取的数据在SparkStreaming中
    1. Receiver:spark.streaming.blockInterval — 200ms
    2. Direct读取的topic的分区数

四、代码调优

4.1、避免创建重复的RDD

4.2、对多次使用的RDD进行持久化

4.3、持久化算子

1、可以调用rdd的cache或者persist方法对重复多次使用的RDD进行持久化
2、rdd持久化的时可以采用序列化
(1)如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许会导致OOM内存溢出。
(2)当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。
(3)序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可以减少占用的空间和便于网络传输
(4)如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。

4.4、尽量避免使用shuffle类算子

使用广播变量来模拟join
join算子 = 广播变量 + filter 、 广播变量 + map 、 广播变量 + flatMap

4.5、使用map-join的预聚合操作

即尽量使用有combiner的shuffle类算子,可以减少数据的落盘,减少IO操作。

  • combiner概念:在map端,每一个map task计算完毕后进行的局部聚合。
  • combiner好处:
    • 降低shuffle writer写磁盘的数据量
    • 降低shuffle reader拉取数据量的大小
    • 降低reduce端聚合的次数
  • 有combiner操作的shuffle算子

    • reduceByKey:在一些场景可以替代groupByKey
    • aggregateByKey
    • combinerByKey

      4.6、尽量使用高性能的算子

  • 使用reduceByKey代替groupByKey

  • 使用mapPartition替代map
  • 使用foreachPartition替代foreach
  • filter后使用coalesce减少分区数
  • 使用repartitionAndSortWithinPartitions替代reparation与sort类操作
  • 使用repartition和coalesce算子操作分区

    4.7、使用广播变量

    image.png

    4.8、使用Keyo优化序列化性能

    默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化,这种默认序列化机制的好处在于,处理起来比较方便;也不需要我们手动去做什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化即可。
    但是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。
    可以手动进行序列化格式的优化,Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。

    Kryo序列化机制,一旦启用以后,会生效的几个地方:

    1、算子函数中使用到的外部变量 2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER 3、shuffle

    1、算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗 2、持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,

    1. task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC

    3、shuffle:可以优化网络传输的性能

SparkConf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)

首先第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;
Kryo之所以没有被作为默认的序列化类库的原因,就要出现了:主要是因为Kryo要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类(比如,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,否则Kryo达不到最佳性能)。

第二步,注册你使用到的,需要通过Kryo序列化的,一些自定义类,项目中的使用:

  1. // 第一种方式:设置序列化器为KryoSerializer。
  2. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  3. // 第二种方式:注册要序列化的自定义类型。
  4. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

4.9、使用高性能的库fastutil

1、fastutil介绍
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set.
2、fastutil好处
fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
3、fastutil的使用
第一步:在pom.xml中引用fastutil的包

  1. <dependency>
  2. <groupId>fastutil</groupId>
  3. <artifactId>fastutil</artifactId>
  4. <version>5.0.9</version>
  5. </dependency>

第二步:平时使用List (Integer)的替换成IntList即可。List< Integer >的list对应的到fastutil就是IntList类型
使用说明:基本都是类似于IntList的格式,前缀就是集合的元素类型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。

五、数据本地化

5.1、调节数据本地化等待时长

Spark在Driver上对Application的每一个stage的task进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task分配算法,优先会希望每个task正好分配到它要计算的数据所在的节点,这样的话就不用在网络间传输数据;
但是通常来说,有时事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以这种时候,通常来说,Spark会等待一段时间,默认情况下是3秒(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后实在是等待不了了,就会选择一个比较差的本地化级别,比如说将task分配到距离要计算的数据所在节点比较近的一个节点,然后进行计算。
1、本地化级别

  • PROCESS_LOCAL:进程本地化,在同一个executor中
  • NODE_LOCAL:节点本地化
  • RACK_LOCAL:机架本地化
  • ANY:无限制

2、数据本地化等待时长:spark.locality.wait,默认是3s
3、如何调节参数并且测试
image.png
在代码中设置:
new SparkConf().set(“spark.locality.wait”,“10”)
然后把程序提交到spark集群中运行,注意观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
看到日志中大多数的PROCESS_LOCAL就不用了调节了,如果大多是NODE_LOCAL、ANY就需要调一下,但需要注意的是不要因为调节等待时长反而导致任务变慢了
image.png

六、Shuffle优化

image.png