概述
扁平化就是给整体拆分成一个一个的过程
flatMap和Scala的flatMap含义几乎是一样的.
flatMap是先做Map再做flat,flatMap要想执行的话,必须是整体,如果不是整体的话,无法拆分成个体.
作用: 类似于map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以func应该返回一个序列,而不是单一元素 T => TraversableOnce[U])
def flatMapU:ClassTag:RDD[U]
flatMap作用与map操作类似,将RDD中每一个元素通过应用f函数依次转换成新的元素,并封装到RDD中,区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中.
案例
需求: 创建一个集合,集合里面存储的还是子集合,把所有的子集合中数据取出来放入到一个大的集合中.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
*
* Desc: 转换算子-faltMap
* -对集合中的元素进行扁平化处理
*/
object demo {
def main(args: Array[String]): Unit = {
//创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
//创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//大集合里面放的一个个小的集合,设定默认分区数是2
val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)), 2)
//注意:如果匿名函数输入和输出相同,那么不能简化
//datas 是整体对象
// 把所有的子集合的元素取出来放到大的集合里面
val newRDD: RDD[Int] = rdd.flatMap(datas => datas)
//输出newRDD
var result = newRDD.collect()
result.foreach(println) //原来集合里面放的是集合,现在集合里面放的都是元素,
// 关闭连接
sc.stop()
}
}
其它练习
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("FlatMap").setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list1 = List(30, 5, 70, 6, 1, 20)
val rdd1 = sc.parallelize(list1)
// 给rdd1 数据放到list集合里面
val rdd2 = rdd1.flatMap(x => List(x))
println(rdd2.collect().mkString(",")) //输出: 30,5,70,6,1,20
// 给每个元素乘以2
val rdd3 = rdd1.flatMap(x => List(x, x * 2))
println(rdd3.collect().mkString(",")) //输出: 30,60,5,10,70,140,6,12,1,2,20,40
// 给每个元素乘以3
val rdd4 = rdd1.flatMap(x => List(x, x * 2, x * 3))
println(rdd4.collect().mkString(",")) //输出: 30,60,90,5,10,15,70,140,210,6,12,18,1,2,3,20,40,60
//过滤集合的值,能被整除的取出来
val rdd5 = rdd1.flatMap(x => if (x % 2 == 0) List(x) else List[Int]())
println(rdd5.collect().mkString(",")) //输出: 30,70,6,20
sc.stop()
}