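Accumulators: distributed, shared, write-only variables
1. Usage
Without an accumulator, values computed on the Executors are never sent back to the Driver.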
val sumAcc = sc.longAccumulator("name")
rdd.foreach(num => sumAcc.add(num))
println(sumAcc.value)
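To make the contrast concrete, here is a minimal sketch that sums the same RDD twice, once into a plain local variable and once into an accumulator. The object name `AccVsLocalVar` and the method `run` are made up for illustration and are not part of the original notes. The plain variable is captured by the closure and shipped to the tasks as a copy, so the Driver-side variable usually stays at 0; Spark does not define the result of mutating such a variable, so treat that value as unreliable rather than as a guaranteed 0.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original notes.
object AccVsLocalVar {
  // Returns (plain variable, accumulator) as seen on the Driver.
  def run(sc: SparkContext): (Long, Long) = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    // Captured by the closure; each task works on its own copy.
    var plainSum = 0L
    rdd.foreach(num => plainSum += num)

    // Accumulator updates are merged back into the Driver-side value.
    val sumAcc = sc.longAccumulator("sum")
    rdd.foreach(num => sumAcc.add(num))

    val accTotal: Long = sumAcc.value
    (plainSum, accTotal)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("AccVsLocalVar")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Only the second element of the returned pair should be relied on; it is the sum of the RDD's elements.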

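2. Pitfalls
Under-counting: if no action is called, the transformation never runs, so the accumulator is never updated.
Over-counting: each action re-runs the transformation, so calling several actions updates the accumulator several times.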
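import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator

object Spark03_Acc {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ACC2")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
    val sum: LongAccumulator = sc.longAccumulator("sum1")
    // The accumulator is updated inside a transformation, which is lazy
    val res: RDD[Int] = rdd.map(num => {
      sum.add(num)
      num
    })
    // Under-counting: without an action, the map above would never run
    res.collect()
    // Over-counting: a second action runs the map again and adds every element twice
    res.collect()
    println(sum.value) // 42 with both collects, although the elements sum to 21
    sc.stop()
  }
}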
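One common way to reduce over-counting is to cache the transformed RDD so that later actions reuse the computed partitions instead of re-running the `map`. The sketch below assumes a small local dataset that fits in memory; the object name `AccWithCache` and the method `run` are hypothetical. Caching is best-effort (evicted or lost partitions are recomputed, which updates the accumulator again), so when an exact count matters it is safer to update the accumulator inside an action such as `foreach`, as in section 1.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, not part of the original notes.
object AccWithCache {
  def run(sc: SparkContext): Long = {
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
    val sum = sc.longAccumulator("sum1")

    // cache() asks Spark to keep the mapped partitions after the first action
    val res = rdd.map { num => sum.add(num); num }.cache()

    res.collect() // first action: runs the map and fills the cache
    res.collect() // second action: reads the cached partitions when available

    val total: Long = sum.value
    total
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("AccWithCache")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```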


3. Custom accumulators
- Extend AccumulatorV2
- Instantiate
- Register
Example: word count
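import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

object Spark04_Acc {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ACC4")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hello"))
    val acc: MyAcc = new MyAcc // 2. Instantiate
    sc.register(acc, "sum")    // 3. Register
    rdd.foreach(item => acc.add(item))
    println(acc.value) // Hello -> 2, Spark -> 1, Scala -> 1 (map order may vary)
    sc.stop()
  }

  // 1. Extend AccumulatorV2[input type, output type]
  class MyAcc extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private val map: mutable.Map[String, Long] = mutable.Map()

    // Called third: whether the accumulator is in its initial (empty) state
    override def isZero: Boolean = map.isEmpty

    // Called first: return a new accumulator holding a copy of the current counts
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
      val acc = new MyAcc
      acc.map ++= this.map
      acc
    }

    // Called second: clear the counts so a recomputation cannot exceed the expected value
    // (the original called map.empty, which returns a new map and clears nothing)
    override def reset(): Unit = map.clear()

    // Add one occurrence of a word
    override def add(v: String): Unit = {
      val num: Long = map.getOrElse(v, 0L) + 1
      map.update(v, num)
    }

    // Merge the counts of another accumulator (distributed computation)
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1: mutable.Map[String, Long] = this.map
      val map2: mutable.Map[String, Long] = other.value
      map2.foreach { case (k, v) =>
        val count: Long = map1.getOrElse(k, 0L) + v
        map1.update(k, count)
      }
    }

    // Return the current counts
    override def value: mutable.Map[String, Long] = this.map
  }
}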

