概述

扁平化就是给整体拆分成一个一个的过程

flatMap和Scala的flatMap含义几乎是一样的.
flatMap是先做Map再做flat,flatMap要想执行的话,必须是整体,如果不是整体的话,无法拆分成个体.

作用: 类似于map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以func应该返回一个序列,而不是单一元素 T => TraversableOnce[U])

def flatMapU:ClassTag:RDD[U]
flatMap作用与map操作类似,将RDD中每一个元素通过应用f函数依次转换成新的元素,并封装到RDD中,区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中.

案例

需求: 创建一个集合,集合里面存储的还是子集合,把所有的子集合中数据取出来放入到一个大的集合中.
image.png

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. *
  5. * Desc: 转换算子-faltMap
  6. * -对集合中的元素进行扁平化处理
  7. */
  8. object demo {
  9. def main(args: Array[String]): Unit = {
  10. //创建SparkConf并设置App名称
  11. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  12. //创建SparkContext,该对象是提交Spark App的入口
  13. val sc: SparkContext = new SparkContext(conf)
  14. //大集合里面放的一个个小的集合,设定默认分区数是2
  15. val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6), List(7, 8)), 2)
  16. //注意:如果匿名函数输入和输出相同,那么不能简化
  17. //datas 是整体对象
  18. // 把所有的子集合的元素取出来放到大的集合里面
  19. val newRDD: RDD[Int] = rdd.flatMap(datas => datas)
  20. //输出newRDD
  21. var result = newRDD.collect()
  22. result.foreach(println) //原来集合里面放的是集合,现在集合里面放的都是元素,
  23. // 关闭连接
  24. sc.stop()
  25. }
  26. }

其它练习

  1. def main(args: Array[String]): Unit = {
  2. val conf: SparkConf = new SparkConf().setAppName("FlatMap").setMaster("local[2]")
  3. val sc: SparkContext = new SparkContext(conf)
  4. val list1 = List(30, 5, 70, 6, 1, 20)
  5. val rdd1 = sc.parallelize(list1)
  6. // 给rdd1 数据放到list集合里面
  7. val rdd2 = rdd1.flatMap(x => List(x))
  8. println(rdd2.collect().mkString(",")) //输出: 30,5,70,6,1,20
  9. // 给每个元素乘以2
  10. val rdd3 = rdd1.flatMap(x => List(x, x * 2))
  11. println(rdd3.collect().mkString(",")) //输出: 30,60,5,10,70,140,6,12,1,2,20,40
  12. // 给每个元素乘以3
  13. val rdd4 = rdd1.flatMap(x => List(x, x * 2, x * 3))
  14. println(rdd4.collect().mkString(",")) //输出: 30,60,90,5,10,15,70,140,210,6,12,18,1,2,3,20,40,60
  15. //过滤集合的值,能被整除的取出来
  16. val rdd5 = rdd1.flatMap(x => if (x % 2 == 0) List(x) else List[Int]())
  17. println(rdd5.collect().mkString(",")) //输出: 30,70,6,20
  18. sc.stop()
  19. }