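Accumulators: Distributed Shared Write-Only Variables

1. Usage

Without an accumulator, values computed on the Executors are never sent back to the Driver.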

    // Assumes an existing SparkContext `sc` and an RDD[Int] `rdd`.
    val sumAcc = sc.longAccumulator("name")
    rdd.foreach(
      num => {
        sumAcc.add(num) // runs on the Executors
      }
    )
    println(sumAcc.value) // read on the Driver
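To make the contrast concrete, here is a minimal sketch that sums the same RDD twice: once with an ordinary `var` captured by the closure, and once with a `LongAccumulator`. The object name `ClosureVsAccumulator`, the method `run`, and the sample data are made up for illustration. Each task works on its own deserialized copy of the closure, so the Driver's `var` is normally left untouched, although the Spark documentation notes that local mode may behave differently in some circumstances.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original notes.
object ClosureVsAccumulator {
  // Returns (plain variable as seen by the Driver, accumulator total).
  def run(sc: SparkContext): (Long, Long) = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // A plain variable: each task updates its own copy of the closure,
    // and those copies are not sent back to the Driver.
    var plainSum = 0L
    rdd.foreach(num => plainSum += num)

    // An accumulator: Executors add to it, the Driver reads the merged total.
    val sumAcc = sc.longAccumulator("sum")
    rdd.foreach(num => sumAcc.add(num))

    (plainSum, sumAcc.sum)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ClosureVsAccumulator")
    val sc = new SparkContext(conf)
    val (plain, acc) = run(sc)
    println(s"plain var = $plain, accumulator = $acc")
    sc.stop()
  }
}
```

The accumulator total is 10 for this input. The plain variable is typically still 0 on the Driver, but that value should not be relied on either way.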


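2. Pitfalls

Under-counting: if no action is invoked, the transformation never runs, so the accumulator is never updated.

Over-counting: every action re-runs the transformation, so calling several actions applies the accumulator updates several times.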

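    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.LongAccumulator

    object Spark03_Acc {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ACC2")
        val sc = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
        val sum: LongAccumulator = sc.longAccumulator("sum1")
        // The accumulator is updated inside map, a lazy transformation.
        val res: RDD[Int] = rdd.map(
          num => {
            sum.add(num)
            num
          }
        )
        // Under-counting: without an action, map never runs and sum stays 0.
        res.collect()
        // Over-counting: a second action re-runs map, so every element is added again.
        res.collect()
        println(sum.value) // 42 instead of 21, because map ran twice
        sc.stop()
      }
    }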
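Two common ways to avoid the over-counting shown above are sketched below. The names `AccumulatorCountedOnce` and `run` are made up for illustration. Option A caches the mapped RDD so that later actions can reuse the computed partitions instead of re-running `map`. This only helps while the cached partitions stay in memory. Option B moves the update into an action (`foreach`), where Spark guarantees that each task's update is applied only once.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original notes.
object AccumulatorCountedOnce {
  // Returns (total with cache + two actions, total updated inside an action).
  def run(sc: SparkContext): (Long, Long) = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))

    // Option A: cache the result of the transformation.
    val sumA = sc.longAccumulator("sumA")
    val res = rdd.map { num => sumA.add(num); num }.cache()
    res.collect() // computes the partitions and updates sumA
    res.collect() // served from the cache if the partitions were not evicted

    // Option B: update the accumulator inside an action.
    val sumB = sc.longAccumulator("sumB")
    rdd.foreach(num => sumB.add(num))

    (sumA.sum, sumB.sum)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("AccumulatorCountedOnce")
    val sc = new SparkContext(conf)
    val (cached, inAction) = run(sc)
    println(s"with cache = $cached, inside action = $inAction")
    sc.stop()
  }
}
```

Option B yields 21 for this input. Option A should also yield 21 on a small local run, but a cache is a performance hint rather than a guarantee, so Option B is the safer choice when the count must be exact.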


3. Custom accumulators

  1. Extend AccumulatorV2
  2. Instantiate the accumulator
  3. Register it with the SparkContext

Example: word count

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable

    object Spark04_Acc {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ACC4")
        val sc: SparkContext = new SparkContext(conf)
        val rdd: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hello"))
        val acc: MyAcc = new MyAcc // 2. Instantiate
        sc.register(acc, "sum") // 3. Register
        rdd.foreach(
          item => acc.add(item)
        )
        println(acc.value) // Hello is counted twice, Spark and Scala once
        sc.stop()
      }

      // 1. Extend AccumulatorV2[IN, OUT]: IN is the type passed to add, OUT is the type returned by value
      class MyAcc extends AccumulatorV2[String, mutable.Map[String, Long]] {
        private val map: mutable.Map[String, Long] = mutable.Map()

        // Checked third: whether the accumulator is in its zero (empty) state
        override def isZero: Boolean = map.isEmpty

        // Called first: return a copy of this accumulator, including its current contents
        override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
          val newAcc = new MyAcc
          newAcc.map ++= map
          newAcc
        }

        // Called second: reset to the zero state so a task starts from nothing and
        // recomputation cannot exceed the expected value.
        // map.empty only returns a new empty map; clear() actually empties this one.
        override def reset(): Unit = map.clear()

        // Add one input value
        override def add(v: String): Unit = {
          val num: Long = map.getOrElse(v, 0L) + 1
          map.update(v, num)
        }

        // Merge another accumulator into this one (partial results from distributed tasks)
        override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
          val map1: mutable.Map[String, Long] = this.map
          val map2: mutable.Map[String, Long] = other.value
          map2.foreach {
            case (k, v) =>
              val count: Long = map1.getOrElse(k, 0L) + v
              map1.update(k, count)
          }
        }

        // Return the current value
        override def value: mutable.Map[String, Long] = this.map
      }
    }
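The roles of `copy`, `reset`, `isZero`, and `merge` are easier to see when they are called by hand. The sketch below defines a compact accumulator under the hypothetical name `WordCountAcc` and drives it without a SparkContext. It is a simplification of what Spark does, not a description of its internals: each "task" gets a zeroed copy via `copyAndReset`, adds its own words, and the "Driver" merges the partial results.

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical compact word-count accumulator used only for this demo.
class WordCountAcc extends AccumulatorV2[String, mutable.Map[String, Long]] {
  private val counts = mutable.Map.empty[String, Long]

  override def isZero: Boolean = counts.isEmpty

  override def copy(): WordCountAcc = {
    val c = new WordCountAcc
    c.counts ++= counts // carry the current contents over
    c
  }

  override def reset(): Unit = counts.clear()

  override def add(word: String): Unit =
    counts.update(word, counts.getOrElse(word, 0L) + 1L)

  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit =
    other.value.foreach { case (w, n) =>
      counts.update(w, counts.getOrElse(w, 0L) + n)
    }

  override def value: mutable.Map[String, Long] = counts
}

object MergeDemo {
  // Simulates two tasks counting words separately, then merges them on the "Driver".
  def mergedCounts(): Map[String, Long] = {
    val driverAcc = new WordCountAcc

    // copyAndReset() calls copy() and then reset(), so each task starts at zero.
    val task1 = driverAcc.copyAndReset()
    val task2 = driverAcc.copyAndReset()
    require(task1.isZero && task2.isZero)

    task1.add("Hello"); task1.add("Spark")
    task2.add("Scala"); task2.add("Hello")

    driverAcc.merge(task1)
    driverAcc.merge(task2)
    driverAcc.value.toMap
  }

  def main(args: Array[String]): Unit =
    println(mergedCounts())
}
```

The merged result counts Hello twice and Spark and Scala once each, which matches the word count example run through `rdd.foreach`.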
