广播变量

用法

  1. val list: List[String] = List("Apache", "Spark")
  2. // sc为SparkContext实例
  3. val bc = sc.broadcast(list)

解决的问题

  1. import org.apache.spark.rdd.RDD
  2. val rootPath: String = _
  3. val file: String = s"${rootPath}/wikiOfSpark.txt"
  4. // 读取文件内容
  5. val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
  6. // 以行为单位做分词
  7. val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
  8. // 创建单词列表list
  9. val list: List[String] = List("Apache", "Spark")
  10. // 使用list列表对RDD进行过滤
  11. val cleanWordRDD: RDD[String] = wordRDD.filter(word => list.contains(word))
  12. // 把RDD元素转换为(Key,Value)的形式
  13. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
  14. // 按照单词做分组计数
  15. val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
  16. // 获取计算结果
  17. wordCounts.collect
  18. // Array[(String, Int)] = Array((Apache,34), (Spark,63))

单词列表list是在driver端创建的,因此,Spark需要把list变量分发给每一个分布式任务,也就是说,系统中有多少个task,变量就会在网络中分发多少次
image.png

广播变量的分发,是以Executor为粒度的,同一个Executor内多个不同的Tasks只需访问一份数据拷贝即可
image.png

累加器

工作原理

主要作用是全局计数,与单机系统不同,在分布式系统中,我们不能依赖简单的普通变量来完成全局计数

与广播变量类似,累加器也是在Driver端定义的,但它的更新是通过在RDD算子中调用add函数完成的。在应用执行完毕之后,开发者在Driver端调用累加器的value函数,就能获取全局计数结果

  1. import org.apache.spark.rdd.RDD
  2. val rootPath: String = _
  3. val file: String = s"${rootPath}/wikiOfSpark.txt"
  4. // 读取文件内容
  5. val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
  6. // 以行为单位做分词
  7. val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
  8. // 定义Long类型的累加器
  9. val ac = sc.longAccumulator("Empty string")
  10. // 定义filter算子的判定函数f,注意,f的返回类型必须是Boolean
  11. def f(x: String): Boolean = {
  12. if(x.equals("")) {
  13. // 当遇到空字符串时,累加器加1
  14. ac.add(1)
  15. return false
  16. } else {
  17. return true
  18. }
  19. }
  20. // 使用f对RDD进行过滤
  21. val cleanWordRDD: RDD[String] = wordRDD.filter(f)
  22. // 把RDD元素转换为(Key,Value)的形式
  23. val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
  24. // 按照单词做分组计数
  25. val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
  26. // 收集计数结果
  27. wordCounts.collect
  28. // 作业执行完毕,通过调用value获取累加器结果
  29. ac.value
  30. // Long = 79