代码层面

算子选择

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之后进行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

上面条目解释:
reduceByKey、aggregateByKey代替groupByKey,groupByKey不会先在每个分区内聚合。
mapPartition提速但增大内存。
迭代器(Iterator)包裹回调逻辑可提速,单会在把计算逻辑转移到使用时,会造成程序最后内存增大,可能导致程序崩溃。
之所以需要filter之后进行coalesce操作,是一种针对小文件问题的解决方案。举个例子,一共十个分区每个分区存了100条数据,我提一种极端的情况,经过了多重过滤,每个分区只剩下了10条数据,那么这是不是对分区资源的一种浪费。所以需要重新分区。

广播大变量

广播大变量的重点在于一个“大”字,首先我们要了解一下spark的部分执行流程,rdd使用外部变量,是将这个外部变量的副本通过网络传输的方式存入到每一个运行这个应用程序的task上去。如果这个变量足够大,那么在网络I/O上消耗的时间是十分可观的,而且还需要占用十分多的存储空间。针对这种“大”的外部变量(100M,甚至1G),我们可以将变量广播,使这个变量的副本只存在于每个Executer中,这样就大大减少了变量的副本数,大大的提升了效率。

重分区

  1. rdd.partitionBy(new HashPartitioner(100)) // 构造100个分区
  2. .persist()

持久化

必须计算时间大于正反序列化时间cache才有意义。要不然反而速度会变慢。
单action中重复使用一个RDD,cache也是有效的。
所以一般没有shuffle的计算结果cache都可能变慢。
测试代码

  1. val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. //print(broadcastVar.value)
  3. val accum = sc.longAccumulator("My Accumulator")
  4. val array = new Array[Int](1000)
  5. for (i <- 0 until 1000) {
  6. array(i) = i
  7. }
  8. val a = sc.parallelize(array)
  9. val startTime = LocalDateTime.now()
  10. val b = a.map(x => {
  11. var a = x
  12. for (i <- 0 until 40000000) {
  13. a += 1
  14. a -= 1
  15. }
  16. a
  17. }).cache()
  18. val c = b.map{ _ + 2}
  19. val d = b.map{ _ + 2}
  20. val r = d.union(c)
  21. r.foreach(x => {
  22. accum.add(x)
  23. })
  24. val endTime = LocalDateTime.now()
  25. import java.time.Duration
  26. val duration = Duration.between(startTime, endTime)
  27. println(duration)
  28. println(accum.value)

序列化

官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

  1. // 创建SparkConf对象。
  2. val conf = new SparkConf().setMaster(...).setAppName(...)
  3. // 设置序列化器为KryoSerializer。
  4. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  5. // 注册要序列化的自定义类型。
  6. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

配置层面

Worker CPU数尽量一致。5T以下尽量少Worker方式配置。

JVM

在通常的使用中建议:

  1. --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

Shuffle缓存

Shuffle过程中所有缓存大小可调大。减少读写次数。

数据倾斜问题、

参考资料

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势
http://www.treselle.com/blog/apache-spark-performance-tuning-degree-of-parallelism/
数据倾斜(Data Skew)的N种姿势
https://www.cnblogs.com/shishanyuan/p/8454310.html
num-executors,executor-cores,executor-memory

https://www.cnblogs.com/haozhengfei/p/5fc4a976a864f33587b094f36b72c7d3.html
https://www.jianshu.com/p/391f8776e66f
https://www.jianshu.com/p/67606a11415b

spark.default.parallelism官网推荐2-3倍CPU核数,但实际情况要考虑内存、并行度越小每个partition占用的内存越大,很有可能跑的时候内存爆了。实际情况很有可能是5倍CPU核数。如果不制定这个参数,默认的partition会根据HDFS的block决定很有可能和Spark的各项参数不协调,导致计算时间不顾增加。


https://blog.csdn.net/yhb315279058/article/details/51035631

combineByKey是key-value型rdd自带的API,可以直接使用。
综合资料
https://www.raychase.net/3546