An RDD must first be converted into two-element tuples, i.e. key-value pairs, before this family of operators can be called.
All of the functions below require the RDD elements to be key-value (K, V) pairs.

partitionBy

partitionBy takes a partitioner (Partitioner) as its parameter.
Requirement:
the RDD elements must be key-value pairs
Partitioner has two commonly used subclasses, HashPartitioner and RangePartitioner; you can also write a custom partitioner.
Custom partitioner
1. Why a custom partitioner may be needed:
For example, data skew can occur when the default HashPartitioner is used:
(diagram: a hypothetical case in which every record is hashed into the same partition)
The figure is a hypothetical example: assume all of the data ends up in partition 1. That is data skew, and writing a custom partitioner is a way to solve it.
2. Writing tip: imitate HashPartitioner (a simplified sketch of its hashing idea follows the code block below).

    // Assumes: import org.apache.spark.{HashPartitioner, Partitioner}, import org.junit.Test,
    // and an already initialized SparkContext named sc.

    @Test
    def partitionBy(): Unit = {
      val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4,
        "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
      // Repartition by key with the built-in HashPartitioner, using 3 partitions
      val rdd1 = rdd.partitionBy(new HashPartitioner(3))
      val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
        it   // pass each partition's contents through unchanged
      }).collect()
      println(rdd2.toList)
    }

    // Custom partitioning
    @Test
    def myPartition(): Unit = {
      val myP = new myPartitioner(4)
      val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4,
        "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
      val rdd1 = rdd.partitionBy(myP)
      rdd1.mapPartitionsWithIndex((index, it) => {
        val datas = it.toList   // materialize first so the iterator is not consumed twice
        println(s"${index},${datas}")
        datas.iterator
      }).collect()
    }
  } // end of the enclosing test class

  // Custom partitioner
  class myPartitioner(partitions: Int) extends Partitioner {
    // Keep the number of partitions at no less than 4
    override def numPartitions: Int = {
      if (partitions < 4) 4 else partitions
    }

    // Route fixed keys to fixed partitions; everything else goes to partition 3
    override def getPartition(key: Any): Int = {
      key match {
        case "scala" => 1
        case "spark" => 2
        case _       => 3
      }
    }
  }
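For reference when imitating it: HashPartitioner essentially maps a key to a partition by taking a non-negative modulo of the key's hashCode over the number of partitions (a null key goes to partition 0). A minimal sketch of that idea, not Spark's actual source code, with a made-up helper name:

    // Simplified illustration of hash partitioning (myHashPartition is a hypothetical helper)
    def myHashPartition(key: Any, numPartitions: Int): Int = {
      if (key == null) 0
      else {
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod   // keep the partition index non-negative
      }
    }

    // e.g. myHashPartition("spark", 3) always returns the same index in the range [0, 3)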

groupByKey

/*
groupByKey(): groups the elements by key.

Difference between reduceByKey and groupByKey:
reduceByKey performs a combiner-style pre-aggregation (similar to the MR combiner) during the shuffle stage; groupByKey has no such pre-aggregation in the shuffle stage.
Because of this, reduceByKey performs better than groupByKey, and in practice the higher-performance shuffle operator is generally recommended (see the comparison sketch after the code block below).
*/

The values for each key are wrapped in an iterator:
groupByKey returns RDD[(K, Iterable[V])], while groupBy(x => x._1) returns RDD[(K, Iterable[(K, V)])].
// rdd.groupByKey() is a bit cleaner than rdd.groupBy(x => x._1)

    @Test
    def groupByKey(): Unit = {
      val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4,
        "scala" -> 1, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
      val rdd2 = rdd.groupByKey()         // RDD[(String, Iterable[Int])]
      val rdd3 = rdd.groupBy(x => x._1)   // RDD[(String, Iterable[(String, Int)])]
      println(rdd2.collect().toList)
      println(rdd3.collect().toList)
    }
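To make the performance difference above concrete, here is a small comparison sketch (assuming the same sc): both versions produce the same word counts, but reduceByKey sums within each partition before the shuffle, while groupByKey ships every single (word, 1) record across the network:

    val pairs = sc.parallelize(List("spark" -> 1, "scala" -> 1, "spark" -> 1, "java" -> 1), 2)

    // groupByKey: all records are shuffled, then summed after grouping
    val viaGroup  = pairs.groupByKey().map { case (word, ones) => (word, ones.sum) }
    // reduceByKey: per-partition partial sums first, so less data is shuffled
    val viaReduce = pairs.reduceByKey(_ + _)

    println(viaGroup.collect().toList)    // same result as below
    println(viaReduce.collect().toList)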

reduceByKey

reduceByKey aggregates all of the values within each key's group.
It is a high-performance operator: the combiner reduces the amount of data emitted by the map stage.

Case 1
datas = List((hello,1), (spark,1), (hello,1), (python,1), (scala,1), (spark,1))
Map stage
ByKey: group by key, collecting the values of each key into a collection
hello  -> List(1,1)
spark  -> List(1,1)
python -> List(1)
scala  -> List(1)
Combine:
aggregation: (agg, curr) => agg + curr, where agg is the previous aggregation result and curr is the element currently being aggregated
first computation of a group: agg = the group's first value = 1, curr = 1, agg + curr = 2
combiner result of partition 0: hello -> 2, spark -> 2, python -> 1, scala -> 1
combiner result of partition 1: kafka -> 2, spark -> 1, flume -> 2, scala -> 1

Reduce stage:
shuffle
ByKey: same as above
combine: same as above

    @Test
    def reduceByKey(): Unit = {
      val rdd = sc.textFile("datas/wc.txt")
      val rdd2 = rdd.flatMap(_.split(" "))
      // Slower groupBy version of word count, for comparison:
      // val rdd3 = rdd2.groupBy(x => x)
      // val rdd4 = rdd3.map(x => (x._1, x._2.size))
      val rdd3 = rdd2.map(x => (x, 1))
      // index:0 datas=List((hello,1), (spark,1), (hello,1), (python,1), (scala,1), (spark,1))
      //   combiner grouping: hello -> List(1,1), spark -> List(1,1), python -> List(1), scala -> List(1)
      //   aggregation: (agg,curr) => agg + curr, where agg is the previous result and curr the current element
      //   first computation of a group: agg = the group's first value = 1, curr = 1, agg + curr = 2
      //   combiner result of partition 0: hello->2, spark->2, python->1, scala->1
      // index:1 datas=List((spark,1), (kafka,1), (flume,1), (flume,1), (kafka,1), (scala,1))
      //   combiner grouping: spark -> List(1), kafka -> List(1,1), flume -> List(1,1), scala -> List(1)
      //   combiner result of partition 1: kafka->2, spark->1, flume->2, scala->1
      rdd3.mapPartitionsWithIndex((index, it) => {
        val datas = it.toList
        println(s"index:${index} datas=${datas}")
        datas.iterator
      }).collect()
      val rdd4 = rdd3.reduceByKey((agg, curr) => {
        println(s"agg=${agg} curr=${curr}")
        agg + curr
      })
      println(rdd4.collect().toList)
    }

Case 2
The data first has to be reshaped into the format (x._1, (x._2, 1)):

    def reduceByKey2(): Unit = {
      val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80,
        "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
      // This mapping step is necessary; without it you would have to use combineByKey instead
      val rdd4 = rdd.map(x => (x._1, (x._2, 1)))
      // Per key: (sum of scores, count of scores)
      val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
      // Average = sum / count (integer division here)
      val rdd6 = rdd5.map(x => (x._1, x._2._1 / x._2._2))
      println(rdd6.collect().toList)
    }

reduceByKey.png

combineByKey

    /**
     * combineByKey(createCombiner, mergeValue, mergeCombiners)
     *   createCombiner:  transforms the first value of each group in the combine stage
     *   mergeValue:      the combiner's computation logic
     *   mergeCombiners:  the reduce-side computation logic
     */
    @Test
    def combineByKey(): Unit = {
      val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80,
        "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
      // Requirement: compute the average score of each subject, carrying (sum, count)
      /* groupByKey version:
      val rdd2 = rdd.groupByKey()
      val rdd3 = rdd2.map(x => (x._1, x._2.sum / x._2.size))
      println(rdd3.collect().toList) */
      rdd.mapPartitionsWithIndex((index, it) => {
        val datas = it.toList
        println(s"index:${index} datas=${datas}")
        datas.iterator
      }).collect()
      // combineByKey version, accumulating (sum, count) per key
      val rdd2 = rdd.combineByKey(
        (score: Int) => (score, 1),                 // createCombiner: first value of the group -> (score, 1)
        (agg: (Int, Int), curr: Int) => {           // mergeValue: combiner logic within a partition
          println(s"combiner: agg=${agg} curr=${curr}")
          (agg._1 + curr, agg._2 + 1)
        },
        (agg: (Int, Int), curr: (Int, Int)) => {    // mergeCombiners: reduce logic across partitions
          println(s"reduce: agg=${agg} curr=${curr}")
          (agg._1 + curr._1, agg._2 + curr._2)
        })
      // reduceByKey version of the same computation, for comparison
      val rdd4 = rdd.map(x => (x._1, (x._2, 1)))
      val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
      val rdd3 = rdd2.map(x => (x._1, x._2._1 / x._2._2))
      val rdd6 = rdd5.map(x => (x._1, x._2._1 / x._2._2))
      println(rdd3.collect().toList)
      println(rdd6.collect().toList)
    }

Detailed flow:
combineByKey.png

foldByKey

/*
foldByKey(initial value: same type as the value)(func: (V, V) => V): aggregates the values within each key's group, starting each group from the initial value.
*/

    @Test
    def foldByKey(): Unit = {
      val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80,
        "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
      // Reshape into (subject, (score, 1)) so sum and count can be folded together
      val rdd2 = rdd.map {
        case (name, score) => (name, (score, 1))
      }
      rdd2.mapPartitionsWithIndex((index, it) => {
        val datas = it.toList
        println(s"index:${index} datas=${datas}")
        datas.iterator
      }).collect()
      // Fold per key starting from (0, 0), accumulating (sum of scores, count)
      val result = rdd2.foldByKey((0, 0))((agg, curr) => {
        (agg._1 + curr._1, agg._2 + curr._2)
      }).collect()
      println(result.toList)
    }

foldByKey.png

aggregateByKey

/*
aggregateByKey(initial value: B)(seqOp: (B, V) => B, combOp: (B, B) => B)
seqOp: the combiner's computation logic; for the first computation of each group in the combiner stage, the value of the first argument = the initial value
combOp: the reduce-side computation logic
*/

    @Test
    def aggregateByKey(): Unit = {
      val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80,
        "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
      // Accumulator is (sum, count); start from (0, 0)
      val rdd2 = rdd.aggregateByKey((0, 0))(
        seqOp = (agg: (Int, Int), curr: Int) => (agg._1 + curr, agg._2 + 1),                    // combiner
        combOp = (agg: (Int, Int), curr: (Int, Int)) => (agg._1 + curr._1, agg._2 + curr._2))   // reduce
      println(rdd2.collect().toList)
    }

aggregateByKey.png

Summary of the four byKey operators:

Differences between reduceByKey, foldByKey, combineByKey and aggregateByKey:

reduceByKey: the combiner and the reduce side use the same computation logic, namely the function passed to reduceByKey.
foldByKey: the combiner and the reduce side also use the same logic, the function passed to foldByKey; for the first computation of each group in the combiner stage, the first argument equals the initial value.
combineByKey: the combiner logic and the reduce logic can be different.
aggregateByKey: the combiner logic and the reduce logic can be different; for the first computation of each group in the combiner stage, the first argument equals the initial value.
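As a side-by-side illustration of this summary, here is a minimal sketch (assuming the same sc; scores and pairs are names made up for this sketch, using the score data from the earlier examples) that computes the same (sum, count) aggregation per subject with each of the four operators:

    val scores = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "语文" -> 70), 3)
    val pairs  = scores.map { case (name, score) => (name, (score, 1)) }   // (subject, (score, 1))

    // reduceByKey / foldByKey: one function used for both the combiner and the reduce side
    val viaReduce = pairs.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
    val viaFold   = pairs.foldByKey((0, 0))((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))

    // combineByKey / aggregateByKey: combiner and reduce logic supplied separately,
    // so the raw (subject, score) records can be fed in directly
    val viaCombine = scores.combineByKey(
      (score: Int) => (score, 1),
      (agg: (Int, Int), curr: Int) => (agg._1 + curr, agg._2 + 1),
      (agg: (Int, Int), curr: (Int, Int)) => (agg._1 + curr._1, agg._2 + curr._2))
    val viaAggregate = scores.aggregateByKey((0, 0))(
      (agg, curr) => (agg._1 + curr, agg._2 + 1),
      (agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))

    // All four yield the same (subject, (sum, count)) pairs
    List(viaReduce, viaFold, viaCombine, viaAggregate).foreach(r => println(r.collect().toList))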

sortByKey

/*
sortByKey: sorts the elements by key; the effect is exactly the same as sortBy(x => x._1), so remembering just the second one (sortBy) is enough.
Parameter:
false: descending
true: ascending (the default)
*/

    @Test
    def sortByKey(): Unit = {
      val rdd = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
      val rdd2 = rdd.sortByKey(false)      // descending by key
      val rdd3 = rdd.sortBy(x => x._1)     // equivalent: sort by key, ascending
      println(rdd2.collect().toList)
      println(rdd3.collect().toList)
    }

mapValues

/*
mapValues(func: V => B)
mapValues: applies a map operation to the value of each pair without changing the key, returning key-value pairs whose keys are unchanged.
*/

    @Test
    def mapValues(): Unit = {
      val rdd = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
      // rdd2 and rdd4 produce the same result
      val rdd2 = rdd.mapValues(x => x / 10)
      val rdd4 = rdd.map(x => (x._1, x._2 / 10))
      println(rdd2.collect().toList)
      println(rdd4.collect().toList)
    }

join

Joins two RDDs by key.
/*
rdd1.join(rdd2): only keys that exist in both RDDs are joined.
The result type of join is RDD[(K, (V, W))], i.e. (the join key, (the value from rdd1, the value from rdd2)).
The outer-join variants and their result types are sketched after the code block below.
*/

    @Test
    def join(): Unit = {
      val rdd1 = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301, "cc" -> 401))
      val rdd2 = sc.parallelize(List("aa" -> 202, "aa" -> 808, "dd" -> 303, "ww" -> 404, "oo" -> 505))
      // Only keys present in both RDDs are joined
      val rdd3 = rdd1.join(rdd2)
      val rdd4 = rdd1.leftOuterJoin(rdd2)
      val rdd5 = rdd1.rightOuterJoin(rdd2)
      val rdd6 = rdd1.fullOuterJoin(rdd2)
      println(rdd3.collect().toList)
      println(rdd4.collect().toList)
      println(rdd5.collect().toList)
      println(rdd6.collect().toList)
    }
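For reference, the three outer-join variants wrap the side that may be missing in an Option. A type-only sketch using the rdd1/rdd2 above (needs import org.apache.spark.rdd.RDD):

    // leftOuterJoin  keeps every key of rdd1;  the right value may be absent
    val left:  RDD[(String, (Int, Option[Int]))]         = rdd1.leftOuterJoin(rdd2)
    // rightOuterJoin keeps every key of rdd2;  the left value may be absent
    val right: RDD[(String, (Option[Int], Int))]         = rdd1.rightOuterJoin(rdd2)
    // fullOuterJoin  keeps the keys of both sides; either value may be absent
    val full:  RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)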

cogroup

/*
cogroup produces a new RDD[(key, (all values for that key from the left RDD, all values for that key from the right RDD))].
It is roughly a full outer join plus a group-by.
*/

    /**
     * cogroup: RDD[(key, (all values for the key from the left RDD, all values for the key from the right RDD))]
     */
    @Test
    def cogroup(): Unit = {
      val rdd1 = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301, "cc" -> 401))
      val rdd2 = sc.parallelize(List("aa" -> 202, "aa" -> 808, "dd" -> 303, "ww" -> 404, "oo" -> 505))
      val rdd3 = rdd1.cogroup(rdd2)   // RDD[(String, (Iterable[Int], Iterable[Int]))]
      println(rdd3.collect().toList)
    }

Result:
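For the data above, the printed list looks roughly like the following (element order can differ between runs, and Spark prints each value group as a CompactBuffer):

    List((aa,(CompactBuffer(101),CompactBuffer(202, 808))),
         (dd,(CompactBuffer(201),CompactBuffer(303))),
         (pp,(CompactBuffer(301),CompactBuffer())),
         (cc,(CompactBuffer(401),CompactBuffer())),
         (ww,(CompactBuffer(),CompactBuffer(404))),
         (oo,(CompactBuffer(),CompactBuffer(505))))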