实现原理
    累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
    基础编程
    系统累加器

    1. val rdd = sc.makeRDD(List(1,2,3,4,5))
    2. // 声明累加器
    3. var sum = sc.longAccumulator("sum");
    4. rdd.foreach(
    5. num => {
    6. // 使用累加器
    7. sum.add(num)
    8. } )
    9. // 获取累加器的值
    10. println("sum = " + sum.value)

    自定义累加器

    1. // 自定义累加器
    2. // 1. 继承 AccumulatorV2,并设定泛型
    3. // 2. 重写累加器的抽象方法
    4. class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String,
    5. Long]]{
    6. var map : mutable.Map[String, Long] = mutable.Map()
    7. // 累加器是否为初始状态
    8. override def isZero: Boolean = {
    9. map.isEmpty
    10. }
    11. // 复制累加器
    12. override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    13. new WordCountAccumulator
    14. }
    15. // 重置累加器
    16. override def reset(): Unit = {
    17. map.clear()
    18. }
    19. // 向累加器中增加数据 (In)
    20. override def add(word: String): Unit = {
    21. // 查询 map 中是否存在相同的单词
    22. // 如果有相同的单词,那么单词的数量加 1
    23. // 如果没有相同的单词,那么在 map 中增加这个单词
    24. map(word) = map.getOrElse(word, 0L) + 1L
    25. }
    26. // 合并累加器
    27. override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]):
    28. Unit = {
    29. val map1 = map
    30. val map2 = other.value
    31. // 两个 Map 的合并
    32. map = map1.foldLeft(map2)(
    33. ( innerMap, kv ) => {
    34. innerMap(kv._1) = innerMap.getOrElse(kv._1, 0L) + kv._2
    35. innerMap
    36. }
    37. ) }
    38. // 返回累加器的结果 (Out)
    39. override def value: mutable.Map[String, Long] = map
    40. }

    广播变量
    实现原理
    广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。
    基础编程

    1. val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
    2. val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
    3. // 声明广播变量
    4. val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
    5. val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
    6. case (key, num) => {
    7. var num2 = 0
    8. // 使用广播变量
    9. for ((k, v) <- broadcast.value) {
    10. if (k == key) {
    11. num2 = v
    12. }
    13. }
    14. (key, (num, num2))
    15. } }