RDD
转换:功能的补充和封装,将旧的的RDD包装成新的RDD
行动:触发任务的调度和作业的执行

RDD根据数据处理方式不同将算子整体分为
Value类型、双Value类型、Key-Value类型


一 Value

1.Map()

函数签名: def map( f : T => U ) : RDD[U]

  1. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R")
  2. val sc: SparkContext = new SparkContext(sparkConf)
  3. val rdd: RDD[String] = sc.textFile("datas/apache.log")
  4. val mapRDD: RDD[String] = rdd.map(
  5. line => {
  6. val datas: Array[String] = line.split(" ")
  7. datas(6)
  8. })
  9. mapRDD.collect().foreach(print)
  10. sc.stop()

数据源
image.png

输出
image.png

2.mapPartitions()

函数签名 def mapPartitionsU:ClassTag :RDD[U]

是以分区为单位进行数据转换操作,会将整个分区的数据加载到内存进行操作

例子:
1.每个分区过滤

  1. val rdd: RDD[String] = sc.textFile("datas/1.txt")
  2. val mapRDD: RDD[String] = rdd.mapPartitions(
  3. iter => {
  4. iter.filter( item => item.toInt % 2 == 0)
  5. }
  6. )
  7. mapRDD.collect().foreach(print)
  8. sc.stop()

2.每个分区最大值

  1. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R")
  2. val sc: SparkContext = new SparkContext(sparkConf)
  3. val rdd: RDD[String] = sc.textFile("datas/3.txt")
  4. val mapRDD: RDD[Int] = rdd.mapPartitions(
  5. iter => {
  6. //iter.filter( item => item.toInt % 2 == 0)
  7. val arr: Iterator[Int] = iter.map(item => item.toInt)
  8. List(arr.max).iterator
  9. }
  10. )
  11. mapRDD.collect().foreach(println)
  12. sc.stop()

3.mapPartitionsWithIndex()

函数签名 def mapPartitionsWithIndexU:ClassTag => Iterator[U] , preservesPartitionsing : Boolean = false ) :RDD[U]

第一个参数是分区编号(从0开始)

  1. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R")
  2. val sc: SparkContext = new SparkContext(sparkConf)
  3. val rdd: RDD[String] = sc.textFile("datas/3.txt")
  4. val mapRDD: RDD[String] = rdd.mapPartitionsWithIndex(
  5. (index, iter) => {
  6. if (index == 0) {
  7. iter
  8. } else {
  9. Nil.iterator
  10. }
  11. })
  12. mapRDD.collect().foreach(println)
  13. sc.stop()

4.flatMap()

函数签名 def flatMap( f: T => TraversableOnce[U] ) : RDD[U]

泛型T 是类泛型

先map再flatten

  1. val rdd: RDD[List[Int]] = sc.makeRDD(List(
  2. List(1,2),List(3,4)
  3. ))
  4. val rddMap: RDD[Int] = rdd.flatMap(arr => arr)
  5. rddMap.collect().foreach(println)

模式匹配

  1. val rdd: RDD[Any] = sc.makeRDD(List(
  2. List(1, 2), 3, List(4, 5)
  3. ))
  4. val rddMap: RDD[Any] = rdd.flatMap(
  5. item => {
  6. item match {
  7. case list: List[_] => list
  8. case i => List(i) //将不是集合的元素转成集合
  9. }
  10. }
  11. )
  12. rddMap.collect().foreach(println)

6.groupBy()

函数签名 def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

作用 遍历集合的元素,将返回值当作key 相同key为一组 最终返回一个元组

  1. val rdd: RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"))
  2. val groupRDD: RDD[(Char, Iterable[String])] = rdd.groupBy(item => item.charAt(0))
  3. groupRDD.collect().foreach(println)

image.png

重新分区(shuffle)

  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,567,8))
  2. val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(item => item % 2)

image.png

例子
统计每个时间段的访问次数

  1. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("R5")
  2. val sc: SparkContext = new SparkContext(sparkConf)
  3. //83.149.9.216 - - 17/05/2015:10:05:03
  4. val rdd: RDD[String] = sc.textFile("datas/apache.log")
  5. val mapRDD: RDD[(String, Int)] = rdd.map(line => {
  6. val datas: Array[String] = line.split(" ")
  7. val format: SimpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  8. val date: Date = format.parse(datas(3))
  9. val format2: SimpleDateFormat = new SimpleDateFormat("HH")
  10. val str: String = format2.format(date)
  11. (str, 1)
  12. })
  13. val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupBy(item => item._1)
  14. val resRDD: RDD[(String, Int)] = groupRDD.map(item => {
  15. (item._1, item._2.size)
  16. })
  17. resRDD.collect().foreach(println)
  18. sc.stop()

7.filter()

函数签名 def filter( f : T => Boolean ) : RDD[T]

作用: 过滤集合中元素,false过滤

注意: 此函数可能会出现 数据倾斜,也就是分区间数据量不平衡

例子
筛选出是 2015/05/17 的 访问记录

  1. val filterRDD: RDD[String] = rdd.filter(
  2. item => {
  3. val line: Array[String] = item.split(" ")
  4. line(3).startsWith("17/05/2015")
  5. }
  6. )
  7. val value: RDD[(String, String)] = filterRDD.map(item => {
  8. val strings: Array[String] = item.split(" ")
  9. (strings(3), strings(6))
  10. })
  11. value.collect().foreach(println)

5.glom()

函数签名 def glom() :RDD[Array[T]

作用: 将一维数据转成二维数组