累加器概念

https://www.yuque.com/docs/share/5314de22-204e-4731-86a7-4454a5ac0274?# 《Spark之累加器的概念和使用》

自定义累加器

  1. package com.add
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.util.AccumulatorV2
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. /**
  6. * 自定义Int类型的累加器
  7. */
  8. object MyIntAccumulator {
  9. def main(args: Array[String]): Unit = {
  10. val conf: SparkConf = new SparkConf().setAppName("Add").setMaster("local[2]")
  11. val sc: SparkContext = new SparkContext(conf)
  12. val list1 = List(30, 50, 70, 60, 10, 20)
  13. val rdd1: RDD[Int] = sc.parallelize(list1, 2)
  14. // 先注册自定义的累加器
  15. val acc = new MyIntAccumulator
  16. sc.register(acc, "first")
  17. val rdd2: RDD[Int] = rdd1.map(x => {
  18. acc.add(1)
  19. x
  20. })
  21. rdd2.collect
  22. println(acc.value)
  23. sc.stop()
  24. }
  25. }
  26. /**
  27. * 自定义int累加器
  28. * 泛型的意思: 第一个是调用这个累加器传什么值, 第二个泛型意思是返回的结果是什么类型的
  29. * 比如说对int值进行累加,那么第一个值就是int类型的,
  30. * 这个累加器返回int值,那么第二个泛型就是int类型的.
  31. */
  32. class MyIntAccumulator extends AccumulatorV2[Int, Int] {
  33. private var sum = 0
  34. /**
  35. * 判断是不是"零", 对缓冲区值进行判"零"
  36. * 当然这个具体得看业务了,比如说集合,那么就是判空集合,
  37. * 如果是map累加器那么就是空map
  38. * 如果是字符串的累加器,那么就是判断空字符串
  39. * 所以具体得看业务了.
  40. *
  41. * @return
  42. */
  43. override def isZero: Boolean = sum == 0
  44. /** 把当前的累加复制为一个新的累加器
  45. *
  46. * @return
  47. */
  48. override def copy(): AccumulatorV2[Int, Int] = {
  49. val acc = new MyIntAccumulator
  50. acc.sum = sum
  51. acc
  52. }
  53. /** 重置累加器(就是把缓冲区的值重置为"零")
  54. *
  55. */
  56. override def reset(): Unit = sum = 0
  57. /** 真正的累加方法,这个是分区内的累加,多个分区各自累加
  58. *
  59. * @param v
  60. */
  61. override def add(v: Int): Unit = sum += v
  62. /** 分区间的合并 把other的sum合并到this的sum中
  63. * 把所有分区的值再累加在一起.
  64. *
  65. * @param other
  66. */
  67. override def merge(other: AccumulatorV2[Int, Int]): Unit = other match {
  68. case acc: MyIntAccumulator => this.sum += acc.sum
  69. case _ => this.sum += 0
  70. }
  71. /** 返回累加后的最终值
  72. *
  73. * @return
  74. */
  75. override def value: Int = sum
  76. }

输出

  1. 6