累加器概念

参考：[《Spark之累加器的概念和使用》](https://www.yuque.com/docs/share/5314de22-204e-4731-86a7-4454a5ac0274)

自定义累加器

自定义一个值类型为 Map 的累加器，
统计 List 中元素的累加和（sum）、个数（count）和平均值（avg）。

  1. package com.add
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.util.AccumulatorV2
  4. import org.apache.spark.{SparkConf, SparkContext}
  object MyMapAccumulator {

    /**
     * Demo driver: registers a [[MyMapAccumulator]], feeds it every element of a small RDD
     * (two partitions) and prints the merged sum / count / avg on the driver.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val conf: SparkConf = new SparkConf().setAppName("MyAcc").setMaster("local[4]")
      val sc: SparkContext = new SparkContext(conf)
      // Stop the context even if the job throws, so the local Spark runtime is always released.
      try {
        val list1 = List(30, 50, 70, 60, 10, 20, 10, 30, 40, 50)
        val rdd1: RDD[Int] = sc.parallelize(list1, 2)
        val acc = new MyMapAccumulator
        // Register with the SparkContext so per-task updates are merged back on the driver.
        sc.register(acc)
        // Each Int element is widened to Double to match add(v: Double).
        rdd1.foreach(x => acc.add(x))
        println(acc.value) // prints: Map(sum -> 370.0, count -> 10, avg -> 37.0)
      } finally {
        sc.stop()
      }
    }
  }
  /**
   * Custom Spark accumulator that takes Double inputs and reports their sum, count and average.
   *
   * The accumulated value holds sum, count and avg together, e.g.
   * `Map("sum" -> 1000.0, "count" -> 10L, "avg" -> 100.0)`.
   * Only the running sum and count are stored; avg is derived each time [[value]] is read, so
   * partial results from different partitions are combined simply by adding sums and counts.
   */
  class MyMapAccumulator extends AccumulatorV2[Double, Map[String, Any]] {
    // Running total of every value added or merged in.
    private var runningSum: Double = 0D
    // Number of values added or merged in; 0 means nothing has been accumulated.
    private var runningCount: Long = 0L

    /** True when no value has been added or merged in yet. */
    override def isZero: Boolean = runningCount == 0L

    /**
     * Returns a new accumulator holding the same sum and count as this one.
     * Prints "copy..." so the demo shows when Spark invokes it.
     */
    override def copy(): AccumulatorV2[Double, Map[String, Any]] = {
      println("copy...")
      val acc = new MyMapAccumulator
      acc.runningSum = runningSum
      acc.runningCount = runningCount
      acc
    }

    /**
     * Resets the sum and count back to zero.
     * Prints "reset..." so the demo shows when Spark invokes it.
     */
    override def reset(): Unit = {
      println("reset...")
      runningSum = 0D
      runningCount = 0L
    }

    /**
     * Accumulates one value: adds `v` to the sum and increments the count by one.
     * The average is not stored; it is computed in [[value]].
     *
     * @param v the value to accumulate
     */
    override def add(v: Double): Unit = {
      runningSum += v
      runningCount += 1L
    }

    /**
     * Merges another partition's partial result into this accumulator by adding its sum and count.
     *
     * @param other the accumulator to merge in; must be a [[MyMapAccumulator]]
     * @throws UnsupportedOperationException if `other` is any other accumulator type
     */
    override def merge(other: AccumulatorV2[Double, Map[String, Any]]): Unit = other match {
      case o: MyMapAccumulator =>
        runningSum += o.runningSum
        runningCount += o.runningCount
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

    /**
     * Returns `Map("sum" -> Double, "count" -> Long, "avg" -> Double)` where avg = sum / count.
     * Returns an empty Map when nothing has been accumulated, because the average is undefined.
     * Reading the value does not modify the accumulator.
     */
    override def value: Map[String, Any] =
      if (runningCount == 0L) Map.empty
      else
        Map[String, Any](
          "sum" -> runningSum,
          "count" -> runningCount,
          "avg" -> runningSum / runningCount)
  }

输出：

Map(sum -> 370.0, count -> 10, avg -> 37.0)