2.4 Spark 调优

  1. spark1.6以下的版本中execution和storage的内存是各自固定的,执行内存负责transform算子和shuffle算子,storage负责catch部分和广播变量的存储。通过以下两个参数改变它们的大小。spark.shuffle.memoryFraction=0.2 spark.storage.memoryFraction=0.6
  2. spark1.6以上的版本时使用联合内存机制,两者可以互相借用内存,但是如果执行内存不够时会强制回收storage借走的内存。因此如果要进行大缓存任务时建议手动设置固定内存机制。
  3. 1.6以上的版本还额外增加了堆外内存,调用persist方法时指定StorageLevel.OFF_HEAP参数,配合分布式内存文件系统Tachyon将需要缓存很久的数据存放到堆外内存,大幅降低full GC的发生频率。. 使用repartition增加分区数量,降低每个task的大小
  4. 当合并分区的数据量过大时,可以使用repartition并手动指定使用shuffle来进行带shuffle的合并操作,可以在合并前先进行一次聚合。
  5. 使用shuffle算子时指定分区数量或指定自定义分区器避免数据倾斜
  6. 当某个数据重复很多时,尽量用一个对象来代表这些数据,可以是string,也可以是带计数器的map。
  7. 使用mapPartition代替map可以提升效率,但要注意内存紧缺时不能使用。
  8. map端join:当join小表时,可以先用collect将数据收集到driver端,然后用广播变量的方式发送到各个节点上,避免大数据的迁移。
  9. 可以使用map端reduce的方式进一步减少网络IO。调用combineByKey算子。
  10. 内存不足时使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER),直接缓存到磁盘。
  11. spark集群节点应该覆盖hbase,因为spark读取hbase时是按region读取,在同一个节点上可以避免大量数据迁移
  12. 参数设置: spark.driver.memory (default:1G) 设置driver端内存 spark.rdd.compress 设置压缩内存的rdd数据,减少内存的占用,但是增加CPU负担 spark.serializer 设置默认kyro spark.memory.storageFraction 设置storage在内存中的比例,根据缓存的大小决定 spark.locality.wait 设置等待任务的等待时间,如果某个任务等待数据到达的时间超过该时间,就会被下调优先级 spark.speculation 设置空闲节点是否执行某个长时间未结束的task,有点类似hive的预测执行,建议开启。

总结

减少GC:增加计算用的内存;把频繁使用的大缓存缓存到堆外内存;使用计数器存储重复的数据
增加并行度:shuffle时指定分区数量、repartition增加分区(可用线程的2-3倍)、减小分区可以指定带shuffle的repartition进行局部聚合
减少shuffle:使用指定的分区器进行分区,使得相同的key都处于同一分区中(主要用在数据清洗时按自定义分区器存储数据)
map端join:先读取小表到driver端存成广播变量,再读取大表使用广播变量进行join
map端reduce:使用combineByKey算子
指定persist : 内存不足时用persisit指定缓存磁盘来代替catch
参数调优:设置压缩RDD(节约内存,加重CPU)、设置kyro序列化、延长下调本地化级别的等待时间、开启预测执行等等