submit资源调化
- executor_cores*num_executors
不宜太小或太大!一般不超过总队列 cores 的 25%,比如队列总 cores 400,最大不要超过100,最小不建议低于 40,除非日志量很小。
- executor_cores
不宜为1!否则 work 进程中线程数过少,一般 2~6 为宜。(也不宜过多)
- executor_memory
一般 6~10g 为宜,最大不超过 20G,否则会导致 GC 代价过高,或资源浪费严重。
- driver-memory
内存调优
submit时设置堆外内存 --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g --conf spark.driver.memoryOverhead=1g
- 一个容器最多可以申请多大资源,是由container参数 yarn.scheduler.maximum-allocation-mb决定
而spark是:三个参数相加必须 <= container最大内存
RDD复用
- 对多次使用的RDD进行持久化 (RDD1.map RDD1.reduce 这里指进行了并行的操作 而不是串联)
-
算子调优
使用reduceByKey/aggregateByKey替代groupByKey
reduceByKey相对于普通的shuffle操作(比如groupByKey),它的一个重要特点就是map端的本地聚合。减少了数据量,也即是减少了磁盘IO,同时减少了网络传输,对性能有明显提升,所以,在实际的项目中,能用reduceByKey实现的就尽量用该算子实现。
- 使用mapPartitions替代普通map
如果是Map,假如一个partition中有一万条数据,那么map中的function就要执行和计算一万次,
如果是MapPartitions,一个task只会执行一次function,如果是普通的Map,一条一条处理数据,当出现内存不够的情况时,那么就可以将已经处理掉的数据从内存里面垃圾回收掉,所以普通map通常不会出现OOM情况 。如果是MapPartitions,对于大量数据来说,如果一个partiton数据有一百万条,一次性传入function之后,可能导致内存不足,但是又没办法腾出空间,直接就导致了内存溢出,OOM 所以,当使用MapPartitons算子时,要估算每个partiton的数据能不能一下子缓存到分配给executor的内存中,如果可以,就是用该算子,对性能有显著提升。
- 使用foreachPartitions替代foreach(通常用来写入数据库)
优化写数据库性能,foreach是对每条数据进行处理的,task对partition中的每一条数据都会执行function操作,如果function中有写数据库的操作,那么有多少条数据就会创建和销毁多少个数据库连接,这对性能的影响很大 。
在生产环境中,通常都是使用foreachPartition来写数据库的,使用了该算子之后,对于用户自定义function函数,就调用一次,一次传入一个partition的所有数据,这里只需创建一个数据库连接,然后向数据库发送一条sql语句外加多组参数即可,但是这个时候要配合数据库的批处理 。同样,该算子在超大数据量面前同样会出现OOM情况。
- mapValues
针对k,v结构的rdd,mapValues直接对value进行操作,不对Key造成影响,可以减少不必要的分区操作。
- 避免 flatMap-join-groupBy 的模式。
当有两个已经按照key分组的数据集,你希望将两个数据集合并,并且保持分组,这种情况可以使用 cogroup。这样可以避免对group进行打包解包的开销。
- filter只后进行coalesce
- Top( )算子的不准确的解决办法
方案一:指定top()的排序方法,这里我们直接根据value排序:sortBy(x => x._2,false).top(10)(Ordering.by(e => e._2)
方案二:不用top(),直接用sortBy(x =>x._2,false).take(10)
方案三:既然top()底层调用的是takeOrdered(),我们也直接可以用takeOrdered(10)(Ordering.by(e => e._2)
shuffle调优
- spark.shuffle.memoryFraction
shuffle占用内存的比例,默认0.2 根据代码shuffle操作多不多调整 0.2 -> 0.3
- spark.storage.memoryFraction
持久化占用内存的比例,默认0.6 根据代码持久化操作多不多调整
- 合并map端输出文件
默认情况下,Spark是不开启合并map端输出文件机制的,所以当分批次执行task时,每批的task都会创建新的文件,而不会共用,大大影响了性能,所以当有大量map文件生成时,需要开启该机制
new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)
- map端内存缓冲
默认情况下: 每个task的内存缓冲为32kb,reduce端内存占比为0.2(即默认executor内存中划分20%给reduce task)
比如:
每个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。
每个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。
同理,ruduce端也一样
何时调优?
通过Spark UI查看shuffle磁盘的read和write是不是很大,如果很大则应相应调优
如何调优?
spark.shuffle.file.buffer : 32kb -> 128kb
- reduce端内存缓冲
shuffle read时每次拉取的数据量 默认48M
val conf = new SparkConf().set(“spark.reducer.maxSizeInFlight”, “96”)
- 调整reduce端拉取数据重试次数和reduce端拉取数据时间间隔(shuffle read)
使得reduce端拉取数据的重试次数增加,并且每次失败后等待的时间间隔加长。
val conf = new SparkConf()
.set(“spark.shuffle.io.maxRetries”, “60”)
.set(“spark.shuffle.io.retryWait”, “60s”)
并行度优化
task总数量 建议修改为总核数的2-3倍
让资源充分利用,如果有50个core,并行度40 就会浪费10个core。
数据倾斜的本质是某个key的数据量过大,经过shuffle都聚到同一个task了,所以这单纯加大这个参数是没用的,并不能解决数据倾斜
spark.sql.shuffle.partitions # DF DS的分区数设置。shuffle分区个数。默认200,
spark.default.parallelism # RDD的分区数设置
数据倾斜优化
- 首先看能不能取消shuffle
用broadcast join
- 再看能不能取消倾斜
过滤导致倾斜的key
- 扩大并行度(适用于有较多key数据量大的情况)
如果有100个key 数据量都很大,能让着100Key并行执行。
不治本:但是极端情况下 只有一个key数据量大,一个key如果有100万数据还是会分到一个分区去执行
- 聚合类随机key:key加前缀(适用于聚合类的shuffle操作-reduceByKey groupByKey,适用范围相对较窄 )
- join类随机key:打散大表、扩容小表(适用于大小表join,耗资源 慎用)

