Accumulators: distributed shared write-only variables
1. Usage
Without an accumulator, values computed on the Executors are never sent back to the Driver.
val sumAcc = sc.longAccumulator("name")
rdd.foreach(
  num => {
    sumAcc.add(num)
  }
)
println(sumAcc.value)
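Why a plain local variable does not work: Spark serializes the closure and ships a copy of every captured variable to each task, and those copies are never merged back into the Driver. A minimal plain-Scala simulation of that behavior (the `ClosureCopyDemo` object and its `run` helper are hypothetical names, not Spark API):

```scala
object ClosureCopyDemo {
  // Simulates the Spark closure rule: each "task" mutates its own
  // copy of the driver-side variable, and copies are never merged back.
  def run(partitions: List[List[Int]]): Int = {
    var driverSum = 0
    partitions.foreach { part =>
      var taskSum = driverSum // copy made at "serialization" time
      part.foreach(n => taskSum += n) // executor-side update, lost afterwards
    }
    driverSum
  }

  def main(args: Array[String]): Unit =
    println(run(List(List(1, 2, 3), List(4, 5, 6)))) // prints 0
}
```

The driver-side sum stays 0 no matter how much the tasks add, which is exactly the gap that `sc.longAccumulator` closes.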
2. Pitfalls
Under-counting: without an action operator, the transformation never executes, so nothing is added.
Over-counting: calling action operators multiple times re-executes the transformation, so every value is added more than once.
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_Acc {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ACC2")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
    val sum: LongAccumulator = sc.longAccumulator("sum1")
    val res: RDD[Int] = rdd.map(
      num => {
        sum.add(num)
        num
      }
    )
    // Under-counting: without an action operator, the map above never runs
    res.collect()
    // Over-counting: a second action re-executes the map, adding every value again
    res.collect()
    println(sum.value) // 42 instead of 21, because the map ran twice
    sc.stop()
  }
}
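The over-counting above comes from lazy re-evaluation: an un-cached RDD re-runs its transformations on every action. Caching (`res.cache()` before the first `collect()`) avoids the second execution. The effect can be checked without a Spark runtime using a lazy view as a stand-in for the un-cached RDD (the `RecomputeDemo` object and `countAfterActions` are hypothetical names for this sketch):

```scala
object RecomputeDemo {
  // Returns how much a side-effecting counter accumulates after
  // traversing the mapped data `actions` times.
  def countAfterActions(data: List[Int], actions: Int, cache: Boolean): Long = {
    var count = 0L
    if (cache) {
      // Materialized once, like rdd.cache(): the side effect runs one time
      val materialized = data.map { n => count += n; n }
      (1 to actions).foreach(_ => materialized.sum)
    } else {
      // Lazy view, like an un-cached RDD: the side effect re-runs per traversal
      val lazyMapped = data.view.map { n => count += n; n }
      (1 to actions).foreach(_ => lazyMapped.toList)
    }
    count
  }

  def main(args: Array[String]): Unit = {
    val data = List(1, 2, 3, 4, 5, 6)
    println(countAfterActions(data, actions = 2, cache = false)) // 42: counted twice
    println(countAfterActions(data, actions = 2, cache = true))  // 21: counted once
  }
}
```

This is only an analogy: Spark's recomputation happens per task across a cluster, but the double-counting mechanism is the same.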
3. Custom accumulators
- Extend AccumulatorV2
- Instantiate
- Register with the SparkContext
Example: word count
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Spark04_Acc {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("ACC4")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hello"))
    val acc: MyAcc = new MyAcc // 2. Instantiate
    sc.register(acc, "sum") // 3. Register
    rdd.foreach(
      item => acc.add(item)
    )
    println(acc.value)
    sc.stop()
  }

  // 1. Extend AccumulatorV2[IN, OUT]
  class MyAcc extends AccumulatorV2[String, mutable.Map[String, Long]] {
    private val map: mutable.Map[String, Long] = mutable.Map()

    // Is the accumulator in its zero state?
    override def isZero: Boolean = map.isEmpty

    // Copy: hand out a fresh accumulator (guards against recomputation inflating the result)
    override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] =
      new MyAcc

    // Reset to the zero state. Note: map.empty would only return a new
    // empty map without touching this one; map.clear() actually resets it.
    override def reset(): Unit = map.clear()

    // Add one input value on an executor
    override def add(v: String): Unit = {
      val num: Long = map.getOrElse(v, 0L) + 1
      map.update(v, num)
    }

    // Merge partial results from multiple accumulators (distributed computation)
    override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
      val map1: mutable.Map[String, Long] = this.map
      val map2: mutable.Map[String, Long] = other.value
      map2.foreach {
        case (k, v) =>
          val count: Long = map1.getOrElse(k, 0L) + v
          map1.update(k, count)
      }
    }

    // Return the current value
    override def value: mutable.Map[String, Long] = this.map
  }
}
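The add/merge logic above can be exercised without a Spark runtime by modeling it as a plain class: each partition fills its own copy via `add`, and the driver combines the partial maps via `merge`. The `WordCountAcc` class below is a hypothetical standalone model of MyAcc, not the Spark AccumulatorV2 API:

```scala
import scala.collection.mutable

// Standalone model of MyAcc's add/merge semantics (hypothetical sketch)
class WordCountAcc {
  private val map: mutable.Map[String, Long] = mutable.Map()

  // Count one word, as each executor-side copy would
  def add(w: String): Unit = map.update(w, map.getOrElse(w, 0L) + 1)

  // Fold another partition's partial counts into this one
  def merge(other: WordCountAcc): Unit =
    other.value.foreach { case (k, v) =>
      map.update(k, map.getOrElse(k, 0L) + v)
    }

  def value: Map[String, Long] = map.toMap
}

object WordCountAccDemo {
  def main(args: Array[String]): Unit = {
    // Two "partitions", each with its own accumulator copy
    val acc1 = new WordCountAcc
    List("Hello", "Spark").foreach(acc1.add)
    val acc2 = new WordCountAcc
    List("Scala", "Hello").foreach(acc2.add)
    acc1.merge(acc2) // the driver merges the partial results
    println(acc1.value) // Hello -> 2, Spark -> 1, Scala -> 1
  }
}
```

This mirrors what Spark does on `collect`/`foreach`: per-task accumulator copies are created via `copy()`, filled via `add`, and combined on the driver via `merge`.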