函数签名:
def groupByk(implicitkt:ClassTag[K]):RDD[{K,Iterable[T]}]

groupBy算子会对RDD里面的每个元素进行遍历,给每个元素取出来作为参数传递给T, 然后根据分组规则去匹配.

groupBy会存在shuffle过程

shuffle:将不同的分区数据进行打乱重组的过程
shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。
作用:
按照func的返回值进行分组.
func返回值作为 key, 对应的值放入一个迭代器中. 返回的 RDD: RDD[(K, Iterable[T])
每组内元素的顺序不能保证, 并且甚至每次调用得到的顺序也有可能不同.

groupBy算子返回值是元祖,key是当前组,value是当前组的所有元素.

image.png

例子

根据偶数和奇数分组

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. *
  5. * Desc: 转换算子-groupBy
  6. * -按照指定的规则,对RDD中的元素进行分组
  7. */
  8. object Spark05_Transformation_groupBy {
  9. def main(args: Array[String]): Unit = {
  10. //创建SparkConf并设置App名称
  11. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  12. //创建SparkContext,该对象是提交Spark App的入口
  13. val sc: SparkContext = new SparkContext(conf)
  14. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
  15. println("==============groupBy分组前================")
  16. rdd.mapPartitionsWithIndex(
  17. (index, datas) => {
  18. println(index + "---->" + datas.mkString(","))
  19. datas
  20. }
  21. ).collect()
  22. /*输出: 0---->1,2,3,4
  23. 1---->5,6,7,8,9*/
  24. //按奇数和偶数分组
  25. val newRDD: RDD[(Int, Iterable[Int])] =
  26. rdd.groupBy(x => x % 2) // 可以简写成 rdd.groupBy(_ % 2)
  27. println("==============groupBy分组后================")
  28. newRDD.mapPartitionsWithIndex(
  29. (index, datas) => {
  30. println(index + "---->" + datas.mkString(","))
  31. datas
  32. }
  33. ).collect()
  34. //这里有三个分区,但是是根据奇数偶数分组的,数据就只有两条,要不就是奇数要不就是偶数
  35. //所以2号区域就是空的了.
  36. /*
  37. 2---->
  38. 1---->(1,CompactBuffer(1, 3, 5, 7, 9))
  39. 0---->(0,CompactBuffer(2, 4, 6, 8))
  40. */
  41. // 关闭连接
  42. sc.stop()
  43. }
  44. }

按相同元素进行分组

  1. object Spark05_Transformation_groupBy222 {
  2. def main(args: Array[String]): Unit = {
  3. //创建SparkConf并设置App名称
  4. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  5. //创建SparkContext,该对象是提交Spark App的入口
  6. val sc: SparkContext = new SparkContext(conf)
  7. val original = sc.makeRDD(List("aaa", "bbb", "ccc", "aaa", "ccc"))
  8. val value = original.groupBy(datas => datas)
  9. val tuples:Array[(String, Iterable[String])] = value.collect()
  10. for (i <- tuples) {
  11. println(i)
  12. }
  13. /*
  14. (ccc,CompactBuffer(ccc, ccc))
  15. (bbb,CompactBuffer(bbb))
  16. (aaa,CompactBuffer(aaa, aaa))
  17. */
  18. }
  19. }

按把奇数和偶数值分组,并且累加

  1. object Spark05_Transformation_groupBy222 {
  2. def main(args: Array[String]): Unit = {
  3. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  4. val sc: SparkContext = new SparkContext(conf)
  5. val original = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
  6. //按奇数偶数分组
  7. val gbrdd = original.groupBy(datas => datas % 2)
  8. val tuples: Array[(Int, Iterable[Int])] = gbrdd.collect()
  9. val value = gbrdd.map({
  10. //每个组的值累加
  11. case (k, it) => (k, it.sum)
  12. })
  13. println(value.collect().mkString(",")) //输出: (0,12),(1,9)
  14. }
  15. }