Accumulator concept
https://www.yuque.com/docs/share/5314de22-204e-4731-86a7-4454a5ac0274?# "Spark: Accumulator Concept and Usage"
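An accumulator is a write-only shared variable: tasks running on the executors can only add to it, and only the driver can read its final value. Before the custom accumulator below, here is a minimal sketch using Spark's built-in LongAccumulator; the object name BuiltInAccDemo, the accumulator name "evenCount", and the local master setting are placeholders, not part of the original note.

import org.apache.spark.{SparkConf, SparkContext}

object BuiltInAccDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccDemo").setMaster("local[4]"))
    // Built-in Long accumulator, created and registered in one call
    val evenCount = sc.longAccumulator("evenCount")
    sc.parallelize(1 to 100, 4).foreach { x =>
      if (x % 2 == 0) evenCount.add(1L) // executors only write
    }
    println(evenCount.value) // driver reads the merged result: 50
    sc.stop()
  }
}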
Custom accumulator
A custom Map-based accumulator that computes the sum, count, and average of the values in a List.
package com.add

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

object MyMapAccumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("MyAcc").setMaster("local[4]")
    val sc: SparkContext = new SparkContext(conf)

    val list1 = List(30, 50, 70, 60, 10, 20, 10, 30, 40, 50)
    val rdd1: RDD[Int] = sc.parallelize(list1, 2)

    // Create the accumulator on the driver and register it before use
    val acc = new MyMapAccumulator
    sc.register(acc)

    // Each element is added on the executors (Int widens to Double)
    rdd1.foreach(x => acc.add(x))

    // Read the merged result back on the driver
    println(acc.value) // Output: Map(sum -> 370.0, count -> 10, avg -> 37.0)
    sc.stop()
  }
}
// The accumulator's value contains sum, count, and avg at the same time,
// e.g. Map("sum" -> 1000, "count" -> 10, "avg" -> 100)
class MyMapAccumulator extends AccumulatorV2[Double, Map[String, Any]] {
  private var map = Map[String, Any]()

  // The accumulator is "zero" when nothing has been added yet
  override def isZero: Boolean = map.isEmpty

  // Create a new accumulator holding a copy of the current state
  override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
    println("copy...")
    val acc = new MyMapAccumulator
    acc.map = map // the map is immutable, so sharing the reference is safe
    acc
  }

  // Reset to the zero value: the map is immutable, so just assign an empty map
  override def reset(): Unit = {
    println("reset...")
    map = Map[String, Any]()
  }

  // Accumulate sum and count; avg is computed later in value
  override def add(v: Double): Unit = {
    map += "sum" -> (map.getOrElse("sum", 0D).asInstanceOf[Double] + v)
    map += "count" -> (map.getOrElse("count", 0L).asInstanceOf[Long] + 1L)
  }
  /**
   * Merge the maps from all partitions into this accumulator.
   *
   * @param other the accumulator from another partition
   */
  override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit = {
    other match {
      // Only merge if the other accumulator is also a MyMapAccumulator; otherwise throw
      case o: MyMapAccumulator =>
        map += "sum" -> (map.getOrElse("sum", 0D).asInstanceOf[Double]
          + o.map.getOrElse("sum", 0D).asInstanceOf[Double])
        map += "count" -> (map.getOrElse("count", 0L).asInstanceOf[Long]
          + o.map.getOrElse("count", 0L).asInstanceOf[Long])
      case _ => throw new UnsupportedOperationException
    }
  }

  // Compute the average when the value is read
  override def value: Map[String, Any] = {
    map += "avg" ->
      (map.getOrElse("sum", 0D).asInstanceOf[Double] / map.getOrElse("count", 0L).asInstanceOf[Long])
    map
  }
}
Output:
Map(sum -> 370.0, count -> 10, avg -> 37.0)
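For this particular statistic a custom accumulator is not strictly required: Spark's built-in DoubleAccumulator already exposes sum, count, and avg. A minimal sketch for comparison, reusing sc and rdd1 from the example above; the accumulator name "stats" is just a placeholder.

// Built-in alternative: DoubleAccumulator tracks sum, count, and avg out of the box
val stats = sc.doubleAccumulator("stats")
rdd1.foreach(x => stats.add(x)) // Int widens to Double
println((stats.sum, stats.count, stats.avg)) // (370.0, 10, 37.0)

The custom AccumulatorV2 above is still useful as a template when the combined result needs a shape that no built-in accumulator provides, such as a Map with several derived statistics.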