def glom():RDD[Array[T]]
glom算子效果和flatMap(func)效果是相反的.
作用: 将RDD中每一个分区的元素合并成一个数组,形成新的 RDD 类型是RDD[Array[T]]
假如说我有个RDD1,RDD1里面有两个分区,两个分区分别放1 2 和 3 4. 经过glom算子之后就得到一个新的RDD2, RDD2还是两个分区,分区号没变,还是原来的分区号,此时每个分区的数据都合并成一个数组了, 数组里面是原来分区的值,.
比如说之前分区0里面的数据是 1 2 , 调用glom算子之后,分区0里面的数据是 array(1,2)了.
案例
/**
* -将RDD一个分区中的元素,组合成一个新的数组
*/
object Spark04_Transformation_glom222 {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//2个分区
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
//给每个分区的元素的值变成数组
var rddglom = rdd.glom()
//
var mappartitionsRDD = rddglom.mapPartitionsWithIndex {
(index, datas) => {
datas
}
}
var array: Array[Array[Int]] = mappartitionsRDD.collect()
// 获取第一个分区的元素,是个数组,按逗号变成字符串
println(array(0).mkString(",")) //输出: 1,2,3
//获取第二个分区的元素,是个数组,按逗号变成字符串
println(array(1).mkString(",")) //输出: 4,5,6
// 关闭连接
sc.stop()
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* -将RDD一个分区中的元素,组合成一个新的数组
*/
object Spark04_Transformation_glom {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
// 以分区为单位输出每个分区的元素.
println("------------没有glom之前------------")
rdd.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.mkString(","))//给数组展示出来.
datas
}
}.collect()
/*
0--->1,2,3
1--->4,5,6
*/
println("------------调用glom之后------------")
val newRDD: RDD[Array[Int]] = rdd.glom()
newRDD.mapPartitionsWithIndex {
(index, datas) => {
println(index + "--->" + datas.next().mkString(","))
datas
}
}.collect()
/*
0--->1,2,3
1--->4,5,6
*/
// 关闭连接
sc.stop()
}
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_Transformation_glom222 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Glom").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list1 = List(30, 50, 70, 60, 10, 20)
val rdd1: RDD[Int] = sc.parallelize(list1, 2)
// val rdd2 = rdd1.glom().map(x => x.toList)
val rdd2 = rdd1.glom().map(_.toList)
// 一个分区组成一个数组
rdd2.collect.foreach(println)
//输出: List(30, 50, 70)
//List(60, 10, 20)
sc.stop()
}
}