Chapter 1. RDD Partitioners

Spark currently supports Hash partitioning, Range partitioning, and user-defined partitioning; Hash partitioning is the default. The partitioner directly determines the number of partitions in an RDD, which partition each record ends up in after a shuffle, and the number of reduce tasks.

Only key-value RDDs have a partitioner; for a non key-value RDD the partitioner is None.

The partition IDs of an RDD range from 0 to numPartitions - 1; the ID computed for a key decides which partition the record belongs to.

    package com.atguigu.spark.day06

    import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

    object $01_Partitioner {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        sc.setLogLevel("error")
        /**
         * Spark's built-in partitioners: HashPartitioner, RangePartitioner
         * HashPartitioner rule: partition = key.hashCode % numPartitions; if the result is negative, add numPartitions
         * RangePartitioner rule:
         *   1. Sample the RDD and collect the sample to the Driver
         *   2. Based on the number of partitions N, pick N-1 keys as partition boundaries
         *   3. Compare each record's key with the boundaries to decide which partition it goes to
         * The result of RangePartitioner is ordered across partitions but unordered within a partition
         */
        val rdd = sc.parallelize(List(1 -> "hello", 2 -> "spark", 10 -> "hadoop", 7 -> "spark", 3 -> "flume", 9 -> "kafka", 5 -> "scala"), 3)
        //println(rdd.partitioner)
        rdd.mapPartitionsWithIndex((index, it) => {
          println(s"rdd index=${index} it=${it.toList}")
          it
        }).collect()
        println("=" * 100)
        val rdd2 = rdd.partitionBy(new RangePartitioner(3, rdd))
        rdd2.mapPartitionsWithIndex((index, it) => {
          println(s"rdd2 index=${index} it=${it.toList}")
          it
        }).collect()
      }
    }

[Figure 1]
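
The opening paragraph also mentions user-defined partitioning, which the code above does not demonstrate. Below is a minimal sketch (the class name MyPartitioner and its routing rule are made up for illustration): a custom partitioner only needs to extend org.apache.spark.Partitioner and implement numPartitions and getPartition.

    package com.atguigu.spark.day06

    import org.apache.spark.Partitioner

    /**
     * Hypothetical custom partitioner: integer keys smaller than 5 go to partition 0,
     * everything else goes to partition 1. Only these two members are required.
     */
    class MyPartitioner extends Partitioner {
      // Total number of partitions produced by this partitioner
      override def numPartitions: Int = 2

      // Map a key to a partition id in the range 0 ~ numPartitions-1
      override def getPartition(key: Any): Int = key match {
        case k: Int if k < 5 => 0
        case _               => 1
      }
    }

It would be plugged in the same way as RangePartitioner above, e.g. rdd.partitionBy(new MyPartitioner).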

Chapter 2. Reading and Saving Data

    package com.atguigu.spark.day06

    import org.apache.spark.{SparkConf, SparkContext}
    import org.junit.Test

    class $02_ReadAndWrite {
      val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

      @Test
      def write(): Unit = {
        val rdd1 = sc.parallelize(List(1, 4, 3, 2, 7, 4, 6, 10))
        // Save as a text file
        rdd1.saveAsTextFile("output/txt")
        // Save as an object file
        rdd1.saveAsObjectFile("output/obj")
        val rdd2 = sc.parallelize(List("aa" -> 10, "bb" -> 20), 2)
        // Save as a sequence file
        rdd2.saveAsSequenceFile("output/seq")
      }

      @Test
      def read(): Unit = {
        // Read a text file
        println(sc.textFile("output/txt").collect().toList)
        // Read an object file
        println(sc.objectFile[Int]("output/obj").collect().toList)
        // Read a sequence file
        println(sc.sequenceFile[String, Int]("output/seq").collect().toList)
      }
    }
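
Note: saveAsObjectFile stores the elements with Java serialization, so the element type has to be supplied when reading the data back (objectFile[Int] above). saveAsSequenceFile is only available on key-value RDDs whose key and value types can be converted to Hadoop Writables, which is why rdd2 holds pairs.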

Chapter 3. Accumulators

1. Introduction to Accumulators

    package com.atguigu.spark.day06

    import org.apache.spark.{SparkConf, SparkContext}

    object $03_Accumulator {
      /**
       * Accumulator
       * How it works: each partition first accumulates its own data, then sends its partial result to the Driver, which merges the partial results into the final value
       * Benefit: can reduce shuffle operations to some extent
       * Scenario: only usable for aggregation
       */
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        val acc = sc.longAccumulator("xx")
        val rdd = sc.parallelize(List(10, 20, 30, 40), 2)
        // A plain variable does not work here: the closure ships a copy of sum to each
        // executor, so the Driver-side sum would still be 0 after foreach
        //var sum = 0
        //rdd.foreach(x => sum = sum + x)
        rdd.foreach(x => acc.add(x))
        println(acc.value)
      }
    }

2. WordCountAccumulator

Custom accumulator:

    package com.atguigu.spark.day06

    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable

    /**
     * Custom accumulator
     * 1. Define a class that extends AccumulatorV2[IN, OUT]
     *    IN:  the type of the elements being accumulated
     *    OUT: the type of the final result
     * 2. Override the abstract methods
     */
    class WordCountAccumulator extends AccumulatorV2[(String, Int), mutable.Map[String, Int]] {
      // Container for intermediate results
      val map = mutable.Map[String, Int]()

      // Whether the accumulator is in its zero (empty) state
      override def isZero: Boolean = map.isEmpty

      // Create a copy of this accumulator
      override def copy(): AccumulatorV2[(String, Int), mutable.Map[String, Int]] = new WordCountAccumulator

      // Reset the accumulator
      override def reset(): Unit = map.clear()

      // Accumulation logic applied inside each partition
      override def add(element: (String, Int)): Unit = {
        // Add the current count to the count accumulated so far for this word
        val totalCount = map.getOrElse(element._1, 0) + element._2
        map.put(element._1, totalCount)
      }

      // Merge the per-partition results on the Driver
      override def merge(other: AccumulatorV2[(String, Int), mutable.Map[String, Int]]): Unit = {
        // Result accumulated by the other task
        val taskMap = other.value
        val totalList = taskMap.toList ++ map.toList
        // Group by word and sum the counts
        val groupByMap = totalList.groupBy(x => x._1)
        val result = groupByMap.map(x => {
          x._2.reduce((agg, curr) => (agg._1, agg._2 + curr._2))
        })
        map ++= result
      }

      override def value: mutable.Map[String, Int] = map
    }
Using the custom accumulator to implement WordCount:

    package com.atguigu.spark.day06

    import org.apache.spark.{SparkConf, SparkContext}

    object $04_WordCountAccumulator {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        val acc = new WordCountAccumulator
        // Register the accumulator with the SparkContext
        sc.register(acc, "wc")
        val rdd1 = sc.textFile("datas/wc.txt")
        val rdd2 = rdd1.flatMap(_.split(" "))
        val rdd3 = rdd2.map((_, 1))
        //val rdd4 = rdd3.reduceByKey(_ + _)
        rdd3.foreach(x => acc.add(x))
        println(acc.value)
        // Keep the application alive so the job can be inspected in the Spark UI
        Thread.sleep(100000)
      }
    }

[Figure 2]

[Figure 3]

As the figures above show, the WordCount implemented with the accumulator involves no shuffle operation.

Chapter 4. Broadcast Variables

1. Use Case 1 (an operator needs data from the Driver)

    package com.atguigu.spark.day06

    import org.apache.spark.{SparkConf, SparkContext}
    import org.junit.Test

    /**
     * Broadcast variables
     * Use cases:
     *   1. When an operator needs data from the Driver, broadcasting that data reduces the memory it occupies on the executors
     *   2. When joining a large table with a small table, broadcasting the small table avoids the shuffle
     * How to use:
     *   1. Broadcast the data:      val bc = sc.broadcast(data)
     *   2. Read the broadcast data: bc.value
     */
    class $05_BroadCast {
      val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

      @Test
      def m(): Unit = {
        val data = Map("alibaba" -> "www.alibaba.com", "atguigu" -> "www.atguigu.com", "jd" -> "www.jd.com", "pdd" -> "www.pdd.com")
        // Broadcast the data
        val bc = sc.broadcast(data)
        val rdd = sc.parallelize(List("alibaba", "atguigu", "jd", "pdd"), 10)
        val rdd2 = rdd.map(x => {
          // Look up each key in the broadcast map
          bc.value.getOrElse(x, "")
        })
        println(rdd2.collect().toList)
        // Keep the application alive so the job can be inspected in the Spark UI
        Thread.sleep(100000000)
      }
    }

2. Use Case 2 (joining a large table with a small table)

    @Test
    def n(): Unit = {
      val studentRdd = sc.parallelize(List(
        ("1001", "lisi", "A1"),
        ("1002", "wangwu", "A2"),
        ("1003", "zhaoliu", "A1"),
        ("1004", "lilei", "A3"),
        ("1005", "hanmeimei", "A4"),
        ("1006", "wangermazi", "A5")
      ))
      val classRdd = sc.parallelize(List(
        ("A1", "大数据1班"),
        ("A2", "大数据2班"),
        ("A3", "大数据3班"),
        ("A4", "大数据4班"),
        ("A5", "大数据5班")
      ))
      // Requirement: get each student's info together with the name of their class
      /*val stuRdd = studentRdd.map {
        case (id, name, classid) => (classid, (id, name))
      }
      val resultRdd = stuRdd.join(classRdd)
      val r = resultRdd.map {
        case (classid, ((id, name), className)) => (id, name, className)
      }
      println(r.collect().toList)*/
      // Broadcast the small table
      val classMap = classRdd.collect().toMap
      val bc = sc.broadcast(classMap)
      val r = studentRdd.map {
        case (id, name, classid) => (id, name, bc.value.getOrElse(classid, ""))
      }
      println(r.collect().toList)
    }

3. How Broadcast Variables Work
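
In short: the Driver serializes the broadcast value and ships it to each Executor only once; all tasks running on that Executor share the same read-only copy through bc.value. Without a broadcast variable, the value would be serialized into the closure of every single task, so each task would carry its own copy over the network.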

[Figure 4]