def glom():RDD[Array[T]]
image.png

glom算子效果和flatMap(func)效果是相反的.

作用: 将RDD中每一个分区的元素合并成一个数组,形成新的 RDD 类型是RDD[Array[T]]
假如说我有个RDD1,RDD1里面有两个分区,两个分区分别放1 2 和 3 4. 经过glom算子之后就得到一个新的RDD2, RDD2还是两个分区,分区号没变,还是原来的分区号,此时每个分区的数据都合并成一个数组了, 数组里面是原来分区的值,.

比如说之前分区0里面的数据是 1 2 , 调用glom算子之后,分区0里面的数据是 array(1,2)了.

案例

  1. /**
  2. * -将RDD一个分区中的元素,组合成一个新的数组
  3. */
  4. object Spark04_Transformation_glom222 {
  5. def main(args: Array[String]): Unit = {
  6. //创建SparkConf并设置App名称
  7. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  8. //创建SparkContext,该对象是提交Spark App的入口
  9. val sc: SparkContext = new SparkContext(conf)
  10. //2个分区
  11. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  12. //给每个分区的元素的值变成数组
  13. var rddglom = rdd.glom()
  14. //
  15. var mappartitionsRDD = rddglom.mapPartitionsWithIndex {
  16. (index, datas) => {
  17. datas
  18. }
  19. }
  20. var array: Array[Array[Int]] = mappartitionsRDD.collect()
  21. // 获取第一个分区的元素,是个数组,按逗号变成字符串
  22. println(array(0).mkString(",")) //输出: 1,2,3
  23. //获取第二个分区的元素,是个数组,按逗号变成字符串
  24. println(array(1).mkString(",")) //输出: 4,5,6
  25. // 关闭连接
  26. sc.stop()
  27. }
  28. }
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * -将RDD一个分区中的元素,组合成一个新的数组
  5. */
  6. object Spark04_Transformation_glom {
  7. def main(args: Array[String]): Unit = {
  8. //创建SparkConf并设置App名称
  9. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  10. //创建SparkContext,该对象是提交Spark App的入口
  11. val sc: SparkContext = new SparkContext(conf)
  12. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  13. // 以分区为单位输出每个分区的元素.
  14. println("------------没有glom之前------------")
  15. rdd.mapPartitionsWithIndex {
  16. (index, datas) => {
  17. println(index + "--->" + datas.mkString(","))//给数组展示出来.
  18. datas
  19. }
  20. }.collect()
  21. /*
  22. 0--->1,2,3
  23. 1--->4,5,6
  24. */
  25. println("------------调用glom之后------------")
  26. val newRDD: RDD[Array[Int]] = rdd.glom()
  27. newRDD.mapPartitionsWithIndex {
  28. (index, datas) => {
  29. println(index + "--->" + datas.next().mkString(","))
  30. datas
  31. }
  32. }.collect()
  33. /*
  34. 0--->1,2,3
  35. 1--->4,5,6
  36. */
  37. // 关闭连接
  38. sc.stop()
  39. }
  40. }
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object Spark04_Transformation_glom222 {
  4. def main(args: Array[String]): Unit = {
  5. val conf: SparkConf = new SparkConf().setAppName("Glom").setMaster("local[2]")
  6. val sc: SparkContext = new SparkContext(conf)
  7. val list1 = List(30, 50, 70, 60, 10, 20)
  8. val rdd1: RDD[Int] = sc.parallelize(list1, 2)
  9. // val rdd2 = rdd1.glom().map(x => x.toList)
  10. val rdd2 = rdd1.glom().map(_.toList)
  11. // 一个分区组成一个数组
  12. rdd2.collect.foreach(println)
  13. //输出: List(30, 50, 70)
  14. //List(60, 10, 20)
  15. sc.stop()
  16. }
  17. }