longAccumulator(“xx”)

  1. package tcode.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object $08_Accumulator {
  4. //累加器: 将每个task的累加结果返回给Driver,由Driver统一汇总
  5. // 好处: 可以在一定程度上减少shuffle
  6. def main(args: Array[String]): Unit = {
  7. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  8. val acc = sc.longAccumulator("xx")//xx是累加器名字
  9. var sum = 0
  10. val rdd = sc.parallelize(List(10,2,6,8,3,4,9))
  11. rdd.foreach(x=> acc.add(x))
  12. println(acc.value)// println(sum)如果直接打印sum只会打印driver中的sum,其结果不会改变
  13. Thread.sleep(100000000)
  14. }
  15. }

自定义累加器

WordCountAccumulator
总结:
0、场景: 累加器能够一定程度上减少shuffle,用于聚合场景
原理: 首先在每个task中累加,然后将task累加的结果发送给Driver,由Driver汇总结果
自定义累加器:
1、定义一个class继承AccumulatorV2[IN,OUT]
IN: 累加的元素类型
OUT: 累加器最终结果类型
2、重写抽象方法
isZero: 累加器是否为空
copy: 复制累加器,传递给每个task
reset: 重置累加器
add: 累加元素,在task中执行
merge: 汇总task的结果,在Driver中执行
value: 获取最终结果
3、使用自定义累加器步骤(默认的累加器不需要注册):
创建自定义分区器对象: val acc = new 类名
注册累加器: sc.register(累加器对象)
累加元素与得到: acc.add/
最终结果:acc.value

  1. package tcode.day06
  2. import org.apache.spark.util.AccumulatorV2
  3. import scala.collection.mutable
  4. //继承的累加器需要两个参数,IN\OUT
  5. class WordCountAccumulator extends AccumulatorV2[(String,Int),mutable.Map[String,Int]]{
  6. //创建容器,用于累加
  7. val result = mutable.Map[String,Int]()
  8. //判断累加器是否为空
  9. override def isZero: Boolean = result.isEmpty
  10. //复制一个累加器给每个task
  11. override def copy(): AccumulatorV2[(String, Int), mutable.Map[String, Int]] = new WordCountAccumulator
  12. //重置累加器
  13. override def reset(): Unit = result.clear()
  14. //累加元素 【在RDD每个task中累加】
  15. override def add(v: (String, Int)): Unit = {
  16. println(s"add 线程名=${Thread.currentThread().getName} 传入的数据=${v} 之前的累加结果:${result}")
  17. //判断单词在map中是否存在,如果存在则累加次数
  18. // (spark,2),(hadoop,3)
  19. if(result.contains( v._1 )){
  20. //获取之前的累加的次数
  21. val num = result.getOrElse( v._1,0 )
  22. // put去重,删除原来相同的元素,添加新的元素
  23. result.put( v._1 ,num + v._2)
  24. }
  25. //如果不存在,直接将元素放入map
  26. else{
  27. result.+=(v)
  28. }
  29. }// 以上结果并行发生在每个分区中(executor)
  30. //汇总所有task的结果[在Driver中执行]
  31. // 每个task都有一个copy方法传递过去的累加器,task执行玩后,将累加器传递回来(other)
  32. override def merge(other: AccumulatorV2[(String, Int), mutable.Map[String, Int]]): Unit = {
  33. println(s"merge 线程名=${Thread.currentThread().getName} 传入的数据=${other.value} 之前的累加结果:${result}")
  34. //1、合并当前Driver结果与传入task的结果汇总
  35. val otherTaskList = other.value//多态,driver的AccumulatorV2的子类wordcountaccumulator的result
  36. // map不能有相同的key,转化为list
  37. val totalList = otherTaskList.toList ++ result.toList
  38. //[W->10,W->20,..]
  39. //2、按照单词分组
  40. val groupMap = totalList.groupBy(_._1)
  41. //Map(
  42. // W -> List( W->10,W->20,..)
  43. // )
  44. //3、汇总次数
  45. val totalNum = groupMap.map(x=> {
  46. //x = W -> List( W->10,W->20,..)
  47. (x._1, x._2.map(_._2).sum )// (w->30)
  48. })
  49. result.++=(totalNum)
  50. }
  51. //获取最终结果
  52. override def value: mutable.Map[String, Int] = {
  53. result
  54. // 这里每个task都有result,作为上面的other传递进去实行多分区结果累加
  55. }
  56. }

自定义累加器的调用

  1. package tcode.day06
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object $01_Accumulator {
  4. /**
  5. * 累加器: 能够在一定程度上减少shuffle操作
  6. * 累加器用于聚合场景
  7. * 自定义累加器的使用:
  8. * 1、创建自定义累加器对象: new 自定义累加器
  9. * 2、注册: sc.register(wc,"acc")
  10. * 3、累加元素: acc.add(..)
  11. * 4、获取最终结果: acc.value
  12. */
  13. def main(args: Array[String]): Unit = {
  14. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  15. //注册自定义累加器
  16. val wc = new WordCountAccumulator
  17. sc.register(wc,"acc")
  18. val rdd1 = sc.textFile("datas/wc.txt")
  19. val rdd2 = rdd1.flatMap(_.split(" "))
  20. val rdd3 = rdd2.map((_,1))
  21. //val rdd4 = rdd3.reduceByKey(_+_),累加器代替reduce
  22. rdd3.mapPartitionsWithIndex((index,it)=>{
  23. println(s"index:${index} it=${it.toList}")
  24. it
  25. }).collect()
  26. rdd3.foreach(x=> wc.add(x))// 使用循环来添加累加器
  27. println(wc.value.toList)
  28. Thread.sleep(10000000)
  29. }
  30. }