概念

作用
按照key进行分组,返回的新的RDD, key是就是key, value是key里面的内容
分组的时候也可以指定分区器或者分区个数(默认使用的是HashPartitioner)
在groupByKey算子执行的时候,可能会shuffle操作.
image.png


1. 基于当前的实现, groupByKey必须在内存中持有所有的键值对. 如果一个key有太多的value, 则会导致内存溢出(OutOfMemoryError)
2. 所以这操作非常耗资源, 如果分组的目的是为了在每个key上执行聚合操作(比如: sum 和 average), 则应该使用PairRDDFunctions.aggregateByKey 或者PairRDDFunctions.reduceByKey, 因为他们有更好的性能(会先在分区进行预聚合)

wordCount案例

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * Desc: 转换算子-groupByKey
  5. * -根据key对RDD中的元素进行分组
  6. */
  7. object Transformation_groupByKey {
  8. def main(args: Array[String]): Unit = {
  9. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  10. val sc: SparkContext = new SparkContext(conf)
  11. //创建RDD
  12. val rdd = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
  13. val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
  14. groupRDD.collect().foreach(println)
  15. /*
  16. 输出:
  17. (a,CompactBuffer(1, 5))
  18. (b,CompactBuffer(5, 2))
  19. */
  20. val resRDD: RDD[(String, Int)] = groupRDD.map {
  21. case (key, datas) => { //模式匹配.对元祖的value进行求和就可以了.
  22. (key, datas.sum)
  23. }
  24. }
  25. resRDD.collect().foreach(println)
  26. /*
  27. 输出:
  28. (a,6)
  29. (b,7)
  30. */
  31. sc.stop()
  32. }
  33. }

wordCount案例第二种写法

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Transformation_groupByKey {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("GroupByKey").setMaster("local[2]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. val rdd1 = sc.parallelize(Array("hello", "hello", "world", "hello", "hello"))
  8. val wordOne = rdd1.map((_, 1))
  9. val wordOneGrouped = wordOne.groupByKey().mapValues(_.sum)
  10. wordOneGrouped.collect.foreach(println)
  11. /*输出
  12. (hello,4)
  13. (world,1)
  14. */
  15. sc.stop()
  16. }
  17. }