longAccumulator(“xx”)
package tcode.day05import org.apache.spark.{SparkConf, SparkContext}object $08_Accumulator {//累加器: 将每个task的累加结果返回给Driver,由Driver统一汇总// 好处: 可以在一定程度上减少shuffledef main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))val acc = sc.longAccumulator("xx")//xx是累加器名字var sum = 0val rdd = sc.parallelize(List(10,2,6,8,3,4,9))rdd.foreach(x=> acc.add(x))println(acc.value)// println(sum)如果直接打印sum只会打印driver中的sum,其结果不会改变Thread.sleep(100000000)}}
自定义累加器
WordCountAccumulator
总结:
0、场景: 累加器能够一定程度上减少shuffle,用于聚合场景
原理: 首先在每个task中累加,然后将task累加的结果发送给Driver,由Driver汇总结果
自定义累加器:
1、定义一个class继承AccumulatorV2[IN,OUT]
IN: 累加的元素类型
OUT: 累加器最终结果类型
2、重写抽象方法
isZero: 累加器是否为空
copy: 复制累加器,传递给每个task
reset: 重置累加器
add: 累加元素,在task中执行
merge: 汇总task的结果,在Driver中执行
value: 获取最终结果
3、使用自定义累加器步骤(默认的累加器不需要注册):
创建自定义分区器对象: val acc = new 类名
注册累加器: sc.register(累加器对象)
累加元素与得到: acc.add/
最终结果:acc.value
package tcode.day06import org.apache.spark.util.AccumulatorV2import scala.collection.mutable//继承的累加器需要两个参数,IN\OUTclass WordCountAccumulator extends AccumulatorV2[(String,Int),mutable.Map[String,Int]]{//创建容器,用于累加val result = mutable.Map[String,Int]()//判断累加器是否为空override def isZero: Boolean = result.isEmpty//复制一个累加器给每个taskoverride def copy(): AccumulatorV2[(String, Int), mutable.Map[String, Int]] = new WordCountAccumulator//重置累加器override def reset(): Unit = result.clear()//累加元素 【在RDD每个task中累加】override def add(v: (String, Int)): Unit = {println(s"add 线程名=${Thread.currentThread().getName} 传入的数据=${v} 之前的累加结果:${result}")//判断单词在map中是否存在,如果存在则累加次数// (spark,2),(hadoop,3)if(result.contains( v._1 )){//获取之前的累加的次数val num = result.getOrElse( v._1,0 )// put去重,删除原来相同的元素,添加新的元素result.put( v._1 ,num + v._2)}//如果不存在,直接将元素放入mapelse{result.+=(v)}}// 以上结果并行发生在每个分区中(executor)//汇总所有task的结果[在Driver中执行]// 每个task都有一个copy方法传递过去的累加器,task执行玩后,将累加器传递回来(other)override def merge(other: AccumulatorV2[(String, Int), mutable.Map[String, Int]]): Unit = {println(s"merge 线程名=${Thread.currentThread().getName} 传入的数据=${other.value} 之前的累加结果:${result}")//1、合并当前Driver结果与传入task的结果汇总val otherTaskList = other.value//多态,driver的AccumulatorV2的子类wordcountaccumulator的result// map不能有相同的key,转化为listval totalList = otherTaskList.toList ++ result.toList//[W->10,W->20,..]//2、按照单词分组val groupMap = totalList.groupBy(_._1)//Map(// W -> List( W->10,W->20,..)// )//3、汇总次数val totalNum = groupMap.map(x=> {//x = W -> List( W->10,W->20,..)(x._1, x._2.map(_._2).sum )// (w->30)})result.++=(totalNum)}//获取最终结果override def value: mutable.Map[String, Int] = {result// 这里每个task都有result,作为上面的other传递进去实行多分区结果累加}}
自定义累加器的调用
package tcode.day06import org.apache.spark.{SparkConf, SparkContext}object $01_Accumulator {/*** 累加器: 能够在一定程度上减少shuffle操作* 累加器用于聚合场景* 自定义累加器的使用:* 1、创建自定义累加器对象: new 自定义累加器* 2、注册: sc.register(wc,"acc")* 3、累加元素: acc.add(..)* 4、获取最终结果: acc.value*/def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))//注册自定义累加器val wc = new WordCountAccumulatorsc.register(wc,"acc")val rdd1 = sc.textFile("datas/wc.txt")val rdd2 = rdd1.flatMap(_.split(" "))val rdd3 = rdd2.map((_,1))//val rdd4 = rdd3.reduceByKey(_+_),累加器代替reducerdd3.mapPartitionsWithIndex((index,it)=>{println(s"index:${index} it=${it.toList}")it}).collect()rdd3.foreach(x=> wc.add(x))// 使用循环来添加累加器println(wc.value.toList)Thread.sleep(10000000)}}
