一、累加器

分布式共享只写变量。

1.1、实现原理

累加器用来把Executor 端变量信息聚合到Driver 端。在Driver 程序中定义的变量,在Executor 端的每个Task 都会得到这个变量的一份新的副本,每个task 更新这些副本的值后, 传回Driver 端进行merge。
image.png
1、 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。
2、累加器不是一个调优的操作,因为如果不这样做,结果是错的。

1.2、基本使用

  1. -- 基本api
  2. val rdd = sc.makeRDD(List(1,2,3,4,5))
  3. // 声明累加器
  4. var sum = sc.longAccumulator("sum");
  5. rdd.foreach(
  6. num => {
  7. //使用累加器,此处调用累加器的add方法
  8. sum.add(num)
  9. }
  10. )
  11. // 获取累加器的值
  12. println("sum = " + sum.value)

1.3、自定义累加器

  1. // 自定义累加器
  2. // 1. 继承 AccumulatorV2,并设定泛型
  3. // 2. 重写累加器的抽象方法
  4. import org.apache.spark.util.AccumulatorV2
  5. import scala.collection.mutable
  6. //指定泛型参数,IN的类型,OUT的类型
  7. class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]]{
  8. var map : mutable.Map[String, Long] = mutable.Map()
  9. // 累加器是否为初始状态
  10. override def isZero: Boolean = {
  11. map.isEmpty
  12. }
  13. // 复制累加器
  14. override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
  15. new WordCountAccumulator
  16. }
  17. // 重置累加器
  18. override def reset(): Unit = {
  19. map.clear()
  20. }
  21. // 向累加器中增加数据 (In)
  22. override def add(word: String): Unit = {
  23. // 查询 map 中是否存在相同的单词
  24. // 如果有相同的单词,那么单词的数量加 1
  25. // 如果没有相同的单词,那么在 map 中增加这个单词
  26. map(word) = map.getOrElse(word, 0L) + 1L
  27. }
  28. // Driver合并多个累加器
  29. override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
  30. val map1 = map //当前累加器的累加器变量
  31. val map2 = other.value //其他累加器的变量
  32. // 两个 Map 的合并
  33. map = map1.foldLeft(map2)(
  34. ( innerMap, kv ) => {
  35. innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
  36. innerMap
  37. }
  38. )
  39. }
  40. // 返回累加器的结果 (Out)
  41. override def value: mutable.Map[String, Long] = map
  42. }
  43. ---------------------
  44. 使用:
  45. val acc = new WordCountAccumulator()
  46. sc.register(acc,"累加器自定义名字")
  47. // 这样就可以正常使用累加器 acc

1.4、注意实现

少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
多加:转换算子中调用累加器,如果有多个行动算子的话,那么会多次执行

二、广播变量

分布式共享只读变量。

2.1、实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表, 广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。
由于闭包的数据,都是以Task为单位进行发送的,这样每个Task都可能会包含相同的闭包数据;就会导致同一个Executor包含大量重复数据,从而导致内存爆炸。
image.png
而使用广播变量则可以将闭包的数据存放在Executor的JVM内存中,每个Task访问该共享变量即可,从而提高共享效率。
image.png

2.2、基本使用

  1. val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
  2. val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
  3. // 声明广播变量
  4. val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
  5. val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
  6. case (key, num) => {
  7. var num2 = 0
  8. // 使用广播变量
  9. for ((k, v) <- broadcast.value) {
  10. if (k == key) {
  11. num2 = v
  12. }
  13. }
  14. (key, (num, num2))
  15. }
  16. }