image.png


先根据flatMap进行压平,然后在根据map算子给每个词变成元祖,key就是词汇,value就是1, 然后再根据元祖的key进行分组,最后将相同key的value值进行累加.

版本1

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark06_WordCount1 {
  4. def main(args: Array[String]): Unit = {
  5. //创建SparkConf并设置App名称
  6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  7. //创建SparkContext,该对象是提交Spark App的入口
  8. val sc: SparkContext = new SparkContext(conf)
  9. //创建RDD
  10. val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark", "Hello World"))
  11. //简单版-实现方式1
  12. //对RDD中的元素进行扁平映射
  13. val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
  14. //将映射后的数据进行结构的转换,为每个单词计数
  15. val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_,1))
  16. //按照key对RDD中的元素进行分组 (Hello,CompactBuffer((Hello,1), (Hello,1), (Hello,1)))
  17. val groupByRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(_._1)
  18. //对分组后的元素再次进行映射 (Hello,3)
  19. val resRDD: RDD[(String, Int)] = groupByRDD.map {
  20. case (word, datas) => { //模式匹配
  21. (word, datas.size)
  22. }
  23. }
  24. // 关闭连接
  25. sc.stop()
  26. }
  27. }

版本2

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark06_WordCount2 {
  4. def main(args: Array[String]): Unit = {
  5. //创建SparkConf并设置App名称
  6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  7. //创建SparkContext,该对象是提交Spark App的入口
  8. val sc: SparkContext = new SparkContext(conf)
  9. //创建RDD
  10. val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark", "Hello World"))
  11. //简单版-实现方式2
  12. //对RDD中的元素进行扁平映射
  13. val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
  14. //将RDD中的单词进行分组
  15. val groupByRDD: RDD[(String, Iterable[String])] = flatMapRDD.groupBy(word=>word)
  16. //对分组之后的数据再次进行映射
  17. val resRDD: RDD[(String, Int)] = groupByRDD.map {
  18. case (word, datas) => {
  19. (word, datas.size)
  20. }
  21. }
  22. // 关闭连接
  23. sc.stop()
  24. }
  25. }

版本3

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**不建议使用
  4. *
  5. */
  6. object Spark06_WordCount3 {
  7. def main(args: Array[String]): Unit = {
  8. //创建SparkConf并设置App名称
  9. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  10. //创建SparkContext,该对象是提交Spark App的入口
  11. val sc: SparkContext = new SparkContext(conf)
  12. //创建RDD
  13. val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2)))
  14. //复杂版 方式1
  15. //将原RDD中字符串以及字符串出现的次数,进行处理,形成一个新的字符串
  16. val rdd1: RDD[String] = rdd.map {
  17. case (str, count) => {
  18. (str + " ") * count
  19. }
  20. }
  21. //对RDD中的元素进行扁平映射
  22. val flatMapRDD: RDD[String] = rdd1.flatMap(_.split(" "))
  23. //将RDD中的单词进行分组
  24. val groupByRDD: RDD[(String, Iterable[String])] = flatMapRDD.groupBy(word=>word)
  25. //对分组之后的数据再次进行映射
  26. val resRDD: RDD[(String, Int)] = groupByRDD.map {
  27. case (word, datas) => {
  28. println(word +"--"+ datas.size)
  29. (word, datas.size)
  30. }
  31. }
  32. resRDD.collect()
  33. /*
  34. World--2
  35. Spark--3
  36. Scala--2
  37. Hello--7
  38. */
  39. // 关闭连接
  40. sc.stop()
  41. }
  42. }

版本4

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * Desc: 使用groupBy完成WordCount案例
  5. */
  6. object Spark06_WordCountOriginal {
  7. def main(args: Array[String]): Unit = {
  8. //创建SparkConf并设置App名称
  9. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  10. //创建SparkContext,该对象是提交Spark App的入口
  11. val sc: SparkContext = new SparkContext(conf)
  12. //复杂版 方式2
  13. val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2)))
  14. /*先给一个元祖转成两个元祖
  15. * ("Hello Scala", 2)==>(hello,2),(Scala,2) ("Hello Spark", 3)==>(hello,3),(Spark,3)
  16. */
  17. //对RDD中的元素进行扁平映射
  18. val flatMapRDD: RDD[(String, Int)] = rdd.flatMap {
  19. // 模式匹配元祖, words是多个单词组成的字符串,count 是字符串出现的次数,
  20. case (words, count) => {
  21. words.split(" ") //对字符串进行切割,结果是字符串的数组.
  22. .map(word => (word, count))//对每个单词转成元祖(单词,单词出现的次数)
  23. }
  24. }
  25. //按照单词对RDD中的元素进行分组 (Hello,CompactBuffer((Hello,2), (Hello,3), (Hello,2)))
  26. // _1的意思是当前元祖的第一个元素
  27. val groupByRDD: RDD[(String, Iterable[(String, Int)])] = flatMapRDD.groupBy(_._1)
  28. //对RDD的元素重新进行映射
  29. val resRDD: RDD[(String, Int)] = groupByRDD.map {
  30. case (word, datas) => {
  31. // _2意思是:拿到元祖第二个元素进行累加.
  32. (word, datas.map(_._2).sum)
  33. }
  34. }
  35. resRDD.collect().foreach(println)
  36. // 关闭连接
  37. sc.stop()
  38. }
  39. }