Chapter 1. Transformation Operators (Key-Value Type)

1.partitionBy

    package com.atguigu.spark.day04

    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    import org.junit.Test

    class $01_Transformation extends Serializable {

      // excluded from serialization (a SparkContext is not serializable)
      @transient
      val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

      /**
       * partitionBy(partitioner): repartition with a shuffle according to the given partitioner
       */
      @Test
      def partitionBy(): Unit = {
        val rdd = sc.parallelize(List("aa" -> 10, "bb" -> 20, "cc" -> 30, "ac" -> 30, "ec" -> 40, "cf" -> 50, "ad" -> 60))
        rdd.mapPartitionsWithIndex((index, it) => {
          val data = it.toList            // materialize first so the returned iterator is not already exhausted
          println(s"index=${index} data=${data}")
          data.iterator
        }).collect()

        val rdd2 = rdd.partitionBy(new HashPartitioner(3))
        rdd2.mapPartitionsWithIndex((index, it) => {
          val data = it.toList
          println(s"rdd2 index=${index} data=${data}")
          data.iterator
        }).collect()

        val rdd3 = rdd.groupBy(x => x._1, p = new MyPartitioner(4))
        rdd3.mapPartitionsWithIndex((index, it) => {
          val data = it.toList
          println(s"rdd3 index=${index} data=${data}")
          data.iterator
        }).collect()
      }

      /**
       * Custom partitioner:
       *   1. create a class that extends Partitioner
       *   2. override its abstract methods
       *   3. pass an instance to the operator
       */
      class MyPartitioner(num: Int) extends Partitioner {
        // number of partitions
        override def numPartitions: Int = if (num < 4) 4 else this.num
        // partition index for a given key
        override def getPartition(key: Any): Int = key match {
          case null => 0
          case "aa" => 1
          case "ac" => 2
          case "ec" => 3
          case _    => 0
        }
      }
    }
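For reference, HashPartitioner places a key by taking a non-negative modulo of its hashCode over the number of partitions (null keys go to partition 0). The helper below is an illustrative sketch of that rule, not Spark's actual source:

    // Illustrative sketch of a hash partitioner's placement rule.
    def hashPartition(key: Any, numPartitions: Int): Int =
      if (key == null) 0
      else {
        val mod = key.hashCode % numPartitions        // may be negative for negative hash codes
        if (mod < 0) mod + numPartitions else mod     // keep the index in [0, numPartitions)
      }

    // With new HashPartitioner(3), "aa" lands in partition hashPartition("aa", 3), and so on.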

2.groupByKey

    /**
     * groupByKey: group by key.
     * The result is RDD[(K, Iterable[V])]: K is a key from the original RDD and the
     * Iterable holds all of that key's values from the original RDD.
     */
    @Test
    def groupByKey(): Unit = {
      val rdd1 = sc.parallelize(List("aa" -> 1, "aa" -> 10, "cc" -> 20, "cd" -> 40, "aa" -> 50, "cc" -> 60, "cd" -> 100))
      val rdd2 = rdd1.groupByKey(3)
      println(rdd1.getNumPartitions)
      println(rdd2.getNumPartitions)
      // the same grouping expressed with groupBy
      val rdd3 = rdd1.groupBy(_._1)
      println(rdd2.collect().toList)
      println(rdd3.collect().toList)
    }
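A small sketch of the type difference between the two groupings above (assuming the same sc as in the class): groupByKey keeps only the values of each group, while groupBy keeps the whole (key, value) pairs.

    val pairs = sc.parallelize(List("aa" -> 1, "aa" -> 10, "cc" -> 20))
    val byKey: org.apache.spark.rdd.RDD[(String, Iterable[Int])]           = pairs.groupByKey()
    val byFun: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = pairs.groupBy(_._1)
    // byKey.collect()  => roughly List((aa,CompactBuffer(1, 10)), (cc,CompactBuffer(20)))
    // byFun.collect()  => roughly List((aa,CompactBuffer((aa,1), (aa,10))), (cc,CompactBuffer((cc,20))))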

3.reduceByKey

    /**
     * reduceByKey(func): group by key, then aggregate all values within each group.
     * Difference between reduceByKey and groupByKey:
     *   groupByKey only groups; there is no combiner step.
     *   reduceByKey groups and aggregates with a map-side combiner, so overall it
     *   performs better than groupByKey.
     */
    @Test
    def reduceByKey(): Unit = {
      val rdd1 = sc.parallelize(List("aa" -> 1, "aa" -> 10, "cc" -> 20, "cd" -> 40, "aa" -> 50, "cc" -> 60, "cd" -> 100), 2)
      rdd1.mapPartitionsWithIndex((index, it) => {
        val data = it.toList
        println(s"index=${index} data=${data}")
        data.iterator
      }).collect()
      val rdd2 = rdd1.reduceByKey((agg, curr) => {
        println(s"agg=${agg} curr=${curr}")
        agg + curr
      })
      println(rdd2.collect().toList)
    }

Diagram:

[Figure 1]
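To make the combiner difference concrete, a minimal sketch reusing rdd1 from the test above: both lines compute the same per-key sums, but reduceByKey pre-aggregates inside each partition before the shuffle, while the groupByKey version shuffles every individual value first.

    val fast = rdd1.reduceByKey(_ + _)               // map-side combine, shuffle only partial sums
    val slow = rdd1.groupByKey().mapValues(_.sum)    // shuffle every value, sum on the reduce side
    // Both collect to the same totals: (aa,61), (cc,80), (cd,140).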

4.combineByKey

    /**
     * combineByKey(createCombiner: V => B, mergeValue: (B, V) => B, mergeCombiners: (B, B) => B)
     *   createCombiner: in the combine stage, transforms the first value of each group in each partition
     *   mergeValue: combine (map-side) aggregation logic
     *   mergeCombiners: reduce-side aggregation logic
     */
    @Test
    def combineByKey(): Unit = {
      val rdd1 = sc.parallelize(List("语文" -> 60, "数学" -> 100, "语文" -> 70, "英语" -> 80, "数学" -> 65, "英语" -> 100,
        "英语" -> 70, "语文" -> 90, "数学" -> 100, "英语" -> 100), 2)
      // Goal: average score per subject
      // Approach 1: groupByKey + map
      val rdd2 = rdd1.groupByKey()
      val rdd3 = rdd2.map(x => {
        val totalScore = x._2.sum
        val count = x._2.size
        (x._1, totalScore.toDouble / count)
      })
      println(rdd3.collect().toList)
      // Approach 2: map to (score, 1), then reduceByKey
      val rdd4 = rdd1.map {
        case (name, score) => (name, (score, 1))
      }
      val rdd5 = rdd4.reduceByKey((agg, curr) => {
        (agg._1 + curr._1, agg._2 + curr._2)
      })
      val rdd6 = rdd5.map {
        case (name, (totalScore, count)) => (name, totalScore.toDouble / count)
      }
      println(rdd6.collect().toList)
      // Approach 3: combineByKey
      val rdd7 = rdd1.combineByKey(
        x => (x, 1),
        (agg: (Int, Int), curr) => (agg._1 + curr, agg._2 + 1),
        (agg: (Int, Int), curr: (Int, Int)) => (agg._1 + curr._1, agg._2 + curr._2)
      )
      val result = rdd7.map {
        case (name, (totalScore, count)) => (name, totalScore.toDouble / count)
      }.collect()
      println(result.toList)
    }

Diagram:

[Figure 2]

[Figure 3]
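A usage note on approach 3 above: the explicit type annotations on `agg` are needed because Scala cannot infer the combiner type from the first function alone. An alternative, shown here as a sketch rather than the original code, is to fix that type with an explicit type argument so the function literals stay unannotated:

    val avgByKey = rdd1
      .combineByKey[(Int, Int)](
        score => (score, 1),                             // createCombiner: first value of a group in a partition -> (sum, count)
        (agg, score) => (agg._1 + score, agg._2 + 1),    // mergeValue: map-side combine
        (a, b) => (a._1 + b._1, a._2 + b._2)             // mergeCombiners: reduce-side merge
      )
      .mapValues { case (total, count) => total.toDouble / count }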

5.foldByKey

    /**
     * foldByKey(zeroValue)(func: (V, V) => V)
     * In the combine stage, for the first computation of each group in a partition,
     * the first argument of the function starts from zeroValue.
     */
    @Test
    def foldByKey(): Unit = {
      val rdd1 = sc.parallelize(List("语文" -> 60, "数学" -> 100, "语文" -> 70, "英语" -> 80, "数学" -> 65, "英语" -> 100,
        "英语" -> 70, "语文" -> 90, "数学" -> 100, "英语" -> 100), 2)
      rdd1.mapPartitionsWithIndex((index, it) => {
        val data = it.toList
        println(s"index=${index} data=${data}")
        data.iterator
      }).collect()
      println(rdd1.foldByKey(1000)((agg, curr) => {
        println(s"agg=${agg} curr=${curr}")
        agg + curr
      }).collect().toList)
    }

Diagram:

[Figure 4]
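For example, assuming the ten records split 5/5 across the two partitions as printed by the mapPartitionsWithIndex call above, "语文" is folded to 1000 + 60 + 70 = 1130 on partition 0 and 1000 + 90 = 1090 on partition 1, so the reduce side returns 2220. The zero value 1000 is therefore applied once per key per partition, not once per key overall.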

6.aggregateByKey

    /**
     * aggregateByKey(zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
     *   seqOp: combine (map-side) logic; for the first aggregation of each group in a
     *          partition, the first argument of seqOp starts from zeroValue
     *   combOp: reduce-side logic
     */
    @Test
    def aggregateByKey(): Unit = {
      val rdd1 = sc.parallelize(List("语文" -> 60, "数学" -> 100, "语文" -> 70, "英语" -> 80, "数学" -> 65, "英语" -> 100,
        "英语" -> 70, "语文" -> 90, "数学" -> 100, "英语" -> 100), 2)
      rdd1.mapPartitionsWithIndex((index, it) => {
        val data = it.toList
        println(s"index=${index} data=${data}")
        data.iterator
      }).collect()
      val rdd2 = rdd1.aggregateByKey((0, 0))(
        (agg, curr) => {
          println(s"combiner agg=${agg} curr=${curr}")
          (agg._1 + curr, agg._2 + 1)
        },
        (agg, curr) => {
          println(s"reducer agg=${agg} curr=${curr}")
          (agg._1 + curr._1, agg._2 + curr._2)
        }
      )
      println(rdd2.collect().toList)
    }

Differences between reduceByKey, foldByKey, combineByKey and aggregateByKey (a sketch comparing all four on the same per-key sum follows this list)

  • reduceByKey: the combine and reduce logic are the same function; in the combine stage, the first aggregation of each group starts from the group's first value (there is no zero value).
  • foldByKey: the combine and reduce logic are the same function; in the combine stage, the first aggregation of each group starts from the given zero value.
  • combineByKey: the combine and reduce logic can differ; in the combine stage the first value of each group is transformed by createCombiner, and the first aggregation starts from that transformed value.
  • aggregateByKey: the combine and reduce logic can differ; in the combine stage, the first aggregation of each group starts from the given zero value.
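As a concrete illustration, a minimal sketch (assuming the same sc as above) that computes an identical per-key sum with all four operators; the only difference is how the first value of each group is seeded on the combine side:

    val pairs = sc.parallelize(List("aa" -> 1, "aa" -> 2, "bb" -> 3), 2)

    val r1 = pairs.reduceByKey(_ + _)                  // first value of the group used as-is
    val r2 = pairs.foldByKey(0)(_ + _)                 // first call starts from the zero value 0
    val r3 = pairs.aggregateByKey(0)(_ + _, _ + _)     // seqOp and combOp could differ; here both are +
    val r4 = pairs.combineByKey(
      (v: Int) => v,                                   // createCombiner: identity
      (agg: Int, v: Int) => agg + v,                   // map-side combine
      (a: Int, b: Int) => a + b)                       // reduce-side merge
    // All four collect to the same result: (aa,3), (bb,3).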

7.sortByKey

    /**
     * sortByKey(ascending): sort by key (pass false for descending order)
     */
    @Test
    def sortByKey(): Unit = {
      val rdd1 = sc.parallelize(List("aa" -> 10, "ac" -> 20, "cc" -> 30, "cd" -> 40, "ad" -> 50))
      val rdd2 = rdd1.sortByKey(false)
      // the same descending sort expressed with sortBy
      rdd1.sortBy(x => x._1, ascending = false)
      println(rdd2.collect().toList)
    }
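sortByKey can only order by the key; if the pairs should be ordered by their value instead, sortBy can take the value as the sort key. A small sketch reusing rdd1 from the test above:

    val byValueDesc = rdd1.sortBy(_._2, ascending = false)   // ("ad",50) first, ("aa",10) last
    println(byValueDesc.collect().toList)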

8.mapValues

    /**
     * mapValues(func: V => B): transforms only the value of each element; the key is left untouched.
     */
    @Test
    def mapValues(): Unit = {
      val rdd1 = sc.parallelize(List("aa" -> 10, "ac" -> 20, "cc" -> 30, "cd" -> 40, "ad" -> 50))
      val rdd2 = rdd1.mapValues(x => x / 10)
      // equivalent to rdd1.map(x => (x._1, x._2 / 10))
      println(rdd2.collect().toList)
    }

9.join

    /**
     * join: only pairs whose keys exist in both RDDs are joined, like SQL's inner join.
     */
    @Test
    def join(): Unit = {
      val rdd1 = sc.parallelize(List("zhangsan" -> "1001", "lisi" -> "1002", "wangwu" -> "1003", "zhaoliu" -> "1001", "李磊" -> "1005"))
      val rdd2 = sc.parallelize(List("1001" -> "开发部", "1002" -> "销售部", "1003" -> "市场部", "1004" -> "公关部"))
      // swap to (deptId, name) so the department id becomes the join key
      val rdd3 = rdd1.map(x => (x._2, x._1))
      val rdd4 = rdd3.join(rdd2)            // inner: drops "李磊" (no dept 1005) and "1004" (no employee)
      val rdd5 = rdd3.leftOuterJoin(rdd2)   // keeps "李磊" with None on the right
      val rdd6 = rdd3.rightOuterJoin(rdd2)  // keeps "公关部" with None on the left
      val rdd7 = rdd3.fullOuterJoin(rdd2)   // keeps both unmatched sides
      // Cartesian product: every element of rdd3 paired with every element of rdd2
      val rdd8 = rdd3.cartesian(rdd2)
      println(rdd4.collect().toList)
      println(rdd5.collect().toList)
      println(rdd6.collect().toList)
      println(rdd7.collect().toList)
      println(rdd8.collect().toList)
    }

10.cogroup

    /**
     * cogroup ≈ groupByKey + fullOuterJoin: every key from either RDD appears once,
     * together with the Iterable of its values from each side.
     */
    @Test
    def cogroup(): Unit = {
      val rdd1 = sc.parallelize(List("1001" -> "zhangsan", "1002" -> "lilei", "1003" -> "wangwu", "1001" -> "zhaoliu", "1005" -> "lilei"))
      val rdd2 = sc.parallelize(List("1001" -> "开发部", "1002" -> "销售部", "1003" -> "市场部", "1004" -> "公关部"))
      val rdd3 = rdd1.cogroup(rdd2)
      println(rdd3.collect().toList)
    }
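With the data above, rdd3 has roughly the following shape (element order may differ between runs): every key from either RDD appears exactly once, and a key present on only one side gets an empty Iterable for the other side.

    // rdd3: RDD[(String, (Iterable[String], Iterable[String]))]
    // ("1001", (CompactBuffer(zhangsan, zhaoliu), CompactBuffer(开发部)))
    // ("1002", (CompactBuffer(lilei),             CompactBuffer(销售部)))
    // ("1003", (CompactBuffer(wangwu),            CompactBuffer(市场部)))
    // ("1005", (CompactBuffer(lilei),             CompactBuffer()))       key only in rdd1
    // ("1004", (CompactBuffer(),                  CompactBuffer(公关部)))  key only in rdd2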