Chapter 1. Transformation Operators (Key-Value Types)
1.partitionBy
```scala
package com.atguigu.spark.day04

import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import org.junit.Test

class $01_Transformation extends Serializable {

  // excluded from serialization
  @transient
  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

  /**
   * partitionBy(partitioner): shuffle-repartitions the RDD according to the given partitioner
   */
  @Test
  def partitionBy(): Unit = {
    val rdd = sc.parallelize(List(("aa", 10), "bb" -> 20, "cc" -> 30, "ac" -> 30, "ec" -> 40, "cf" -> 50, "ad" -> 60))

    rdd.mapPartitionsWithIndex((index, it) => {
      // materialize the iterator once so it can be both printed and returned
      val data = it.toList
      println(s"index=${index} data=${data}")
      data.iterator
    }).collect()

    val rdd2 = rdd.partitionBy(new HashPartitioner(3))
    rdd2.mapPartitionsWithIndex((index, it) => {
      val data = it.toList
      println(s"rdd2 index=${index} data=${data}")
      data.iterator
    }).collect()

    // groupBy can also take a partitioner
    val rdd3 = rdd.groupBy(x => x._1, p = new MyPartitioner(4))
    rdd3.mapPartitionsWithIndex((index, it) => {
      val data = it.toList
      println(s"rdd3 index=${index} data=${data}")
      data.iterator
    }).collect()
  }

  /**
   * Custom partitioner:
   * 1. Create a class that extends Partitioner
   * 2. Override the abstract methods
   * 3. Use it
   */
  class MyPartitioner(num: Int) extends Partitioner {
    // number of partitions
    override def numPartitions: Int = if (num < 4) 4 else this.num

    // partition id for a given key
    override def getPartition(key: Any): Int = key match {
      case null => 0
      case "aa" => 1
      case "ac" => 2
      case "ec" => 3
      case _    => 0
    }
  }
}
```
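The test above only uses MyPartitioner through groupBy; a minimal sketch of passing it to partitionBy directly (reusing `rdd` and the MyPartitioner class from the test, `rdd4` is a hypothetical name):

```scala
// Minimal sketch: shuffle-repartition the pair RDD with the custom partitioner directly.
val rdd4 = rdd.partitionBy(new MyPartitioner(4))
rdd4.mapPartitionsWithIndex((index, it) => {
  val data = it.toList
  // per MyPartitioner: "aa" -> partition 1, "ac" -> 2, "ec" -> 3, all other keys -> 0
  println(s"rdd4 index=${index} data=${data}")
  data.iterator
}).collect()
```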
2.groupByKey
```scala
/**
 * groupByKey: group by key.
 * The result is RDD[(K, Iterable[V])]: K is a key from the original RDD,
 * V is the collection of all values that key had in the original RDD.
 */
@Test
def groupByKey() = {
  val rdd1 = sc.parallelize(List("aa" -> 1, "aa" -> 10, "cc" -> 20, "cd" -> 40, "aa" -> 50, "cc" -> 60, "cd" -> 100))

  val rdd2 = rdd1.groupByKey(3)
  println(rdd1.getNumPartitions)
  println(rdd2.getNumPartitions)

  // groupByKey implemented with groupBy
  // (note: groupBy keeps the whole (key, value) tuples as the grouped values)
  val rdd3 = rdd1.groupBy(_._1)

  println(rdd2.collect().toList)
  println(rdd3.collect().toList)
}
```
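A common follow-up is to aggregate each group's values, e.g. a per-key sum. A minimal sketch reusing `rdd1` from the test above (`rddSum` is a hypothetical name):

```scala
// Minimal sketch: per-key sum after groupByKey (for large data prefer reduceByKey, see the next operator)
val rddSum = rdd1.groupByKey().mapValues(values => values.sum)
println(rddSum.collect().toList)   // e.g. List((aa,61), (cc,80), (cd,140)) — order may vary
```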
3.reduceByKey
```scala
/**
 * reduceByKey(func): group by key, then aggregate all values of each group.
 * Difference from groupByKey:
 *   groupByKey only groups; there is no combiner step.
 *   reduceByKey groups and aggregates, with a map-side combiner, so it generally performs better.
 */
@Test
def reduceByKey(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 1, "aa" -> 10, "cc" -> 20, "cd" -> 40, "aa" -> 50, "cc" -> 60, "cd" -> 100), 2)

  rdd1.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"index=${index} data=${data}")
    data.iterator
  }).collect()

  val rdd2 = rdd1.reduceByKey((agg, curr) => {
    println(s"agg=${agg} curr=${curr}")
    agg + curr
  })
  println(rdd2.collect().toList)
}
```
Diagram:
![$04[SparkCore(transformation转换算子)下] - 图1](/uploads/projects/liuye-6lcqc@gws1uf/093f56a7d4382d2ff92fda17c2b44927.png)
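As a concrete illustration of the combiner difference, here is a minimal word-count sketch written both ways (`lines` is a hypothetical RDD[String]; all other names are illustrative only):

```scala
// Minimal sketch: the same per-key sum with and without a map-side combiner.
val words = lines.flatMap(_.split(" ")).map(word => (word, 1))

// groupByKey shuffles every (word, 1) record, then sums on the reduce side
val countsViaGroup = words.groupByKey().mapValues(_.sum)

// reduceByKey pre-sums within each partition (combiner), shuffling far less data
val countsViaReduce = words.reduceByKey(_ + _)
```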
4.combineByKey
```scala
/**
 * combineByKey(createCombiner: V => B, mergeValue: (B, V) => B, mergeCombiners: (B, B) => B)
 *   createCombiner: in the combiner stage, transforms the first value of each group in each partition
 *   mergeValue: combiner (map-side) aggregation logic
 *   mergeCombiners: reduce-side aggregation logic
 */
@Test
def combineByKey() = {
  val rdd1 = sc.parallelize(List("语文" -> 60, "数学" -> 100, "语文" -> 70, "英语" -> 80, "数学" -> 65, "英语" -> 100,
    "英语" -> 70, "语文" -> 90, "数学" -> 100, "英语" -> 100), 2)

  // Requirement: average score per subject

  // Approach 1: groupByKey + map
  val rdd2 = rdd1.groupByKey()
  val rdd3 = rdd2.map(x => {
    val totalScore = x._2.sum
    val count = x._2.size
    (x._1, totalScore.toDouble / count)
  })
  println(rdd3.collect().toList)

  // Approach 2: map to (score, 1), then reduceByKey
  val rdd4 = rdd1.map { case (name, score) => (name, (score, 1)) }
  val rdd5 = rdd4.reduceByKey((agg, curr) => (agg._1 + curr._1, agg._2 + curr._2))
  val rdd6 = rdd5.map { case (name, (totalScore, count)) => (name, totalScore.toDouble / count) }
  println(rdd6.collect().toList)

  // Approach 3: combineByKey
  val rdd7 = rdd1.combineByKey(
    x => (x, 1),
    (agg: (Int, Int), curr) => (agg._1 + curr, agg._2 + 1),
    (agg: (Int, Int), curr: (Int, Int)) => (agg._1 + curr._1, agg._2 + curr._2)
  )
  val result = rdd7.map { case (name, (totalScore, count)) => (name, totalScore.toDouble / count) }.collect()
  println(result.toList)
}
```
Diagram
![$04[SparkCore(transformation转换算子)下] - 图2](/uploads/projects/liuye-6lcqc@gws1uf/18f57fed4eebd5e99ab80e1fb0caea2a.png)
![$04[SparkCore(transformation转换算子)下] - 图3](/uploads/projects/liuye-6lcqc@gws1uf/3073bdfa1250406179113cddb4ffd6f2.png)
5.foldByKey
```scala
/**
 * foldByKey(zeroValue)(func: (V, V) => V)
 * In the combiner stage, for the first aggregation of each group, the initial value
 * of the function's first argument = zeroValue.
 */
@Test
def foldByKey(): Unit = {
  val rdd1 = sc.parallelize(List("语文" -> 60, "数学" -> 100, "语文" -> 70, "英语" -> 80, "数学" -> 65, "英语" -> 100,
    "英语" -> 70, "语文" -> 90, "数学" -> 100, "英语" -> 100), 2)

  rdd1.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"index=${index} data=${data}")
    data.iterator
  }).collect()

  println(rdd1.foldByKey(1000)((agg, curr) => {
    println(s"agg=${agg} curr=${curr}")
    agg + curr
  }).collect().toList)
}
```
Diagram
![$04[SparkCore(transformation转换算子)下] - 图4](/uploads/projects/liuye-6lcqc@gws1uf/84474ddbe83161529080d8e3af6edf56.png)
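With a neutral zero value, foldByKey gives the same result as reduceByKey. A minimal sketch reusing `rdd1` from the test above:

```scala
// Minimal sketch: with zeroValue = 0, foldByKey yields the same sums as reduceByKey.
// The zeroValue is applied once per key per partition, which is why a non-neutral
// value such as 1000 gets added once for every partition that contains the key.
val sums1 = rdd1.foldByKey(0)(_ + _)
val sums2 = rdd1.reduceByKey(_ + _)
println(sums1.collect().toList)
println(sums2.collect().toList)
```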
6.aggregateByKey
```scala
/**
 * aggregateByKey(zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
 *   seqOp: combiner (map-side) logic; in the combiner stage, for the first aggregation
 *          of each group, the initial value of the first argument = zeroValue
 *   combOp: reduce-side logic
 */
@Test
def aggregateByKey(): Unit = {
  val rdd1 = sc.parallelize(List("语文" -> 60, "数学" -> 100, "语文" -> 70, "英语" -> 80, "数学" -> 65, "英语" -> 100,
    "英语" -> 70, "语文" -> 90, "数学" -> 100, "英语" -> 100), 2)

  rdd1.mapPartitionsWithIndex((index, it) => {
    val data = it.toList
    println(s"index=${index} data=${data}")
    data.iterator
  }).collect()

  val rdd2 = rdd1.aggregateByKey((0, 0))(
    (agg, curr) => {
      println(s"combiner agg=${agg} curr=${curr}")
      (agg._1 + curr, agg._2 + 1)
    },
    (agg, curr) => {
      println(s"reducer agg=${agg} curr=${curr}")
      (agg._1 + curr._1, agg._2 + curr._2)
    })
  println(rdd2.collect().toList)
}
```
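The result above is a (totalScore, count) pair per subject; a minimal follow-up sketch turning it into the average (`avg` is a hypothetical name):

```scala
// Minimal sketch: derive the average from the (totalScore, count) pairs produced above.
val avg = rdd2.mapValues { case (totalScore, count) => totalScore.toDouble / count }
println(avg.collect().toList)
```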
Differences between reduceByKey, foldByKey, combineByKey, and aggregateByKey (a combined sketch follows this list):
- reduceByKey: the reduce and combine logic are the same; there is no zero value, so in the combiner stage the first aggregation of each group starts directly from the group's first value.
- foldByKey: the reduce and combine logic are the same; in the combiner stage, for the first aggregation of each group, the initial value of the function's first argument = the zero value.
- combineByKey: the reduce and combine logic can differ; in the combiner stage the first value of each group is first transformed (createCombiner), and the first aggregation of each group starts from that transformed value.
- aggregateByKey: the reduce and combine logic can differ; in the combiner stage, for the first aggregation of each group, the initial value of the function's first argument = the zero value.
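To make the differences concrete, here is a minimal sketch expressing the same per-key sum with all four operators (`pairs` is a hypothetical RDD[(String, Int)]):

```scala
// Minimal sketch: the same per-key sum written with each of the four operators.
val viaReduce    = pairs.reduceByKey(_ + _)                          // no zero value, same logic on both sides
val viaFold      = pairs.foldByKey(0)(_ + _)                         // zero value seeds each key per partition
val viaCombine   = pairs.combineByKey((v: Int) => v,                 // transform the first value
                                      (agg: Int, v: Int) => agg + v, // map-side merge
                                      (a: Int, b: Int) => a + b)     // reduce-side merge
val viaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _)             // zero value + two (possibly different) functions
```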
7.sortByKey
```scala
/**
 * sortByKey: sort by key
 */
@Test
def sortByKey(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 10, "ac" -> 20, "cc" -> 30, "cd" -> 40, "ad" -> 50))

  // descending order
  val rdd2 = rdd1.sortByKey(false)

  // sortByKey implemented with sortBy
  rdd1.sortBy(x => x._1, ascending = false)

  println(rdd2.collect().toList)
}
```
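sortBy also covers cases sortByKey cannot express directly, such as sorting by value. A minimal sketch reusing `rdd1` from the test above (`byValueDesc` is a hypothetical name):

```scala
// Minimal sketch: sort by value in descending order with sortBy.
val byValueDesc = rdd1.sortBy(x => x._2, ascending = false)
println(byValueDesc.collect().toList)   // List((ad,50), (cd,40), (cc,30), (ac,20), (aa,10))
```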
8.mapValues
```scala
/**
 * mapValues(func: V => B): maps only the value of each element;
 * the key is left unchanged.
 */
@Test
def mapValues(): Unit = {
  val rdd1 = sc.parallelize(List("aa" -> 10, "ac" -> 20, "cc" -> 30, "cd" -> 40, "ad" -> 50))

  val rdd2 = rdd1.mapValues(x => x / 10)
  // equivalent to rdd1.map(x => (x._1, x._2 / 10))
  println(rdd2.collect().toList)
}
```
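One practical difference from the map equivalent: because mapValues cannot change keys, it keeps the RDD's partitioner, while map discards it. A minimal sketch reusing `rdd1` from the test above (`partitioned` is a hypothetical name; HashPartitioner is already imported at the top of this file):

```scala
// Minimal sketch: mapValues keeps the existing partitioner, map does not.
val partitioned = rdd1.partitionBy(new HashPartitioner(3))
println(partitioned.mapValues(_ / 10).partitioner)             // Some(HashPartitioner)
println(partitioned.map(x => (x._1, x._2 / 10)).partitioner)   // None
```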
9.join
```scala
/**
 * join: only pairs whose key exists in both RDDs are joined, like SQL inner join
 */
@Test
def join(): Unit = {
  val rdd1 = sc.parallelize(List("zhangsan" -> "1001", "lisi" -> "1002", "wangwu" -> "1003", "zhaoliu" -> "1001", "李磊" -> "1005"))
  val rdd2 = sc.parallelize(List("1001" -> "开发部", "1002" -> "销售部", "1003" -> "市场部", "1004" -> "公关部"))

  // re-key the employees by department id
  val rdd3 = rdd1.map(x => (x._2, x._1))

  val rdd4 = rdd3.join(rdd2)
  val rdd5 = rdd3.leftOuterJoin(rdd2)
  val rdd6 = rdd3.rightOuterJoin(rdd2)
  val rdd7 = rdd3.fullOuterJoin(rdd2)

  // Cartesian product
  val rdd8 = rdd3.cartesian(rdd2)

  println(rdd4.collect().toList)
  println(rdd5.collect().toList)
  println(rdd6.collect().toList)
  println(rdd7.collect().toList)
  println(rdd8.collect().toList)
}
```
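The outer joins return Option values for the side that may be missing. A minimal sketch flattening the leftOuterJoin result `rdd5` from the test above (`withDept` is a hypothetical name):

```scala
// Minimal sketch: leftOuterJoin yields (deptId, (name, Option[dept])); fill in missing departments.
val withDept = rdd5.mapValues { case (name, deptOpt) => (name, deptOpt.getOrElse("unknown")) }
println(withDept.collect().toList)
```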
10.cogroup
```scala
/**
 * cogroup ≈ groupByKey + fullOuterJoin:
 * for every key that appears in either RDD, it returns (key, (values from rdd1, values from rdd2))
 */
@Test
def cogroup(): Unit = {
  val rdd1 = sc.parallelize(List("1001" -> "zhangsan", "1002" -> "lilei", "1003" -> "wangwu", "1001" -> "zhaoliu", "1005" -> "lilei"))
  val rdd2 = sc.parallelize(List("1001" -> "开发部", "1002" -> "销售部", "1003" -> "市场部", "1004" -> "公关部"))

  val rdd3 = rdd1.cogroup(rdd2)
  println(rdd3.collect().toList)
}
```
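To see why cogroup behaves like groupByKey plus fullOuterJoin, here is a minimal sketch flattening the cogroup result `rdd3` from the test above into joined pairs (`flattened` is a hypothetical name):

```scala
// Minimal sketch: expand (key, (names, depts)) into individual (Option[name], Option[dept]) pairs,
// keeping keys that appear on only one side — the fullOuterJoin behaviour.
val flattened = rdd3.flatMapValues { case (names, depts) =>
  val ns: Iterable[Option[String]] = if (names.isEmpty) Iterable(None) else names.map(Some(_))
  val ds: Iterable[Option[String]] = if (depts.isEmpty) Iterable(None) else depts.map(Some(_))
  for (n <- ns; d <- ds) yield (n, d)
}
println(flattened.collect().toList)
```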
