Accumulators

  • Distributed, shared, write-only variables: results produced on the executors are collected back to the driver and aggregated there.
  • Advantage: an accumulator can replace some shuffle operations, improving efficiency (the sketch below shows why an ordinary driver-side variable cannot do this).
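The closure that an action ships to each task carries its own copy of any captured variable, so updates made on the executors never travel back to the driver. A minimal sketch (assuming an existing `SparkContext` named `sc`):

```scala
// Assuming an existing SparkContext `sc`
var counter = 0
sc.makeRDD(List(1, 2, 3, 4)).foreach(x => counter += x) // updates per-task copies only
println(counter) // still 0 on the driver

// An accumulator collects the per-task updates back on the driver:
val acc = sc.longAccumulator("counter")
sc.makeRDD(List(1, 2, 3, 4)).foreach(x => acc.add(x))
println(acc.value) // 10
```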

#### Using the built-in accumulators

```scala
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)

  val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)

  /* // reduceByKey works, but it triggers a shuffle and is less efficient
  val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
  resRDD.map(_._2).collect().foreach(println)
  */

  // Create an accumulator
  // val sum: Accumulator[Int] = sc.accumulator(10) // deprecated API
  val sum: LongAccumulator = sc.longAccumulator("myAcc") // since Spark 2.0

  rdd.foreach { case (word, count) =>
    // sum += count does not compile: sum is a LongAccumulator, not a Long
    sum.add(count)
    println("****" + sum.value)
  }

  println(sum.value)

  // Stop the SparkContext
  sc.stop()
}
```
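Besides `longAccumulator`, `SparkContext` also ships `doubleAccumulator` and `collectionAccumulator` (both since Spark 2.0). A minimal sketch reusing the `sc` and `rdd` from above:

```scala
// Reusing `sc` and `rdd` from the example above
val dAcc = sc.doubleAccumulator("myDoubleAcc")           // sums Double values
val cAcc = sc.collectionAccumulator[String]("myCollAcc") // collects elements into a list
rdd.foreach { case (word, count) =>
  dAcc.add(count.toDouble)
  cAcc.add(word)
}
println(dAcc.value) // 10.0
println(cAcc.value) // a java.util.List of the words, e.g. [a, a, a, a]
```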
#### Custom accumulators
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

// Custom accumulator: count every word in the RDD that starts with "H",
// producing (word, count) pairs
object Spark07_Accumulator {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "HaHa", "Hello", "HaHa", "Spark", "Spark"))

    // Create the accumulator object
    val myAcc = new MyAccumulator
    // Register the accumulator
    sc.register(myAcc)
    // Use the accumulator
    rdd.foreach(word => myAcc.add(word))
    // Print the accumulated result
    println(myAcc.value)

    // Stop the SparkContext
    sc.stop()
  }
}

// Define a class extending AccumulatorV2
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  // Map holding each word and its count
  var map = mutable.Map[String, Int]()

  // Whether the accumulator is in its zero state
  override def isZero: Boolean = map.isEmpty

  // Copy the accumulator, preserving its current contents
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val newAcc = new MyAccumulator
    newAcc.map = map.clone()
    newAcc
  }

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Add one element to the accumulator
  override def add(elem: String): Unit = {
    if (elem.startsWith("H")) {
      // Insert or update the entry in the mutable map
      map(elem) = map.getOrElse(elem, 0) + 1
    }
  }

  // Merge two accumulators: ("H",2) and ("H",3) >>> ("H",5)
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    // This executor's map
    val map1 = map
    // The other executor's map
    val map2 = other.value
    map = map1.foldLeft(map2) { (mm, kv) =>
      // mm is map2; kv is one (key, value) entry from map1
      val k: String = kv._1
      val v: Int = kv._2
      // Look up map1's key in map2 and merge the counts
      mm(k) = mm.getOrElse(k, 0) + v
      mm
    }
  }

  // Return the accumulator's current value
  override def value: mutable.Map[String, Int] = map
}
```
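With the sample data above this prints `Map(HaHa -> 2, Hello -> 3)` (key order may vary). One caveat: Spark only guarantees that accumulator updates are applied exactly once when they happen inside an action (such as the `foreach` above); updates made inside a transformation like `map` can be re-applied if a task fails and is retried.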

Broadcast Variables

  • Distributed, read-only variables: a variable is broadcast to each executor once and shared by all the tasks running in that executor.
  • Advantage: this changes the default behavior of shipping a separate copy of the shared data to every task, reducing memory usage.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Goal: a join-like result: (a,(1,4)), (b,(2,5)), (c,(3,6))
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val list: List[(String, Int)] = List(("a", 4), ("b", 5), ("c", 6))

    // Create a broadcast variable
    val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)

    val resRDD: RDD[(String, (Int, Int))] = rdd.map {
      case (k1, v1) =>
        var v3 = 0
        // Read from the broadcast variable instead of closing over `list` directly
        for ((k2, v2) <- broadcastList.value) { // for ((k2, v2) <- list) {
          if (k1 == k2) {
            v3 = v2
          }
        }
        (k1, (v1, v3))
    }

    resRDD.collect().foreach(println)

    sc.stop()
  }
}
```
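Scanning `broadcastList.value` is O(n) per record. A variant sketch (same data as above; the names `broadcastMap` and `resRDD2` are mine) broadcasts the list as a `Map` instead, giving constant-time lookups:

```scala
// Broadcast a Map instead of a List for O(1) lookups per record
val broadcastMap: Broadcast[Map[String, Int]] = sc.broadcast(list.toMap)
val resRDD2: RDD[(String, (Int, Int))] = rdd.map {
  case (k1, v1) => (k1, (v1, broadcastMap.value.getOrElse(k1, 0)))
}
resRDD2.collect().foreach(println) // (a,(1,4)) (b,(2,5)) (c,(3,6))
```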