一、Explain 查看执行计划
要优化 SparkSQL 应用时,一定是要了解 SparkSQL 执行计划的。发现 SQL 执行慢的根 本原因,才能知道应该在哪儿进行优化,是调整 SQL 的编写方式、还是用 Hint、还是调参, 而不是把优化方案拿来试一遍。
1.1、基本语法
.explain(mode=”xxx”)
从 3.0 开始,explain 方法有一个新的参数 mode,该参数可以指定执行计划展示格式:
➢ explain(mode=”simple”):只展示物理执行计划。
➢ explain(mode=”extended”):展示物理执行计划和逻辑执行计划。
➢ explain(mode=”codegen”) :展示要 Codegen 生成的可执行 Java 代码。
➢ explain(mode=”cost”):展示优化后的逻辑执行计划以及相关的统计。
➢ explain(mode=”formatted”):以分隔的方式输出,它会输出更易读的物理执行计划, 并展示每个节点的详细信息。
1.2、执行计划处理流程
核心的执行过程一共有5个步骤:
这些操作和计划都是 Spark SQL 自动处理的,会生成以下计划:
- ➢ Unresolved 逻辑执行计划:== Parsed Logical Plan ==
- Parser 组件检查 SQL 语法上是否有问题,然后生成 Unresolved(未决断)的逻辑计划, 不检查表名、不检查列名。
- ➢ Resolved 逻辑执行计划:== Analyzed Logical Plan ==
- 通过访问 Spark 中的 Catalog 存储库来解析验证语义、列名、类型、表名等。
- ➢ 优化后的逻辑执行计划:== Optimized Logical Plan ==
- Catalyst 优化器根据各种规则进行优化。
- ➢ 物理执行计划:== Physical Plan ==
- 1)HashAggregate 运算符表示数据聚合,一般 HashAggregate 是成对出现,第一个 HashAggregate 是将执行节点本地的数据进行全部聚合,另一个 HashAggregate 是将各个分区的数据进一步进行聚合计算。
- 2)Exchange 运算符其实就是 shuffle,表示需要在集群上移动数据。很多时候 HashAggregate 会以 Exchange 分隔开来。
- 3)Project 运算符是 SQL 中的投影操作,就是选择列(例如:select name, age…)。
- 4)BroadcastHashJoin 运算符表示通过基于广播方式进行 HashJoin。
- 5)LocalTableScan 运算符就是全表扫描本地的表
1.3、解析样例
import com.atguigu.sparktuning.utils.InitUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object ExplainDemo {
def main( args: Array[String] ): Unit = {
val sparkConf = new SparkConf().setAppName("ExplainDemo")
// .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
val sqlstr =
"""
|select
| sc.courseid,
| sc.coursename,
| sum(sellmoney) as totalsell
|from sale_course sc join course_shopping_cart csc
| on sc.courseid=csc.courseid and sc.dt=csc.dt and sc.dn=csc.dn
|group by sc.courseid,sc.coursename
""".stripMargin
sparkSession.sql("use sparktuning;")
// sparkSession.sql(sqlstr).show()
// while(true){}
println("=====================================explain()-只展示物理执行计划============================================")
sparkSession.sql(sqlstr).explain()
println("===============================explain(mode = \"simple\")-只展示物理执行计划=================================")
sparkSession.sql(sqlstr).explain(mode = "simple")
println("============================explain(mode = \"extended\")-展示逻辑和物理执行计划==============================")
sparkSession.sql(sqlstr).explain(mode = "extended")
println("============================explain(mode = \"codegen\")-展示可执行java代码===================================")
sparkSession.sql(sqlstr).explain(mode = "codegen")
println("============================explain(mode = \"formatted\")-展示格式化的物理执行计划=============================")
sparkSession.sql(sqlstr).explain(mode = "formatted")
}
}
1.4、使用Spark UI
也可以在UI上怎么看执行计划,不管是正在运行的还是运行结束的。在浏览器打开Spark UI。但是请注意你的Spark UI是运行在哪的,如果是运行在本地,并且正在运行,Spark UI的默认端口是4040,你可以通过(http://localhost:4040)去访问。在SQL标签下,可以找到你的执行计划。
点击description去看Spark有向无环图的图形界面去看执行计划。在这里你需要从上往下读信息,你可以在底部展开详细信息去看执行计划。这个信息和使用explain API是非常相似的。
二、资源调优
2.1、资源配置调优原理
为什么资源配置对于性能的提升这么明显呢?这里涉及到了应用程序对资源的申请和分配了。首先我们知道,Task从生成到执行并返回结果的流程1如下(如下图及说明):
- DAGScheduler将一个Job划分为多个Stage,又将Stage划分生成TaskSet。
- DAGScheduler将TaskSet交给TaskScheduler去提交。
- TaskScheduler将TaskSet中的Task逐个提交到它对应的Executor上去执行。一个Task提交给一个Executor。
- Executor从线程池取出一个线程执行,并返回执行结果。(可见,只要线程数(即CPU数)充足,多个Task可以提交给同一个Executor的)
由此,我们可以知道,Task是Spark任务的核心,而Task是在Executor上执行的,因此,对资源的配置,主要是为Executor配置资源。
2.2、资源配置说明
2.2.1、资源配置种类
在资源配置时,我我们主要配置以下种类的资源:Task执行并行度 = Executor数量 每个Executor的CPU数量 (当然了,每个Executor的CPU数量可能不同)
*RDD 与 Task 的关系说明:RDD在计算的时候,每个分区都会起一个Task,所以RDD的分区数目决定了总的的Task数目。每个Task执行的结果就是生成了目标RDD的一个Partiton。
类别 | 说明 |
---|---|
Executor数量 | 从上面的公式可知,如果Executor数量比较少,那么,能够并行执行的Task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。 比如有10个Executor,每个Executor有10个CPU core,那么同时能够并行执行的Task就是100个。100个执行完以后,再换下一批100个task。增加了Executor数量意味着增加了能够并行执行的Task数量。比如原先是100个,现在可能可以并行执行200个,2000个,甚至20000个。那么并行能力就比之前提升了数倍,数十倍。相应的,性能(执行的速度),也能提升数倍~数十倍。 |
Executor CPU | 同理,增加每个Executor的CPU core,也是增加了执行的并行能力。 |
Executor Memory | 增加每个Executor的内存量。增加了内存量以后,对性能的提升,有三点: 1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。 2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给Executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。 3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。 |
Driver Memory | 增加Driver的内存量。增加Driver内存量对性能的提升主要体现在: 1、DAGscheduler在Stage划分过程中,产生大量的Task对象,会占用大量的内存,如果内存不足也会导致频繁GC。SparkContext本身维护了大量的对象,也占用很多内存。 2、Driver在待计算数据分发和计算数据接收过程中,一方面需要保存这些数据,另外一方面在发送和接收时需要进行序列化和反序列化,同样是非常耗费内存。 |
2.2.2、资源配置方式
在我们在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数:
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--master yarn \ //使用yarn
--deploy-mode cluster \ //部署模式
--num-executors 3 \ //配置executor的数量
--driver-memory 100m \ //配置driver的内存(影响不大)
--executor-memory 100m \ //配置每个executor的内存大小
--executor-cores 3 \ //配置每个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
2.2.3、调优原则
为任务分配,可以获取到并使用的最大资源
- Spark Standalone模式:了解集群中可使用的资源,依据实际情况,去计算这些参数应该设置的数值。
- Spark Yarn模式:Yarn的是资源队列进行资源的分配和调度,因此,在写编写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源分配。(比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。)
2.2.4、动态分配资源
spark.shuffle.service.enabled true //启用External shuffle Service服务
spark.shuffle.service.port 7337 //Shuffle Service默认服务端口,必须和yarn-site中的一致
spark.dynamicAllocation.enabled true //开启动态资源分配
spark.dynamicAllocation.minExecutors 1 //每个Application最小分配的executor数
spark.dynamicAllocation.maxExecutors 30 //每个Application最大并发分配的executor数
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
2.3、内存估算
https://www.cnblogs.com/wzj4858/p/8204282.html
统一内存管理图示——堆内
统一内存管理图示——堆外
➢ 估算 Other 内存 = 自定义数据结构 每个 Executor 核数
➢ 估算 Storage 内存 = 广播变量 + cache/Executor 数量
➢ 估算 Executor 内存 = 每个 Executor 核数 (数据集大小/并行度)
调整内存配置项
一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分 配,可以按照如下思路:
spark.memory.fraction=(估算 storage 内存+估算 Execution 内存)/(估算 storage 内存 +估算 Execution 内存+估算 Other 内存)得到
spark.memory.storageFraction =(估算 storage 内存)/(估算 storage 内存+估算 Execution 内存)
代入公式计算:
Storage 堆内内存=(spark.executor.memory–300MB)spark.memory.fractionspark.memory.storageFraction
Execution 堆内内存= (spark.executor.memory–300MB)spark.memory.fraction(1-spark.memory.storageFraction)
三、并行度调优
并行度的合理调整,可以降低资源浪费,提高spark任务的运行效率
task的数量应该设置为spark cores的2~3倍
- 如果读取的数据在HDFS中,降低block的大小,相当于提高了RDD中partition个数sc.textFile(xx,numPartitions)
- sc.paralleize(xx,numPartitions)
- sc.makeRDD(xx,numPartitions)
- sc.parallelizePairs(xx,numPartitions)
- repartitions/coalesce
- reduceByKey/groupByKey/join — (xx,numPartitions)
- spark.default.parallelism=500
- spark.sql.shuffle.partitions=200
- spark.default.parallelism并行度设置对sparksql无效
- 自定义分区函数
- 如果读取的数据在SparkStreaming中
- Receiver:spark.streaming.blockInterval — 200ms
- Direct读取的topic的分区数
四、代码调优
4.1、避免创建重复的RDD
4.2、对多次使用的RDD进行持久化
4.3、持久化算子
1、可以调用rdd的cache或者persist方法对重复多次使用的RDD进行持久化
2、rdd持久化的时可以采用序列化
(1)如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许会导致OOM内存溢出。
(2)当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。
(3)序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可以减少占用的空间和便于网络传输
(4)如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。
4.4、尽量避免使用shuffle类算子
使用广播变量来模拟join
join算子 = 广播变量 + filter 、 广播变量 + map 、 广播变量 + flatMap
4.5、使用map-join的预聚合操作
即尽量使用有combiner的shuffle类算子,可以减少数据的落盘,减少IO操作。
- combiner概念:在map端,每一个map task计算完毕后进行的局部聚合。
- combiner好处:
- 降低shuffle writer写磁盘的数据量
- 降低shuffle reader拉取数据量的大小
- 降低reduce端聚合的次数
有combiner操作的shuffle算子
使用reduceByKey代替groupByKey
- 使用mapPartition替代map
- 使用foreachPartition替代foreach
- filter后使用coalesce减少分区数
- 使用repartitionAndSortWithinPartitions替代reparation与sort类操作
- 使用repartition和coalesce算子操作分区
4.7、使用广播变量
4.8、使用Keyo优化序列化性能
默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化,这种默认序列化机制的好处在于,处理起来比较方便;也不需要我们手动去做什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化即可。
但是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。
可以手动进行序列化格式的优化,Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。Kryo序列化机制,一旦启用以后,会生效的几个地方:
1、算子函数中使用到的外部变量 2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER 3、shuffle
1、算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中内存的占用和消耗 2、持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,
task执行的时候,创建的对象,就不至于频繁的占满内存,频繁发生GC。
3、shuffle:可以优化网络传输的性能
SparkConf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
首先第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;
Kryo之所以没有被作为默认的序列化类库的原因,就要出现了:主要是因为Kryo要求,如果要达到它的最佳性能的话,那么就一定要注册你自定义的类(比如,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,否则Kryo达不到最佳性能)。
第二步,注册你使用到的,需要通过Kryo序列化的,一些自定义类,项目中的使用:
// 第一种方式:设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 第二种方式:注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
4.9、使用高性能的库fastutil
1、fastutil介绍
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来替代自己平时使用的JDK的原生的Map、List、Set.
2、fastutil好处
fastutil集合类,可以减小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
3、fastutil的使用
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
第二步:平时使用List (Integer)的替换成IntList即可。List< Integer >的list对应的到fastutil就是IntList类型
使用说明:基本都是类似于IntList的格式,前缀就是集合的元素类型;特殊的就是Map,Int2IntMap,代表了key-value映射的元素类型。
五、数据本地化
5.1、调节数据本地化等待时长
Spark在Driver上对Application的每一个stage的task进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task分配算法,优先会希望每个task正好分配到它要计算的数据所在的节点,这样的话就不用在网络间传输数据;
但是通常来说,有时事与愿违,可能task没有机会分配到它的数据所在的节点,为什么呢,可能那个节点的计算资源和计算能力都满了;所以这种时候,通常来说,Spark会等待一段时间,默认情况下是3秒(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后实在是等待不了了,就会选择一个比较差的本地化级别,比如说将task分配到距离要计算的数据所在节点比较近的一个节点,然后进行计算。
1、本地化级别
- PROCESS_LOCAL:进程本地化,在同一个executor中
- NODE_LOCAL:节点本地化
- RACK_LOCAL:机架本地化
- ANY:无限制
2、数据本地化等待时长:spark.locality.wait,默认是3s
3、如何调节参数并且测试
在代码中设置:
new SparkConf().set(“spark.locality.wait”,“10”)
然后把程序提交到spark集群中运行,注意观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
看到日志中大多数的PROCESS_LOCAL就不用了调节了,如果大多是NODE_LOCAL、ANY就需要调一下,但需要注意的是不要因为调节等待时长反而导致任务变慢了