An RDD must first be converted to two-element tuples, i.e. key-value (k, v) pairs, before these operators can be called.
All of the functions below require the element type to be a key-value pair.
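For example (a minimal sketch, assuming `sc` is the SparkContext used throughout these notes; the names `words` and `pairs` are just for illustration), an ordinary RDD becomes a key-value RDD with a simple `map`, after which the *ByKey operators become available:

```scala
// An ordinary RDD of words...
val words = sc.parallelize(List("spark", "scala", "spark"))
// ...mapped to (word, 1) pairs: now it is an RDD[(String, Int)], so the *ByKey operators apply
val pairs = words.map(word => (word, 1))
println(pairs.reduceByKey(_ + _).collect().toList) // e.g. List((scala,1), (spark,2)); order may vary
```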
partitionBy
partitionBy takes a partitioner (Partitioner) as its parameter.
Requirement: the elements must be key-value pairs.
Partitioner has two commonly used subclasses, HashPartitioner and RangePartitioner; a custom partitioner can also be used.
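As a point of comparison (a sketch, not taken from the original notes): RangePartitioner samples the keys and assigns contiguous, sorted key ranges to partitions, which requires an Ordering on the key type.

```scala
import org.apache.spark.RangePartitioner

// Keys are sampled and split into sorted ranges, one range per output partition
val rdd    = sc.parallelize(List("a" -> 1, "m" -> 2, "z" -> 3, "b" -> 4, "n" -> 5), 2)
val ranged = rdd.partitionBy(new RangePartitioner(3, rdd))
ranged.mapPartitionsWithIndex((index, it) => {
  println(s"$index -> ${it.toList}") // nearby keys land in the same partition
  it
}).collect()
```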
Custom partitioner
1. Why a custom partitioner is needed:
For example, the default HashPartitioner can produce data skew. The example (illustrated by a figure in the original notes, not reproduced here) hypothetically assumes that all the data ends up in partition 1, i.e. data skew; in that case we write a custom partitioner to redistribute the keys and resolve the skew.
2. Writing tip: model it on HashPartitioner.
```scala
@Test
def partitionBy() = {
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  val rdd1 = rdd.partitionBy(new HashPartitioner(3))
  val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => { it }).collect()
  println(rdd2.toList)
}

// custom partitioning
@Test
def myPartition() = {
  val myP = new myPartitioner(4)
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  val rdd1 = rdd.partitionBy(myP)
  val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
    println(s"${index},${it.toList}")
    it
  }).collect()
}

// custom partitioner
class myPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = {
    // keep the number of partitions no smaller than 4
    if (partitions < 4) 4 else partitions
  }
  override def getPartition(key: Any): Int = {
    key match {
      case "scala" => 1
      case "spark" => 2
      case _       => 3
    }
  }
}
```
groupByKey
/*
groupByKey(): groups the elements by key.
Difference between reduceByKey and groupByKey:
  reduceByKey pre-aggregates on the map side during the shuffle stage (a combiner, similar to MapReduce); groupByKey has no such pre-aggregation.
  reduceByKey therefore performs better than groupByKey; in practice the higher-performance shuffle operator is recommended (see the sketch after the example code below).
*/
The values of each key are wrapped in an iterator:
groupByKey returns RDD[(K, Iterable[V])], i.e. just the values;
groupBy(x => x._1) returns RDD[(K, Iterable[(K, V)])], i.e. the whole tuples.
// rdd.groupByKey() is a bit cleaner than rdd.groupBy(x => x._1)
```scala
@Test
def groupByKey(): Unit = {
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4, "scala" -> 1, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  val rdd2 = rdd.groupByKey()
  val rdd3 = rdd.groupBy(x => x._1)
}
```
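To make the performance remark above concrete, a small sketch (same assumed `sc`): both lines compute identical per-key sums, but reduceByKey pre-aggregates inside each partition before the shuffle, so far less data crosses the network.

```scala
val pairs = sc.parallelize(List("spark" -> 1, "scala" -> 1, "spark" -> 1))
// groupByKey shuffles every (key, 1) record; the sum only happens after the shuffle
val viaGroup  = pairs.groupByKey().mapValues(_.sum)
// reduceByKey combines within each partition first, shuffling only the partial sums
val viaReduce = pairs.reduceByKey(_ + _)
println(viaGroup.collect().toList)  // List((spark,2), (scala,1)); order may vary
println(viaReduce.collect().toList) // same result
```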
reduceByKey
Aggregates all the values within each key's group.
A high-performance operator: because of the combiner, the amount of data output by the map stage is reduced.
Case 1
datas = List((hello,1), (spark,1), (hello,1), (python,1), (scala,1), (spark,1))
Map stage
ByKey: group by key, collecting the values of each key into a collection
hello -> List(1,1)
spark -> List(1,1)
python -> List(1)
scala -> List(1)
combine:
aggregation: (agg, curr) => agg + curr, where agg is the previous aggregation result and curr is the element currently being aggregated
first computation: agg = the group's first value = 1, curr = 1, so agg + curr = 2
combiner result of partition 0: hello->2, spark->2, python->1, scala->1
combiner result of partition 1: kafka->2, flume->2, spark->1, scala->1
Reduce stage:
shuffle
ByKey: same as above
combine: same as above
```scala
@Test
def reduceByKey(): Unit = {
  val rdd = sc.textFile("datas/wc.txt")
  val rdd2 = rdd.flatMap(_.split(" "))
  // val rdd3 = rdd2.groupBy(x => x)
  // val rdd4 = rdd3.map(x => (x._1, x._2.size))
  val rdd3 = rdd2.map(x => (x, 1))
  // index:0 datas=List((hello,1), (spark,1), (hello,1), (python,1), (scala,1), (spark,1))
  //   combiner
  //   grouping:
  //     hello  -> List(1,1)
  //       aggregation: (agg,curr) => agg+curr  (agg = previous result, curr = current element)
  //       first computation: agg = the group's first value = 1, curr = 1, agg+curr = 2
  //     spark  -> List(1,1)  (same as above: first computation gives 2)
  //     python -> List(1)
  //     scala  -> List(1)
  //   combiner result of partition 0: hello->2, spark->2, python->1, scala->1
  // index:1 datas=List((spark,1), (kafka,1), (flume,1), (flume,1), (kafka,1), (scala,1))
  //   combiner
  //   grouping:
  //     spark -> List(1)
  //     kafka -> List(1,1)
  //     flume -> List(1,1)
  //     scala -> List(1)
  //   combiner result of partition 1: kafka->2, flume->2, spark->1, scala->1
  rdd3.mapPartitionsWithIndex((index, it) => {
    println(s"index:${index} datas=${it.toList}")
    it
  }).collect()
  val rdd4 = rdd3.reduceByKey((agg, curr) => {
    println(s"agg=${agg} curr=${curr}")
    agg + curr
  })
  println(rdd4.collect().toList)
}
```
Case 2
The data first needs to be mapped to the format (x._1, (x._2, 1)), i.e. (key, (value, 1)).
```scala
def reduceByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  // This map step is necessary; without it you would have to use combineByKey instead
  val rdd4 = rdd.map(x => (x._1, (x._2, 1)))
  val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
  val rdd6 = rdd5.map(x => (x._1, x._2._1 / x._2._2))
  println(rdd6.collect().toList)
}
```
combineByKey
```scala
/**
 * combineByKey(createCombiner, mergeValue, mergeCombiners)
 *   createCombiner: transforms the first value of each group in the combine stage
 *   mergeValue:     the combiner's computation logic
 *   mergeCombiners: the reduce-side computation logic
 */
@Test
def combineByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  // Requirement: get the average score of each subject, tracked as (sum, count)
  /*
  val rdd2 = rdd.groupByKey()
  val rdd3 = rdd2.map(x => (x._1, x._2.sum / x._2.size))
  println(rdd3.collect().toList)
  */
  rdd.mapPartitionsWithIndex((index, it) => {
    println(s"index:${index} datas=${it.toList}")
    it
  }).collect()
  val rdd2 = rdd.combineByKey(
    score => (score, 1),
    (agg: (Int, Int), curr: Int) => {
      println(s"combiner: agg=${agg} curr=${curr}")
      (agg._1 + curr, agg._2 + 1)
    },
    (agg: (Int, Int), curr: (Int, Int)) => {
      println(s"reduce: agg=${agg} curr=${curr}")
      (agg._1 + curr._1, agg._2 + curr._2)
    }
  )
  val rdd4 = rdd.map(x => (x._1, (x._2, 1)))
  val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
  val rdd3 = rdd2.map(x => (x._1, x._2._1 / x._2._2))
  val rdd6 = rdd5.map(x => (x._1, x._2._1 / x._2._2))
  println(rdd3.collect().toList)
  println(rdd6.collect().toList)
}
```
foldByKey
/*
foldByKey(zeroValue: V)(func: (V, V) => V): groups by key and aggregates; the initial value has the same type as the values.
*/
```scala
@Test
def foldByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  val rdd2 = rdd.map { case (name, score) => (name, (score, 1)) }
  rdd2.mapPartitionsWithIndex((index, it) => {
    println(s"index:${index} datas=${it.toList}")
    it
  }).collect()
  rdd2.foldByKey((0, 0))((agg, curr) => {
    (agg._1 + curr._1, agg._2 + curr._2)
  }).collect()
}
```
aggregateByKey
/*
aggregateByKey(zeroValue: B)(seqOp: (B, V) => B, combOp: (B, B) => B)
  seqOp:  the combiner's computation logic; on the first computation of each group in the combiner stage, the first argument equals zeroValue
  combOp: the reduce-side computation logic
*/
```scala
@Test
def aggregateByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  val rdd2 = rdd.aggregateByKey((0, 0))(
    seqOp = (agg: (Int, Int), curr: Int) => (agg._1 + curr, agg._2 + 1),
    combOp = (agg: (Int, Int), curr: (Int, Int)) => (agg._1 + curr._1, agg._2 + curr._2)
  )
  println(rdd2.collect().toList)
}
```
Summary of the four *ByKey operators:
Differences among reduceByKey, foldByKey, combineByKey, and aggregateByKey:
| Operator | Combiner vs. reduce logic |
|---|---|
| reduceByKey | The combiner and reduce logic are the same: both are the function passed to reduceByKey. |
| foldByKey | The combiner and reduce logic are the same: both are the function passed to foldByKey. On the first computation of each group in the combiner stage, the function's first argument equals the initial value. |
| combineByKey | The combiner and reduce logic can be different. |
| aggregateByKey | The combiner and reduce logic can be different. On the first computation of each group in the combiner stage, the function's first argument equals the initial value. |
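The table above can be illustrated by computing the same per-key sum with all four operators (a sketch with made-up data; for a plain sum they produce identical results and differ only in where the combiner logic and the initial value come from):

```scala
val scores = sc.parallelize(List("a" -> 1, "a" -> 2, "b" -> 3), 2)
// reduceByKey: a single function, used for both the combiner and the reduce side
val r1 = scores.reduceByKey(_ + _)
// foldByKey: the same single function plus an initial value (0), applied on the first combiner computation of each group
val r2 = scores.foldByKey(0)(_ + _)
// aggregateByKey: initial value plus separate combiner (seqOp) and reduce (combOp) functions
val r3 = scores.aggregateByKey(0)(_ + _, _ + _)
// combineByKey: the first value of each group goes through createCombiner, then separate merge functions
val r4 = scores.combineByKey((v: Int) => v, (agg: Int, v: Int) => agg + v, (a: Int, b: Int) => a + b)
println(List(r1, r2, r3, r4).map(_.collect().toList)) // each: List((a,3), (b,3)); order may vary
```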
sortByKey
/*
sortByKey: sorts the elements by key; the effect is exactly the same as sortBy(x => x._1), so remembering the latter is enough.
Parameter (ascending):
  false: descending
  true:  ascending
*/
```scala
@Test
def sortByKey(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
  val rdd2 = rdd.sortByKey(false)
  val rdd3 = rdd.sortBy(x => x._1)
  println(rdd2.collect().toList)
}
```
mapValues
/*
mapValues(func: V => B)
mapValues: applies a map to each value without changing the key, returning key-value pairs whose keys are unchanged.
*/
```scala
@Test
def mapValues(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
  // rdd2 and rdd4 have the same effect
  val rdd2 = rdd.mapValues(x => x / 10)
  val rdd4 = rdd.map(x => (x._1, x._2 / 10))
  println(rdd2.collect().toList)
}
```
join
Joins two RDDs by key.
/*
rdd1.join(rdd2): only keys that appear in both RDDs are joined.
The result type is RDD[(K, (value from rdd1 for the key, value from rdd2 for the key))].
*/
```scala
@Test
def join(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301, "cc" -> 401))
  val rdd2 = sc.parallelize(List("aa" -> 202, "aa" -> 808, "dd" -> 303, "ww" -> 404, "oo" -> 505))
  // a pair is produced only when both RDDs contain the same key
  val rdd3 = rdd1.join(rdd2)
  val rdd4 = rdd1.leftOuterJoin(rdd2)
  val rdd5 = rdd1.rightOuterJoin(rdd2)
  val rdd6 = rdd1.fullOuterJoin(rdd2)
  println(rdd3.collect().toList)
  println(rdd4.collect().toList)
  println(rdd6.collect().toList)
}
```
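For reference (a self-contained sketch, not from the original notes, describing standard Spark behavior): the outer-join variants wrap the side that may be missing in an Option.

```scala
val left  = sc.parallelize(List("aa" -> 1, "pp" -> 2))
val right = sc.parallelize(List("aa" -> 10, "ww" -> 20))
println(left.join(right).collect().toList)           // (K, (V, W)):               List((aa,(1,10)))
println(left.leftOuterJoin(right).collect().toList)  // (K, (V, Option[W])):       List((aa,(1,Some(10))), (pp,(2,None)))
println(left.rightOuterJoin(right).collect().toList) // (K, (Option[V], W)):       List((aa,(Some(1),10)), (ww,(None,20)))
println(left.fullOuterJoin(right).collect().toList)  // (K, (Option[V], Option[W])); order may vary
```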
cogroup
/*
cogroup produces a new RDD[(key, (all values for that key in the left RDD, all values for that key in the right RDD))].
Similar to: a full outer join plus grouping.
*/
```scala
@Test
def cogroup(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301, "cc" -> 401))
  val rdd2 = sc.parallelize(List("aa" -> 202, "aa" -> 808, "dd" -> 303, "ww" -> 404, "oo" -> 505))
  val rdd3 = rdd1.cogroup(rdd2)
  println(rdd3.collect().toList)
}
```
Result:
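For the data above, the collected result looks roughly like the following (key ordering may vary between runs, and Spark prints its internal CompactBuffer type for the value iterables):

```
List((aa,(CompactBuffer(101),CompactBuffer(202, 808))),
     (dd,(CompactBuffer(201),CompactBuffer(303))),
     (pp,(CompactBuffer(301),CompactBuffer())),
     (cc,(CompactBuffer(401),CompactBuffer())),
     (ww,(CompactBuffer(),CompactBuffer(404))),
     (oo,(CompactBuffer(),CompactBuffer(505))))
```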