RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value 类型

Value 类型

1) map ➢ 函数签名 def mapU: ClassTag: RDD[U]
小功能:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径

  1. // ➢ 函数说明 将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,
  2. 也可以是值的转换。
  3. // TODO 算子 -map
  4. val rdd = sc.makeRDD(List(1,2,3,4))
  5. // 转换函数
  6. def mapFunction(num:Int): Int = {
  7. num * 2
  8. }
  9. // val mapRDD: RDD[Int] = rdd.map(mapFunction)
  10. // val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2})
  11. // val mapRDD: RDD[Int] = rdd.map((num:Int)=>num*2)
  12. // val mapRDD: RDD[Int] = rdd.map((num)=>num*2)
  13. val mapRDD: RDD[Int] = rdd.map(_*2)
  14. mapRDD.collect().foreach(println)
  15. =============================================================================
  16. // 1. rdd的计算一个分区内的数据是一个一个执行逻辑
  17. // 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
  18. // 分区内数据的执行是有序的。
  19. // 2. 不同分区数据计算是无序的。
  20. val rdd = sc.makeRDD(List(1,2,3,4),2)
  21. val mapRDD = rdd.map(
  22. num => {
  23. println(">>>>>>> " + num)
  24. num
  25. }
  26. )
  27. val mapRDD1 = mapRDD.map(
  28. num => {
  29. println("###### " + num)
  30. num
  31. }
  32. )
  33. mapRDD1.collect()
  34. >>>>>>> 3
  35. >>>>>>> 1
  36. ###### 3
  37. ###### 1
  38. >>>>>>> 2
  39. ###### 2
  40. >>>>>>> 4
  41. ###### 4
  42. =============================================================================
  43. // TODO 算子 -map
  44. val rdd = sc.makeRDD(List(1,2,3,4),2)
  45. // [1,2],[3,4]
  46. rdd.saveAsTextFile("output")
  47. val mapRDD = rdd.map(_*2)
  48. // [2,4],[6,8]
  49. mapRDD.saveAsTextFile("output1")
  50. =============================================================================
  51. // TODO 算子 -map
  52. val rdd = sc.textFile("data/apache.log")
  53. // 长的字符串
  54. // 短的字符串
  55. val mapRDD: RDD[String] = rdd.map(
  56. line => {
  57. val datas = line.split(" ")
  58. datas(1)
  59. }
  60. )

2) mapPartitions
➢ 函数签名
def mapPartitionsU: ClassTag: RDD[U]
❖ 小功能:获取每个数据分区的最大值

  1. //➢ 函数说明
  2. 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
  3. 理,哪怕是过滤数据。
  4. // TODO 算子 - mapPartitions
  5. val rdd = sc.makeRDD(List(1,2,3,4),2)
  6. // mapPartitions : 可以以分区为单位进行数据转换操作
  7. // 但是会将整个分区的数据加载到内存进行引用
  8. // 如果处理完的数据是不会被释放掉,存在对象的引用。
  9. // 在内存较小,数据量较大的场合下,容易出现内存溢出。
  10. val mapRDD:RDD[Int] = rdd.mapPartitions(
  11. iter => {
  12. println(">>>>>")
  13. iter.map(_ * 2)
  14. }
  15. )
  16. mapRDD.collect().foreach(println)

思考一个问题:map 和 mapPartitions 的区别?
➢ 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。
➢ 功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
➢ 性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
3) mapPartitionsWithIndex
➢ 函数签名
def mapPartitionsWithIndexU: ClassTag => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

  1. // ➢ 函数说明
  2. 将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
  3. 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
  4. 小功能:获取第二个数据分区的数据
  5. // TODO 算子 - mapPartitionsWithIndex
  6. val rdd = sc.makeRDD(List(1,2,3,4),2)
  7. // [1,2],[3,4]
  8. val mapRDD = rdd.mapPartitionsWithIndex(
  9. (index, iter) => {
  10. if (index == 1) {
  11. iter
  12. } else {
  13. Nil.iterator
  14. }
  15. }
  16. )
  17. mapRDD.collect().foreach(println)

4) flatMap
➢ 函数签名
def flatMapU: ClassTag: RDD[U]

  1. // ➢ 函数说明
  2. 将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
  3. // TODO 算子 - flatMap
  4. val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1,2),List(3,4)))
  5. val flatRDD: RDD[Int] = rdd.flatMap(
  6. list => {
  7. list
  8. }
  9. )
  10. =============================================================================
  11. val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))
  12. val flatRDD: RDD[String] = rdd.flatMap(
  13. s => {
  14. s.split(" ")
  15. }
  16. )
  17. =============================================================================
  18. 小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作
  19. val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
  20. val flatRDD = rdd.flatMap(
  21. data => {
  22. data match {
  23. case list:List[_] => list
  24. case dat => List(dat)
  25. }
  26. }
  27. )

5) glom
➢ 函数签名
def glom(): RDD[Array[T]]

  1. // ➢ 函数说明
  2. 将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
  3. // TODO 算子 - glom
  4. val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
  5. // List => Int
  6. // Int => Array
  7. val glomRDD: RDD[Array[Int]] = rdd.glom()
  8. glomRDD.collect().foreach(data=> println(data.mkString(",")))
  9. =============================================================================
  10. 小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
  11. val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
  12. // 【1,2】,【3,4】
  13. // 【2】,【4】
  14. // 【6】
  15. val glomRDD: RDD[Array[Int]] = rdd.glom()
  16. val maxRDD: RDD[Int] = glomRDD.map(
  17. array => {
  18. array.max
  19. }
  20. )
  21. println(maxRDD.collect().sum)

6) groupBy
➢ 函数签名
def groupByK(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
➢ 函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

  1. // TODO 算子 - groupBy
  2. val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
  3. // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
  4. // 相同的key值的数据会放置在一个组中
  5. def groupFunction(num:Int) = {
  6. num % 2
  7. }
  8. val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
  9. groupRDD.collect().foreach(println)
  10. =============================================================================
  11. 小功能:将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
  12. // TODO 算子 - groupBy
  13. val rdd = sc.makeRDD(List("Hello","Spark","Scala","Hadoop"),2)
  14. // 分组和分区没有必然的关系
  15. val groupRDD = rdd.groupBy(_.charAt(0))
  16. groupRDD.collect().foreach(println)
  17. // 结果:
  18. // (H,CompactBuffer(Hello, Hadoop))
  19. // (S,CompactBuffer(Spark, Scala))
  20. =============================================================================
  21. 小功能:从服务器日志数据 apache.log 中获取每个时间段访问量。
  22. // TODO 算子 - groupBy
  23. val rdd = sc.textFile("data/apache.log")
  24. val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
  25. line => {
  26. val datas = line.split(" ")
  27. val time = datas(3)
  28. //time.substring(0, )
  29. val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  30. val date: Date = sdf.parse(time)
  31. val sdf1 = new SimpleDateFormat("HH")
  32. val hour: String = sdf1.format(date)
  33. (hour, 1)
  34. }
  35. ).groupBy(_._1)
  36. timeRDD.map{
  37. case ( hour, iter ) => {
  38. (hour, iter.size)
  39. }
  40. }.collect.foreach(println)

7) filter
➢ 函数签名
def filter(f: T => Boolean): RDD[T]
➢ 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

  1. // TODO 算子 - filter
  2. val rdd = sc.makeRDD(List(1,2,3,4))
  3. val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)
  4. filterRDD.collect().foreach(println)
  5. // 结果:
  6. 1
  7. 3

8) sample
➢ 函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
➢ 函数说明 根据指定的规则从数据集中抽取数据

  1. val dataRDD = sparkContext.makeRDD(List(1,2,3,4)
  2. // 抽取数据不放回(伯努利算法)
  3. // 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
  4. // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
  5. // 第一个参数:抽取的数据是否放回,false:不放回
  6. // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
  7. // 第三个参数:随机数种子
  8. val dataRDD1 = dataRDD.sample(false, 0.5)
  9. // 抽取数据放回(泊松算法)
  10. // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
  11. // 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
  12. // 第三个参数:随机数种子
  13. val dataRDD2 = dataRDD.sample(true, 2)
  14. =============================================================================
  15. val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
  16. // sample算子需要传递三个参数
  17. // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
  18. // 2. 第二个参数表示,
  19. // 如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
  20. // 如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
  21. // 3. 第三个参数表示,抽取数据时随机算法的种子
  22. // 如果不传递第三个参数,那么使用的是当前系统时间
  23. println(rdd.sample(
  24. true,
  25. 2
  26. //1
  27. ).collect().mkString(","))

9) distinct
➢ 函数签名 ➢ 函数说明 将数据集中重复的数据去重
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

  1. // TODO 算子 - distinct
  2. val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
  3. val rdd1: RDD[Int] = rdd.distinct()
  4. rdd1.collect().foreach(println)

10) coalesce
➢ 函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
➢ 函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率,当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

  1. // TODO 算子 - coalesce
  2. val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)
  3. // coalesce方法默认情况下不会将分区的数据打乱重新组合
  4. // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
  5. // 如果想要让数据均衡,可以进行shuffle处理 true是进行shuffle处理
  6. // val newRDD:RDD[Int] = rdd.coalesce(2)
  7. val newRDD:RDD[Int] = rdd.coalesce(2,true)
  8. newRDD.saveAsTextFile("output")

11) repartition
➢ 函数签名
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

  1. // TODO 算子 - repartition
  2. val rdd = sc.makeRDD(List(1,2,3,4,5,6),2)
  3. // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
  4. // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
  5. // spark提供了一个简化的操作
  6. // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
  7. // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
  8. // val newRDD:RDD[Int] = rdd.coalesce(3,true)
  9. val newRDD:RDD[Int] = rdd.repartition(3)
  10. newRDD.saveAsTextFile("output")

12) sortBy
➢ 函数签名
def sortByK => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
➢ 函数说明
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

  1. // TODO 算子 - sortBy
  2. val rdd = sc.makeRDD(List(6,2,4,5,3,1), 2)
  3. val newRDD:RDD[Int] = rdd.sortBy(num=>num)
  4. newRDD.saveAsTextFile("output")
  5. =============================================================================
  6. // TODO 算子 - sortBy
  7. val rdd = sc.makeRDD(List(("1",1),("11",2),("2",3)),2)
  8. // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,
  9. 第二个参数可以改变排序的方式 false降序
  10. // sortBy默认情况下,不会改变分区。但是中间存在shuffle操作
  11. val newRDD = rdd.sortBy(t=>t._1.toInt,false)
  12. newRDD.saveAsTextFile("output")

双Value 类型

13) intersection ➢ 函数说明 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
➢ 函数签名 def intersection(other: RDD[T]): RDD[T]
14) union ➢ 函数说明 对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
➢ 函数签名 def union(other: RDD[T]): RDD[T]
15) subtract ➢ 函数说明 以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
➢ 函数签名 def subtract(other: RDD[T]): RDD[T]
16) zip ➢ 函数签名 def zipU: ClassTag: RDD[(T, U)]
➢ 函数说明
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素 拉链

  1. // TODO 算子 - 双Value类型
  2. // 交集,并集和差集要求两个数据源数据类型保持一致
  3. // 拉链操作两个数据源的类型可以不一致
  4. val rdd1 = sc.makeRDD(List(1,2,3,4))
  5. val rdd2 = sc.makeRDD(List(3,4,5,6))
  6. val rdd7 = sc.makeRDD(List("3","4","5","6"))
  7. // 交集 : [3,4]
  8. val rdd3: RDD[Int] = rdd1.intersection(rdd2)
  9. // val rdd8 = rdd1.intersection(rdd7)
  10. println(rdd3.collect().mkString(","))
  11. // 并集 : [1,2,3,4,3,4,5,6]
  12. val rdd4: RDD[Int] = rdd1.union(rdd2)
  13. println(rdd4.collect().mkString(","))
  14. // 差集 : [1,2]
  15. val rdd5: RDD[Int] = rdd1.subtract(rdd2)
  16. println(rdd5.collect().mkString(","))
  17. // 拉链 : [1-3,2-4,3-5,4-6]
  18. val rdd6: RDD[(Int,Int)] = rdd1.zip(rdd2)
  19. val rdd8 = rdd1.zip(rdd7)
  20. println(rdd6.collect().mkString(","))
  21. println(rdd8.collect().mkString(","))
  22. =============================================================================
  23. // TODO 算子 - 双Value类型
  24. // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
  25. // 两个数据源要求分区数量要保持一致
  26. // Can only zip RDDs with same number of elements in each partition
  27. // 两个数据源要求分区中数据数量保持一致
  28. val rdd1 = sc.makeRDD(List(1,2,3,4,5,6), 2)
  29. val rdd2 = sc.makeRDD(List(3,4,5,6,7,8), 2)
  30. val rdd6: RDD[(Int,Int)] = rdd1.zip(rdd2)
  31. println(rdd6.collect().mkString(","))

Key - Value 类型

17) partitionBy ➢ 函数说明 将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
➢ 函数签名 def partitionBy(partitioner: Partitioner): RDD[(K, V)]

  1. // TODO 算子 - (Key - Value类型 partitionBy)
  2. val rdd = sc.makeRDD(List(1,2,3,4),2)
  3. val mapRDD:RDD[(Int,Int)] = rdd.map((_,1))
  4. // RDD => PairRDDFunctions
  5. // 隐式转换(二次编译)
  6. // partitionBy根据指定的分区规则对数据进行重分区
  7. val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
  8. newRDD.partitionBy(new HashPartitioner(2))
  9. newRDD.saveAsTextFile("output")

18) reduceByKey ➢ 函数说明 可以将数据按照相同的 Key 对 Value 进行聚合
➢ 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

  1. // TODO 算子 - (Key - Value类型 reduceByKey)
  2. val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  3. // reduceByKey : 相同的key的数据进行value数据的聚合操作
  4. // scala语言中一般的聚合操作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合
  5. // 【1,2,3】
  6. // 【3,3】
  7. // 【6】
  8. // reduceByKey中如果key的数据只有一个,是不会参与运算的。
  9. val reduceRDD: RDD[(String,Int)] = rdd.reduceByKey((x:Int,y:Int) => {
  10. println(s"x = ${x},y = ${y}")
  11. x + y
  12. })
  13. reduceRDD.collect().foreach(println)
  14. x = 1,y = 2
  15. x = 3,y = 3
  16. (a,6)
  17. (b,4)

19) groupByKey ➢ 函数说明 将数据源的数据根据 key 对 value 进行分组
➢ 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

  1. // TODO 算子 - (Key - Value类型 groupByKey)
  2. val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
  3. // groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
  4. // 元组中的第一个元素就是key,
  5. // 元组中的第二个元素就是相同key的value的集合
  6. val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
  7. groupRDD.collect().foreach(println)
  8. val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
  9. groupRDD1.collect().foreach(println)
  10. (a,CompactBuffer(1, 2, 3))
  11. (b,CompactBuffer(4))
  12. (a,CompactBuffer((a,1), (a,2), (a,3)))
  13. (b,CompactBuffer((b,4)))

思考:reduceByKey 和 groupByKey 的区别?
从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较 高。
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
20) aggregateByKey ➢ 函数说明 将数据根据不同的规则进行分区内计算和分区间计算
➢ 函数签名
def aggregateByKeyU: ClassTag(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

  1. // TODO 算子 - (Key - Value类型 aggregateByKey)
  2. val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)),2)
  3. // (a,[1,2]), (a,[3,4])
  4. // (a, 2), (a, 4)
  5. // (a, 6)
  6. // aggregateByKey存在函数柯里化,有两个参数列表
  7. // 第一个参数列表,需要传递一个参数,表示为初始值
  8. // 主要用于当碰见第一个key的时候,和value进行分区内计算
  9. // 第二个参数列表需要传递2个参数
  10. // 第一个参数表示分区内计算规则
  11. // 第二个参数表示分区间计算规则
  12. // math.min(x, y)
  13. // math.max(x, y)
  14. // 取出每个分区内相同 key 的最大值然后分区间相加
  15. rdd.aggregateByKey(0)(
  16. (x,y) => math.max(x,y),
  17. (x,y) => x + y
  18. ).collect().foreach(println)

21) foldByKey ➢ 函数签名 def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
➢ 函数说明 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey(思考:分区内计算规则和分区间计算规则相同怎么办?)

  1. // TODO 算子 - (Key - Value类型 aggregateByKey)
  2. val rdd = sc.makeRDD(List(
  3. ("a", 1), ("a", 2), ("b", 3),
  4. ("b", 4), ("b", 5), ("a", 6)
  5. ),2)
  6. // (b,12)
  7. // (a,9)
  8. // rdd.aggregateByKey(0)(_+_,_+_).collect().foreach(println)
  9. // 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法
  10. rdd.foldByKey(0)(_+_).collect.foreach(println)

22) combineByKey
➢ 函数签名
def combineByKeyC => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
➢ 函数说明
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

  1. // TODO 算子 - (Key - Value类型 combineByKey)
  2. val rdd = sc.makeRDD(List(
  3. ("a", 1), ("a", 2), ("b", 3),
  4. ("b", 4), ("b", 5), ("a", 6)
  5. ),2)
  6. // combineByKey : 方法需要三个参数
  7. // 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
  8. // 第二个参数表示:分区内的计算规则
  9. // 第三个参数表示:分区间的计算规则
  10. val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(
  11. v => (v, 1),
  12. ( t:(Int, Int), v ) => {
  13. (t._1 + v, t._2 + 1)
  14. },
  15. (t1:(Int, Int), t2:(Int, Int)) => {
  16. (t1._1 + t2._1, t1._2 + t2._2)
  17. }
  18. )
  19. val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
  20. case (num, cnt) => {
  21. num / cnt
  22. }
  23. }
  24. resultRDD.collect().foreach(println)

思考:reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

  1. reduceByKey:
  2. combineByKeyWithClassTag[V](
  3. (v: V) => v, // 第一个值不会参与计算
  4. func, // 分区内计算规则
  5. func, // 分区间计算规则
  6. )
  7. aggregateByKey :
  8. combineByKeyWithClassTag[U](
  9. (v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
  10. cleanedSeqOp, // 分区内计算规则
  11. combOp, // 分区间计算规则
  12. )
  13. foldByKey:
  14. combineByKeyWithClassTag[V](
  15. (v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
  16. cleanedFunc, // 分区内计算规则
  17. cleanedFunc, // 分区间计算规则
  18. )
  19. combineByKey :
  20. combineByKeyWithClassTag(
  21. createCombiner, // 相同key的第一条数据进行的处理函数
  22. mergeValue, // 表示分区内数据的处理函数
  23. mergeCombiners, // 表示分区间数据的处理函数

23) sortByKey ➢ 函数说明 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的
➢ 函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

  1. val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
  2. val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
  3. val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

24) join
➢ 函数签名 def joinW]): RDD[(K, (V, W))]
➢ 函数说明 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDD

// TODO 算子 - (Key - Value类型  join)
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("c", 3), ("d",1)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 5), ("c", 6),("a", 4)
    ))

    // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
    //        如果两个数据源中key没有匹配上,那么数据不会出现在结果中
    //        如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
    val joinRDD: RDD[(String,(Int,Int))] = rdd1.join(rdd2)
    joinRDD.collect().foreach(println)
(a,(1,5))
(a,(1,4))
(a,(2,5))
(a,(2,4))
(c,(3,6))

25) leftOuterJoin
➢ 函数签名 def leftOuterJoinW]): RDD[(K, (V, Option[W]))]
➢ 函数说明 类似于 SQL 语句的左外连接
26) cogroup
➢ 函数签名 def cogroupW]): RDD[(K, (Iterable[V], Iterable[W]))]
➢ 函数说明 在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
案例实操
1) 数据准备 agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
2) 需求描述 统计出每一个省份每个广告被点击数量排行的 Top3

// TODO 案例实操
    // 1 获取原始数据:时间戳,省份,城市,用户,广告
    val dataRDD = sc.textFile("data/agent.log")

    // 2 将原始数据进行结构转换。方便统计
    // 时间戳,省份,城市,用户,广告
    // =>
    // ((省份,广告),1)
    val mapRDD = dataRDD.map(
      line => {
        val datas = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )

    // 3 将转换结构后的数据,进行分组聚合
    // ((省份,广告),1)=> ((省份,广告),sum)
    val reduceRDD: RDD[((String,String),Int)] = mapRDD.reduceByKey(_+_)

    // 4 将聚合的结果进行结构的转换
    // (省份,广告),sum)=>(省份,(广告,sum))
    val newMapRDD = reduceRDD.map {
      case ((prv, ad), sum) => {
        (prv, (ad, sum))
      }
    }

    // 5 将转换结构后的数据根据省份进行分组
    // (省份,[(广告A,sumA), (广告B,sumB)])
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = 
        newMapRDD.groupByKey()

    // 6 将分组后的数据组内排序(降序),取前3名
    val resultRDD = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    // 7 采集数据打印在控制台
    resultRDD.collect().foreach(println)