An RDD must first be converted into two-element tuples, i.e. key-value pairs, before these operators can be called on it.
The following functions all require the element type to be a key-value pair.
partitionBy
partitionBy takes a partitioner as its parameter.
Requirement: the elements must be key-value pairs.
Partitioner has two commonly used subclasses, HashPartitioner and RangePartitioner; you can also define a custom partitioner.
Custom partitioner
1. Why a custom partitioner is needed:
For example, the default HashPartitioner may send all (or most) of the data to a single partition, say partition 1, which produces data skew. A custom partitioner lets us control the key-to-partition mapping and resolve the skew.
2. Writing tip: model it on HashPartitioner (see the HashPartitioner-style sketch after the myPartitioner class below).
@Test
def partitionBy(): Unit = {
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  val rdd1 = rdd.partitionBy(new HashPartitioner(3))
  val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
    it
  }).collect()
  println(rdd2.toList)
}
// Using the custom partitioner
@Test
def myPartition(): Unit = {
  val myP = new myPartitioner(4)
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  val rdd1 = rdd.partitionBy(myP)
  val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
    println(s"${index},${it.toList}")
    it
  }).collect()
}
}
// Custom partitioner
class myPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = {
    // Enforce a minimum of 4 partitions
    if (partitions < 4) {
      4
    } else {
      partitions
    }
  }

  override def getPartition(key: Any): Int = {
    key match {
      case "scala" => 1
      case "spark" => 2
      case _       => 3
    }
  }
}
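For reference, "model it on HashPartitioner" means mapping a key to key.hashCode modulo numPartitions (null keys go to partition 0). Below is a minimal, illustrative sketch of that idea, not Spark's actual source; the class name HashLikePartitioner is made up for the example.
import org.apache.spark.Partitioner

// Minimal HashPartitioner-style sketch (illustrative only)
class HashLikePartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null => 0 // null keys always go to partition 0
    case _ =>
      // keep the modulo result non-negative even when hashCode is negative
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod
  }
}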
groupByKey
/*
groupByKey(): groups the elements by key
Difference between reduceByKey and groupByKey:
  reduceByKey performs a combiner-style pre-aggregation during the shuffle (similar to the MR combiner), so it performs better than groupByKey.
  groupByKey has no such pre-aggregation during the shuffle.
  In practice, prefer the higher-performance shuffle operators such as reduceByKey.
*/
groupByKey wraps the values of each key in an iterator: RDD[(K, Iterable[V])].
groupBy(x => x._1) keeps the whole tuples as the grouped values: RDD[(K, Iterable[(K, V)])].
// rdd.groupByKey() is a bit cleaner than rdd.groupBy(x => x._1)
@Test
def groupByKey(): Unit = {
  val rdd = sc.parallelize(List(("spark", 2), ("scala", 4), ("spark", 1), "scala" -> 5, "java" -> 4, "scala" -> 1, "java" -> 1, "java" -> 1, "spark" -> 1, "scala" -> 1))
  val rdd2 = rdd.groupByKey()        // RDD[(String, Iterable[Int])]
  val rdd3 = rdd.groupBy(x => x._1)  // RDD[(String, Iterable[(String, Int)])]
  println(rdd2.collect().toList)
  println(rdd3.collect().toList)
}
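To make the reduceByKey vs groupByKey comparison above concrete, here is a small word-count sketch under the same SparkContext sc; the sample words are made up:
// Word count two ways (sketch)
val words = sc.parallelize(List("spark", "scala", "spark", "java"))
val pairs = words.map(w => (w, 1))

// groupByKey: every (word, 1) pair is shuffled, then summed after grouping
val viaGroup = pairs.groupByKey().mapValues(_.sum)
// reduceByKey: pairs are pre-aggregated within each partition before the shuffle
val viaReduce = pairs.reduceByKey(_ + _)

println(viaGroup.collect().toList)
println(viaReduce.collect().toList)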
reduceByKey
Aggregates all the values within each key group.
A high-performance operator: the combiner reduces the amount of data the map stage has to output.
Case 1
datas = List((hello,1), (spark,1), (hello,1), (python,1), (scala,1), (spark,1))
Map stage
  ByKey: group by key, collecting the values into a list
    hello  -> List(1,1)
    spark  -> List(1,1)
    python -> List(1)
    scala  -> List(1)
  combine:
    aggregation: (agg, curr) => agg + curr, where agg is the previous aggregate and curr is the current element
    first computation: agg = the group's first value = 1, curr = 1, agg + curr = 2
    partition 0 combiner result: hello->2, spark->2, python->1, scala->1
    partition 1 combiner result: kafka->2, spark->1, flume->2, scala->1
Reduce stage:
  shuffle
  ByKey: same as above
  combine: same as above
@Test
def reduceByKey(): Unit = {
  val rdd = sc.textFile("datas/wc.txt")
  val rdd2 = rdd.flatMap(_.split(" "))
  // val rdd3 = rdd2.groupBy(x => x)
  // val rdd4 = rdd3.map(x => (x._1, x._2.size))
  val rdd3 = rdd2.map(x => (x, 1))
  // index:0 datas=List((hello,1), (spark,1), (hello,1), (python,1), (scala,1), (spark,1))
  //   combiner
  //     group:
  //       hello  -> List(1,1)
  //         aggregation: (agg, curr) => agg + curr; agg is the previous aggregate, curr is the current element
  //         first computation: agg = the group's first value = 1, curr = 1, agg + curr = 2
  //       spark  -> List(1,1)
  //         same as above
  //       python -> List(1)
  //       scala  -> List(1)
  //     partition 0 combiner result: hello->2, spark->2, python->1, scala->1
  // index:1 datas=List((spark,1), (kafka,1), (flume,1), (flume,1), (kafka,1), (scala,1))
  //   combiner
  //     group:
  //       spark -> List(1)
  //       kafka -> List(1,1)
  //       flume -> List(1,1)
  //       scala -> List(1)
  //     partition 1 combiner result: kafka->2, spark->1, flume->2, scala->1
  rdd3.mapPartitionsWithIndex((index, it) => {
    println(s"index:${index} datas=${it.toList}")
    it
  }).collect()
  val rdd4 = rdd3.reduceByKey((agg, curr) => {
    println(s"agg=${agg} curr=${curr}")
    agg + curr
  })
  println(rdd4.collect().toList)
}
Case 2
The data first needs to be reshaped to (x._1, (x._2, 1)).
@Test
def reduceByKeyAvg(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  val rdd4 = rdd.map(x => (x._1, (x._2, 1))) // this step is required; otherwise you would have to use combineByKey
  val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
  val rdd6 = rdd5.map(x => (x._1, x._2._1 / x._2._2))
  println(rdd6.collect().toList)
}
combineByKey
/**
 * combineByKey(createCombiner, mergeValue, mergeCombiners)
 *   createCombiner: transforms the first value of each group in the combine stage
 *   mergeValue:     the combiner (map-side) aggregation logic
 *   mergeCombiners: the reduce-side aggregation logic
 */
@Test
def combineByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  // Goal: compute the average score of each subject, tracking (sum, count)
  /* val rdd2 = rdd.groupByKey()
     val rdd3 = rdd2.map(x => (x._1, x._2.sum / x._2.size))
     println(rdd3.collect().toList) */
  rdd.mapPartitionsWithIndex((index, it) => {
    println(s"index:${index} datas=${it.toList}")
    it
  }).collect()
  val rdd2 = rdd.combineByKey(
    score => (score, 1),
    (agg: (Int, Int), curr: Int) => {
      println(s"combiner: agg=${agg} curr=${curr}")
      (agg._1 + curr, agg._2 + 1)
    },
    (agg: (Int, Int), curr: (Int, Int)) => {
      println(s"reduce: agg=${agg} curr=${curr}")
      (agg._1 + curr._1, agg._2 + curr._2)
    })
  val rdd4 = rdd.map(x => (x._1, (x._2, 1)))
  val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
  val rdd3 = rdd2.map(x => (x._1, x._2._1 / x._2._2))
  val rdd6 = rdd5.map(x => (x._1, x._2._1 / x._2._2))
  println(rdd3.collect().toList)
  println(rdd6.collect().toList)
}
foldByKey
/*
foldByKey(zeroValue: V)(func: (V, V) => V): groups by key and aggregates the values of each group, starting from the zero value
*/
@Test
def foldByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  val rdd2 = rdd.map {
    case (name, score) => (name, (score, 1))
  }
  rdd2.mapPartitionsWithIndex((index, it) => {
    println(s"index:${index} datas=${it.toList}")
    it
  }).collect()
  rdd2.foldByKey((0, 0))((agg, curr) => {
    (agg._1 + curr._1, agg._2 + curr._2)
  }).collect()
}
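The foldByKey call above stops at a (sum, count) pair per subject; as a small follow-up sketch (reusing rdd2 from the test above), the averages can be derived with mapValues:
// Sketch: turn the (sum, count) pairs from foldByKey into per-subject averages
val avg = rdd2.foldByKey((0, 0))((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
  .mapValues { case (sum, count) => sum / count } // integer division, as in the other examples
println(avg.collect().toList)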
aggregateByKey
/*
aggregateByKey(zeroValue: B)(seqOp: (B, V) => B, combOp: (B, B) => B)
  seqOp:  the combiner (map-side) logic; the first time each group is processed in the combiner, the first argument equals the zero value
  combOp: the reduce-side logic
*/
@Test
def aggregateByKey(): Unit = {
  val rdd = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "数学" -> 80, "语文" -> 70, "英语" -> 60, "英语" -> 80, "语文" -> 60, "英语" -> 40, "数学" -> 50), 3)
  val rdd2 = rdd.aggregateByKey((0, 0))(
    seqOp = (agg: (Int, Int), curr: Int) => (agg._1 + curr, agg._2 + 1),
    combOp = (agg: (Int, Int), curr: (Int, Int)) => (agg._1 + curr._1, agg._2 + curr._2))
  println(rdd2.collect().toList)
}
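Because aggregateByKey's intermediate type does not have to match the value type, it can track more than a running sum. A sketch (sample data made up in the same style as above) that computes each subject's minimum and maximum score:
// Sketch: per-subject (min, max) — the intermediate (Int, Int) pair has a different meaning from the raw Int scores
val scores = sc.parallelize(List("语文" -> 60, "数学" -> 100, "数学" -> 60, "语文" -> 70), 3)
val minMax = scores.aggregateByKey((Int.MaxValue, Int.MinValue))(
  (agg, score) => (math.min(agg._1, score), math.max(agg._2, score)), // seqOp: fold one score into the running (min, max)
  (a, b) => (math.min(a._1, b._1), math.max(a._2, b._2)))             // combOp: merge two partitions' (min, max)
println(minMax.collect().toList)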
Summary of the four byKey operators:
Differences among reduceByKey, foldByKey, combineByKey and aggregateByKey:
reduceByKey: the combiner and the reducer use the same logic, namely the function passed to reduceByKey.
foldByKey: the combiner and the reducer also share the function passed to foldByKey; in the combiner, the first computation of each group uses the zero value as the function's first argument.
combineByKey: the combiner and the reducer can use different logic.
aggregateByKey: the combiner and the reducer can use different logic; in the combiner, the first computation of each group uses the zero value as the function's first argument.
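As a side-by-side sketch (score data made up in the style of the examples above), all four operators can produce the same (sum, count) per subject:
// Sketch: the same (sum, count) aggregation written with each of the four byKey operators
val scores = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60), 3)
val paired = scores.map { case (k, v) => (k, (v, 1)) }

// reduceByKey / foldByKey: value type and result type must match, so we pre-pair to (score, 1)
val viaReduce = paired.reduceByKey((a, c) => (a._1 + c._1, a._2 + c._2))
val viaFold   = paired.foldByKey((0, 0))((a, c) => (a._1 + c._1, a._2 + c._2))

// combineByKey / aggregateByKey: the intermediate type may differ, so the raw scores can be used directly
val viaCombine = scores.combineByKey(
  (v: Int) => (v, 1),
  (a: (Int, Int), v: Int) => (a._1 + v, a._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
val viaAggregate = scores.aggregateByKey((0, 0))(
  (a, v) => (a._1 + v, a._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))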
sortByKey
/*
sortByKey: sorts the elements by key; the effect is exactly the same as sortBy(x => x._1), so it is enough to remember one of them
Parameter:
  false: descending
  true:  ascending (the default)
*/
@Test
def sortByKey(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
  val rdd2 = rdd.sortByKey(false)    // descending by key
  val rdd3 = rdd.sortBy(x => x._1)   // ascending by key
  println(rdd2.collect().toList)
  println(rdd3.collect().toList)
}
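sortByKey also takes a second argument for the number of output partitions, and sortBy can just as well sort on the value; a small sketch on the same kind of data:
// Sketch: extra sorting variations on the same (String, Int) pairs
val pairs = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
val byKeyDesc2 = pairs.sortByKey(ascending = false, numPartitions = 2) // descending keys, 2 output partitions
val byValueDesc = pairs.sortBy(x => x._2, ascending = false)           // sortBy can sort on the value instead
println(byKeyDesc2.collect().toList)
println(byValueDesc.collect().toList)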
mapValues
/*
mapValues(func: V => B)
mapValues: maps over the values without touching the keys, returning key-value pairs whose keys are unchanged
*/
@Test
def mapValues(): Unit = {
  val rdd = sc.parallelize(List("aa" -> 10, "dd" -> 20, "pp" -> 30, "cc" -> 40))
  // rdd2 and rdd4 produce the same result
  val rdd2 = rdd.mapValues(x => x / 10)
  val rdd4 = rdd.map(x => (x._1, x._2 / 10))
  println(rdd2.collect().toList)
  println(rdd4.collect().toList)
}
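mapValues is also handy right after a grouping operator, since the keys stay untouched; a sketch (score data made up in the style of the byKey examples) computing per-subject averages:
// Sketch: groupByKey followed by mapValues keeps the subject as the key and maps only the Iterable of scores
val scores = sc.parallelize(List("语文" -> 60, "英语" -> 80, "数学" -> 100, "数学" -> 60, "语文" -> 70))
val avgBySubject = scores.groupByKey().mapValues(vs => vs.sum / vs.size)
println(avgBySubject.collect().toList)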
join
Joins two RDDs by key.
/*
rdd1.join(rdd2): only keys that appear in both RDDs are joined
the result type is RDD[(K, (value from rdd1, value from rdd2))]
*/
@Test
def join(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301, "cc" -> 401))
  val rdd2 = sc.parallelize(List("aa" -> 202, "aa" -> 808, "dd" -> 303, "ww" -> 404, "oo" -> 505))
  // only keys that exist in both RDDs are joined
  val rdd3 = rdd1.join(rdd2)
  val rdd4 = rdd1.leftOuterJoin(rdd2)
  val rdd5 = rdd1.rightOuterJoin(rdd2)
  val rdd6 = rdd1.fullOuterJoin(rdd2)
  println(rdd3.collect().toList)
  println(rdd4.collect().toList)
  println(rdd5.collect().toList)
  println(rdd6.collect().toList)
}
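The outer joins wrap the possibly-missing side in Option: leftOuterJoin yields (V, Option[W]), rightOuterJoin (Option[V], W), fullOuterJoin (Option[V], Option[W]). A sketch (sample data made up) that unwraps the Option with a default:
// Sketch: leftOuterJoin wraps the right side in Option; getOrElse supplies a default for missing keys
val left  = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301))
val right = sc.parallelize(List("aa" -> 202, "dd" -> 303, "ww" -> 404))
val withDefaults = left.leftOuterJoin(right).mapValues {
  case (l, rOpt) => (l, rOpt.getOrElse(0)) // (V, Option[W]) -> (V, W) with 0 as the default
}
println(withDefaults.collect().toList)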
cogroup
/*
cogroup produces a new RDD[(key, (all of the key's values from the left RDD, all of the key's values from the right RDD))]
roughly equivalent to: full outer join + group
*/
@Test
def cogroup(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301, "cc" -> 401))
  val rdd2 = sc.parallelize(List("aa" -> 202, "aa" -> 808, "dd" -> 303, "ww" -> 404, "oo" -> 505))
  val rdd3 = rdd1.cogroup(rdd2)
  println(rdd3.collect().toList)
}
Result: every key from either RDD appears exactly once, paired with two collections: the key's values from rdd1 and its values from rdd2 (an empty collection if the key is missing on that side).
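Because a missing key shows up as an empty collection on that side, cogroup can emulate the join variants; a sketch (sample data made up, and the flattening logic is just one illustrative way to do it) that rebuilds full-outer-join-style pairs:
// Sketch: deriving full-outer-join-like pairs from cogroup's (Iterable, Iterable) values
val left  = sc.parallelize(List("aa" -> 101, "dd" -> 201, "pp" -> 301))
val right = sc.parallelize(List("aa" -> 202, "aa" -> 808, "ww" -> 404))
val fullOuterLike = left.cogroup(right).flatMapValues {
  case (ls, rs) if rs.isEmpty => ls.map(l => (Option(l), Option.empty[Int]))
  case (ls, rs) if ls.isEmpty => rs.map(r => (Option.empty[Int], Option(r)))
  case (ls, rs)               => for (l <- ls; r <- rs) yield (Option(l), Option(r))
}
println(fullOuterLike.collect().toList)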