Chapter 1. RDD Partitioners
Spark currently supports Hash partitioning, Range partitioning, and user-defined partitioning; Hash partitioning is the default. The partitioner directly determines the number of partitions in an RDD, which partition each record lands in after a shuffle, and hence the number of reduce tasks.
Only key-value RDDs have a partitioner; for non-key-value RDDs the partitioner is None (see the quick check below).
Partition IDs in an RDD range from 0 to numPartitions - 1; the partitioner maps each key to one of these IDs.
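A quick check of the two claims above, as a sketch (it assumes a local SparkContext named `sc`, like the one created in the class examples below):

```scala
val nums = sc.parallelize(List(1, 2, 3, 4))            // not a key-value RDD
println(nums.partitioner)                              // None

// After a shuffle operator such as reduceByKey, the default HashPartitioner is attached
val pairs = nums.map(x => (x, 1)).reduceByKey(_ + _)
println(pairs.partitioner)                             // Some(HashPartitioner)
```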
```scala
package com.atguigu.spark.day06

import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object $01_Partitioner {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
    sc.setLogLevel("error")

    /**
     * Spark's built-in partitioners: HashPartitioner, RangePartitioner
     * HashPartitioner rule: key.hashCode % numPartitions, plus numPartitions if the result is negative
     * RangePartitioner rule:
     *   1. Sample the RDD, then collect the sample to the Driver
     *   2. Given N partitions, pick N-1 keys from the sample as the partition boundaries
     *   3. Compare each record's key against the boundaries to decide which partition it goes to
     * RangePartitioner's result is ordered across partitions but unordered within a partition
     */
    val rdd = sc.parallelize(
      List(1 -> "hello", 2 -> "spark", 10 -> "hadoop", 7 -> "spark", 3 -> "flume", 9 -> "kafka", 5 -> "scala"), 3)

    // println(rdd.partitioner)
    rdd.mapPartitionsWithIndex((index, it) => {
      println(s"rdd index=${index} it=${it.toList}")
      it
    }).collect()

    println("=" * 100)

    val rdd2 = rdd.partitionBy(new RangePartitioner(3, rdd))
    rdd2.mapPartitionsWithIndex((index, it) => {
      println(s"rdd2 index=${index} it=${it.toList}")
      it
    }).collect()
  }
}
```
![$06[SparkCore(Partitioners_ReadingAndSavingData_Accumulators_BroadcastVariables)] - Figure 1](/uploads/projects/liuye-6lcqc@gws1uf/7da05f56d388bf37c2994dc12359f74e.png)
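The user-defined partitioning mentioned at the start of the chapter has no example in the class code. Below is a minimal sketch; `ModPartitioner` is a hypothetical name, and it simply reimplements the negative-safe modulo rule quoted above for HashPartitioner:

```scala
import org.apache.spark.Partitioner

// Hypothetical custom partitioner: routes each key with a negative-safe modulo,
// the same rule HashPartitioner uses. getPartition must return 0 .. numPartitions-1.
class ModPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}
```

It plugs in exactly like RangePartitioner in the example above: `rdd.partitionBy(new ModPartitioner(3))`.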
Chapter 2. Reading and Saving Data
```scala
package com.atguigu.spark.day06

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class $02_ReadAndWrite {

  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

  @Test
  def write(): Unit = {
    val rdd1 = sc.parallelize(List(1, 4, 3, 2, 7, 4, 6, 10))
    // Save as a text file
    rdd1.saveAsTextFile("output/txt")
    // Save as an object file
    rdd1.saveAsObjectFile("output/obj")

    val rdd2 = sc.parallelize(List("aa" -> 10, "bb" -> 20), 2)
    // Save as a sequence file
    rdd2.saveAsSequenceFile("output/seq")
  }

  @Test
  def read(): Unit = {
    // Read a text file
    println(sc.textFile("output/txt").collect().toList)
    // Read an object file
    println(sc.objectFile[Int]("output/obj").collect().toList)
    // Read a sequence file
    println(sc.sequenceFile[String, Int]("output/seq").collect().toList)
  }
}
```
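One practical note when re-running write(): the Hadoop-style save operations fail with FileAlreadyExistsException if the output directory already exists. A minimal cleanup sketch using Hadoop's FileSystem API (the paths are the ones used in the test above):

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively delete the previous run's output directories (a no-op if they
// don't exist), so that saveAsTextFile and friends can write again.
val fs = FileSystem.get(new URI("file:///"), new Configuration())
List("output/txt", "output/obj", "output/seq").foreach(dir => fs.delete(new Path(dir), true))
```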
Chapter 3. Accumulators
1. Introduction to Accumulators
```scala
package com.atguigu.spark.day06

import org.apache.spark.{SparkConf, SparkContext}

object $03_Accumulator {

  /**
   * Accumulator
   * How it works: each partition first accumulates its own data, then sends its partial
   *   result to the Driver, which merges all partial results into the final value
   * Benefit: can reduce shuffle operations to some extent
   * Scenario: only usable for aggregation
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val acc = sc.longAccumulator("xx")

    val rdd = sc.parallelize(List(10, 20, 30, 40), 2)
    // var sum = 0
    // rdd.foreach(x => sum = sum + x)
    rdd.foreach(x => acc.add(x))

    println(acc.value)
  }
}
```
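The commented-out `var sum` version is there to show why the accumulator is needed: Spark serializes the closure into each task, so every task mutates its own copy of `sum` and the Driver-side variable is never updated. A minimal sketch of the failure mode (again assuming a local SparkContext `sc`):

```scala
var sum = 0
sc.parallelize(List(10, 20, 30, 40), 2).foreach(x => sum += x)
// On a cluster this prints 0: each executor incremented its own deserialized
// copy of sum, and those copies are never shipped back to the Driver.
println(sum)
```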
2. WordCountAccumulator
- Custom accumulator
```scala
package com.atguigu.spark.day06

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
 * Custom accumulator
 * 1. Define a class that extends AccumulatorV2[IN, OUT]
 *    IN:  the type of the elements being accumulated
 *    OUT: the type of the final result
 * 2. Override the abstract methods
 */
class WordCountAccumulator extends AccumulatorV2[(String, Int), mutable.Map[String, Int]] {

  // Container for intermediate results
  val map = mutable.Map[String, Int]()

  // Whether the accumulator is in its zero state
  override def isZero: Boolean = map.isEmpty

  // Create a copy of this accumulator
  override def copy(): AccumulatorV2[(String, Int), mutable.Map[String, Int]] = new WordCountAccumulator

  // Reset the accumulator
  override def reset(): Unit = map.clear()

  // Accumulation logic applied inside each partition
  override def add(element: (String, Int)): Unit = {
    // Add the current count to the word's previous total
    val totalCount = map.getOrElse(element._1, 0) + element._2
    map.put(element._1, totalCount)
  }

  // Global merge of all partition results on the Driver
  override def merge(other: AccumulatorV2[(String, Int), mutable.Map[String, Int]]): Unit = {
    // Take the other task's accumulated result
    val taskMap = other.value
    val totalList = taskMap.toList ++ map.toList
    // Group by word
    val groupByMap = totalList.groupBy(x => x._1)
    val result = groupByMap.map(x => {
      val r = x._2.reduce((agg, curr) => {
        (agg._1, agg._2 + curr._2)
      })
      r
    })
    map ++= result
  }

  override def value: mutable.Map[String, Int] = map
}
```
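The merge above rebuilds a combined list and groups it on every call. A more direct drop-in replacement for the same method (a sketch with identical semantics, not the version used in class) folds the other task's map straight into this one:

```scala
override def merge(other: AccumulatorV2[(String, Int), mutable.Map[String, Int]]): Unit =
  other.value.foreach { case (word, count) =>
    map.put(word, map.getOrElse(word, 0) + count)
  }
```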
- Implementing WordCount with the custom accumulator
```scala
package com.atguigu.spark.day06

import org.apache.spark.{SparkConf, SparkContext}

object $04_WordCountAccumulator {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val acc = new WordCountAccumulator
    // Register the accumulator with the SparkContext
    sc.register(acc, "wc")

    val rdd1 = sc.textFile("datas/wc.txt")
    val rdd2 = rdd1.flatMap(_.split(" "))
    val rdd3 = rdd2.map((_, 1))

    // Shuffle-based version, for comparison:
    // val rdd4 = rdd3.reduceByKey(_ + _)
    rdd3.foreach(x => acc.add(x))

    println(acc.value)
    Thread.sleep(100000) // keep the app alive so the web UI can be inspected
  }
}
```
![$06[SparkCore(Partitioners_ReadingAndSavingData_Accumulators_BroadcastVariables)] - Figure 2](/uploads/projects/liuye-6lcqc@gws1uf/ea3a1760f229f36c7d0d59a5c5ea841f.png)
![$06[SparkCore(Partitioners_ReadingAndSavingData_Accumulators_BroadcastVariables)] - Figure 3](/uploads/projects/liuye-6lcqc@gws1uf/e4d559f1ca3aa1a6c21c9e9a81fdee49.png)
As the two screenshots above show, the accumulator-based WordCount runs without a shuffle: the per-partition maps are merged on the Driver instead of being repartitioned by key, as the reduceByKey version would do.
Chapter 4. Broadcast Variables
1. Use case 1 (an operator needs data held in the Driver)
```scala
package com.atguigu.spark.day06

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

/**
 * Broadcast variables
 * Scenarios:
 *   1. When an operator needs data held in the Driver, broadcasting it reduces the memory
 *      the data occupies (one copy per Executor instead of one per task)
 *   2. When a large table joins a small table, broadcasting the small table avoids the shuffle
 * How to use:
 *   1. Broadcast the data:         val bc = sc.broadcast(data)
 *   2. Access the broadcast value: bc.value
 */
class $05_BroadCast {

  val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

  @Test
  def m(): Unit = {
    val data = Map(
      "alibaba" -> "www.alibaba.com",
      "atguigu" -> "www.atguigu.com",
      "jd" -> "www.jd.com",
      "pdd" -> "www.pdd.com")

    // Broadcast the data
    val bc = sc.broadcast(data)

    val rdd = sc.parallelize(List("alibaba", "atguigu", "jd", "pdd"), 10)
    val rdd2 = rdd.map(x => {
      bc.value.getOrElse(x, "")
    })

    println(rdd2.collect().toList)
    Thread.sleep(100000000) // keep the app alive so the web UI can be inspected
  }
}
```
2. Use case 2 (joining a large table with a small table)
```scala
@Test
def n(): Unit = {
  val studentRdd = sc.parallelize(List(
    ("1001", "lisi", "A1"),
    ("1002", "wangwu", "A2"),
    ("1003", "zhaoliu", "A1"),
    ("1004", "lilei", "A3"),
    ("1005", "hanmeimei", "A4"),
    ("1006", "wangermazi", "A5")))

  val classRdd = sc.parallelize(List(
    ("A1", "大数据1班"),
    ("A2", "大数据2班"),
    ("A3", "大数据3班"),
    ("A4", "大数据4班"),
    ("A5", "大数据5班")))

  // Requirement: combine each student's info with the class name

  /* Shuffle-based version with join, for comparison:
  val stuRdd = studentRdd.map {
    case (id, name, classid) => (classid, (id, name))
  }
  val resultRdd = stuRdd.join(classRdd)
  val r = resultRdd.map {
    case (classid, ((id, name), className)) => (id, name, className)
  }
  println(r.collect().toList)
  */

  // Broadcast the small table
  val classMap = classRdd.collect().toMap
  val bc = sc.broadcast(classMap)
  val r = studentRdd.map {
    case (id, name, classid) => (id, name, bc.value.getOrElse(classid, ""))
  }
  println(r.collect().toList)
}
```
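Design note: this pattern is a map-side (broadcast) join. Because every Executor holds the full classMap, each student record is enriched by a local lookup and the shuffle required by the join version (commented out above) disappears entirely. It is only viable while the small table fits comfortably in memory, since classRdd.collect() first pulls it whole into the Driver.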
3. How broadcast variables work
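In short, the mechanism the figure below illustrates: without a broadcast variable, the Driver serializes the data into every task's closure, shipping one copy per task; with a broadcast variable, the data is shipped once per Executor and cached there, and every task on that Executor reads the same shared copy through bc.value.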
![$06[SparkCore(Partitioners_ReadingAndSavingData_Accumulators_BroadcastVariables)] - Figure 4](/uploads/projects/liuye-6lcqc@gws1uf/ea79b2ec48da416126dd67637618e9dc.png)
