### Accumulators
- A distributed, shared, write-only variable: results from the executor side are collected back to the driver side and aggregated there.
- Advantage: it can replace some shuffles and thus improve efficiency.
#### Using the Built-in Accumulator
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)

  val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)

  /* // reduceByKey involves a shuffle, which is relatively inefficient
  val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
  resRDD.map(_._2).collect().foreach(println)
  */

  // Create an accumulator
  //val sum: Accumulator[Int] = sc.accumulator(10) // deprecated
  val sum: LongAccumulator = sc.longAccumulator("myAcc") // since Spark 2.0

  rdd.foreach {
    case (word, count) => {
      //sum += count  // += is not available: sum is a LongAccumulator, not a Long
      sum.add(count)
      println("****" + sum.value)
    }
  }
  println(sum.value)

  // Close the connection
  sc.stop()
}
```
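One caveat worth adding (not in the original notes, but standard Spark behavior): accumulator updates are only guaranteed exactly-once when performed inside an action such as `foreach`. Inside a transformation such as `map`, the update runs again whenever the stage is recomputed. A minimal sketch of the pitfall, assuming the same `sc` and `rdd` as above and no caching:

```scala
val sum2: LongAccumulator = sc.longAccumulator("inTransformation")
val mapped: RDD[Int] = rdd.map { case (_, count) =>
  sum2.add(count) // lazy: re-executed every time the stage is recomputed
  count
}
mapped.count()       // first action: sum2.value == 10
mapped.count()       // second action recomputes the map stage: sum2.value == 20
println(sum2.value)  // double-counted
```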
#### Custom Accumulator
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

// Custom accumulator: count every word in the RDD that starts with "H", as (word, count)
object Spark07_Accumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "HaHa", "Hello", "HaHa", "Spark", "Spark"))

    // Create the accumulator instance
    val myAcc = new MyAccumulator
    // Register the accumulator with the SparkContext
    sc.register(myAcc)
    // Use the accumulator
    rdd.foreach { word => myAcc.add(word) }
    // Print the accumulated result, e.g. Map(Hello -> 3, HaHa -> 2)
    println(myAcc.value)

    // Close the connection
    sc.stop()
  }
}

// Define a class extending AccumulatorV2[IN, OUT]
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  // Map holding each word and its count
  var map = mutable.Map[String, Int]()

  // Whether the accumulator is in its initial (zero) state
  override def isZero: Boolean = map.isEmpty

  // Create a copy of the accumulator
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new MyAccumulator

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Add one element to the accumulator
  override def add(elem: String): Unit = {
    if (elem.startsWith("H")) {
      // Insert or update the entry in the mutable map
      map(elem) = map.getOrElse(elem, 0) + 1
    }
  }

  // Merge two accumulators: ("H",2) and ("H",3) >>> ("H",5)
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    // Map from the current executor
    val map1 = map
    // Map from another executor
    val map2 = other.value
    map = map1.foldLeft(map2) { (mm, kv) =>
      // mm starts as map2; kv is one entry of map1
      val (k, v) = kv
      // Look kv's key up in map2 and merge the counts
      mm(k) = mm.getOrElse(k, 0) + v
      mm
    }
  }

  // Return the accumulator's current value
  override def value: mutable.Map[String, Int] = map
}
```
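If the per-key tally is only needed on the driver, a built-in alternative is `CollectionAccumulator` (also since Spark 2.0): collect the matching words raw and group them on the driver. A minimal sketch, assuming the same `sc` and `rdd` as above; the accumulator name "hWords" is illustrative:

```scala
import org.apache.spark.util.CollectionAccumulator
import scala.collection.JavaConverters._ // 2.12-era converters, matching Spark builds of that vintage

val wordsAcc: CollectionAccumulator[String] = sc.collectionAccumulator[String]("hWords")
rdd.foreach(word => if (word.startsWith("H")) wordsAcc.add(word))

// value is a java.util.List[String]; tally it on the driver
val counts: Map[String, Int] = wordsAcc.value.asScala.groupBy(identity).mapValues(_.size).toMap
println(counts) // Map(Hello -> 3, HaHa -> 2)
```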
### Broadcast Variables
- A distributed, read-only variable: the variable is broadcast to each executor and shared by all tasks running in that executor.
- Advantage: changes the default behavior of shipping a copy of the shared data to every task (one copy per executor instead), reducing memory usage.
```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Goal: a join-like result without a shuffle: (a,(1,4)), (b,(2,5)), (c,(3,6))
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val list: List[(String, Int)] = List(("a", 4), ("b", 5), ("c", 6))

    // Create a broadcast variable
    val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)

    val resRDD: RDD[(String, (Int, Int))] = rdd.map {
      case (k1, v1) =>
        var v3 = 0
        // Read from the broadcast value rather than capturing `list` in the closure
        for ((k2, v2) <- broadcastList.value) {
        //for ((k2, v2) <- list) {
          if (k1 == k2) {
            v3 = v2
          }
        }
        (k1, (v1, v3))
    }

    resRDD.collect().foreach(println)

    sc.stop()
  }
}
```
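Two refinements worth sketching (not in the original notes): the linear scan over the broadcast list becomes a constant-time lookup if a `Map` is broadcast instead, and a broadcast variable can be released from the executors with `unpersist()` once it is no longer needed. A sketch, assuming the same `sc`, `rdd`, and `list` as above (before `sc.stop()`):

```scala
// Broadcast a Map so each record does an O(1) lookup instead of scanning the list
val broadcastMap: Broadcast[Map[String, Int]] = sc.broadcast(list.toMap)

val resRDD2: RDD[(String, (Int, Int))] = rdd.map { case (k, v1) =>
  // getOrElse handles keys missing from the broadcast side, defaulting to 0 as the loop above did
  (k, (v1, broadcastMap.value.getOrElse(k, 0)))
}
resRDD2.collect().foreach(println)

// Release the broadcast data on the executors when it is no longer needed
broadcastMap.unpersist()
```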
