第一章.Action行动算子

1.reduce

  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.junit.Test
  4. class $01_Action {
  5. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  6. /**
  7. * reduce(func:(RDD元素类型,RDD元素类型)=>RDD元素类型):对一个RDD所有元素聚合
  8. * reduce是通过函数在每个分区中对分区内的所有数据先聚合,然后将每个分区的聚合结果发给driver,又driver汇总聚合
  9. */
  10. @Test
  11. def reduce():Unit={
  12. val rdd = sc.parallelize(List(1,4,3,2,8,10,9,7))
  13. val result = rdd.reduce((agg,curr)=>{
  14. println(s"${Thread.currentThread().getName} agg=${agg} curr=${curr}")
  15. agg+curr
  16. })
  17. println(result)
  18. Thread.sleep(100000)
  19. }
  20. }

2.collect

  1. /**
  2. * collect:收集Rdd每个分区的数据返回给Driver<重点>
  3. * 如果RDD分区数据比较大,Driver内存默认只有1G,所以
  4. */
  5. @Test
  6. def collect():Unit={
  7. val rdd = sc.parallelize(List(1,4,3,2,8,10,9,7))
  8. println(rdd.collect().toList)
  9. }

3.count

  1. /**
  2. * count:统计RDD元素个数
  3. */
  4. @Test
  5. def count():Unit={
  6. val rdd = sc.parallelize(List(1,4,3,2,8,10,9,7))
  7. println(rdd.count())
  8. }

4.first

  1. /**
  2. * first:获取RDD第一个元素
  3. *
  4. */
  5. @Test
  6. def first():Unit={
  7. val rdd1 = sc.parallelize(List(10,4,3,2,8,10,9,7))
  8. rdd1.mapPartitionsWithIndex((index,it)=>{
  9. println(s"rdd1 index=${index} data=${it.toList}")
  10. it
  11. }).collect()
  12. println(rdd1.first())
  13. val rdd2 = rdd1.map(x => (x, x))
  14. .partitionBy(new HashPartitioner(3))
  15. rdd2.mapPartitionsWithIndex((index,it)=>{
  16. println(s"rdd2 index=${index} data=${it.toList}")
  17. it
  18. }).collect()
  19. println(rdd2.first())
  20. }

5.take

  1. /**
  2. * take:获取前N个元素
  3. */
  4. @Test
  5. def take():Unit={
  6. val rdd1 = sc.parallelize(List(10,4,3,2,8,10,9,7))
  7. println(rdd1.take(3).toList)
  8. }

6.takeOrdered

  1. /**
  2. * takeOrdered:对RDD元素排序之后取前N个元素
  3. *
  4. */
  5. @Test
  6. def takeOrdered():Unit={
  7. val rdd1 = sc.parallelize(List(10,4,3,2,8,10,9,7))
  8. println(rdd1.takeOrdered(3).toList)
  9. }

7.aggregate

  1. /**
  2. * aggregate(默认值)(seqop:(默认值类型,RDD元素类型)=>默认值类型,comop:(默认值类型,默认值类型)=>默认值类型):
  3. * 先对每个分区所有数据聚合,然后将所有分区的聚合结果发给driver汇总
  4. * seqop:在每个分区中对分区的所有数据进行聚合,每个分区第一次聚合的时候,函数第一个参数的初始值=默认值
  5. * comop:在Driver中对每个分区汇总结果再次全局汇总,Driver第一次计算的时候,函数的第一个参数的初始值=默认值
  6. */
  7. @Test
  8. def aggregate():Unit={
  9. val rdd = sc.parallelize(List(10,4,3,2,8,10,9,7))
  10. rdd.mapPartitionsWithIndex((index,it)=>{
  11. println(s"rdd index=${index} data=${it.toList}")
  12. it
  13. }).collect()
  14. rdd.aggregate(1000)((agg,curr)=>{
  15. println(s"分区汇总过程: agg=${agg} curr=${curr}")
  16. agg+curr
  17. }, (agg,curr)=>{
  18. println(s"driver汇总过程: agg=${agg} curr=${curr}")
  19. agg+curr
  20. })
  21. }

8.fold

  1. /**
  2. * fold与aggreate的区别
  3. * aggreate的分区间与Driver汇总计算逻辑可以不一样
  4. * fold的分区间与Driver汇总计算逻辑一样
  5. */
  6. @Test
  7. def fold():Unit= {
  8. val rdd = sc.parallelize(List(10, 4, 3, 2, 8, 10, 9, 7))
  9. val rdd2 = rdd.fold(1000)((agg,curr)=>{
  10. println(s"汇总过程: agg=${agg} curr=${curr}")
  11. agg+curr
  12. })
  13. }

9.countByKey

  1. /**
  2. * countByKey:统计每个Key的个数
  3. * countByKey一般结合sample使用,工作中出现数据倾斜以后一般先用sample采集样本数据,
  4. * 然后使用countByKey统计样本数据中每个key的个数,从而判断哪些key出现了数据倾斜,从而判断哪些key使用了数据倾斜
  5. */
  6. @Test
  7. def countByKey()={
  8. val rdd = sc.parallelize(List("aa"->10,"bb"->20,"aa"->3,"aa"->4,"cc"->50,"bb"->1))
  9. println(rdd.countByKey)
  10. }

10.save

  1. /**
  2. * save:保存RDD数据到磁盘
  3. */
  4. @Test
  5. def save()={
  6. val rdd = sc.parallelize(List("aa"->10,"bb"->20,"aa"->3,"aa"->4,"cc"->50,"bb"->1))
  7. rdd.saveAsTextFile("output/text")
  8. }

11.foreach

  1. /**
  2. * foreach(func:RDD元素类型=>Unit):遍历
  3. * foreach与map的区别
  4. * map是转换算子,会生成新的RDD
  5. * foreach是行动算子,没有返回值
  6. * foreach里面的函数是针对元素操作,RDD有多少元素,函数就调用多少次
  7. */
  8. @Test
  9. def foreach()={
  10. val rdd = sc.parallelize(List("aa"->10,"bb"->20,"aa"->3,"aa"->4,"cc"->50,"bb"->1))
  11. rdd.foreach(x=>println(x))
  12. }

11.foreachPartition

  1. /**
  2. * foreachPartition(func:Iterator[RDD元素类型]=>Unit):Unit<常用>
  3. * foreachPartition一般用于将数据保存在mysql,hbase,redis等地方
  4. */
  5. @Test
  6. def foreachPartition():Unit={
  7. val rdd = sc.parallelize(List("zhangsan"->10,"lisi"->20,"wangwu"->3,"zhaoliu"->4))
  8. rdd.foreachPartition(it=>{
  9. var connection:Connection = null
  10. var statement:PreparedStatement = null
  11. try{
  12. connection = DriverManager.getConnection(".....")
  13. statement = connection.prepareStatement("insert into .. values(?,?)")
  14. var i = 0
  15. it.foreach(x=>{
  16. statement.setString(1,x._1)
  17. statement.setInt(2,x._2)
  18. statement.addBatch()
  19. if(i%1000==0){
  20. //提交一个批次的数据
  21. statement.executeBatch()
  22. statement.clearBatch()
  23. }
  24. i=i+1
  25. })
  26. //提交最后一个不满1000条的批次数据
  27. statement.executeBatch()
  28. }catch{
  29. case e:Exception => e.printStackTrace()
  30. }finally{
  31. if(statement!=null)
  32. statement.close()
  33. if(connection!=null)
  34. connection.close()
  35. }
  36. })
  37. }

第二章.RDD序列化

1.闭包

  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.junit.Test
  4. class $02_closePackage {
  5. /**
  6. * 闭包:函数体中调用外部变量的函数称之为闭包
  7. */
  8. @Test
  9. def m(): Unit ={
  10. val y = 10
  11. //闭包函数
  12. val func = (x:Int)=>x+y
  13. println(func(100))
  14. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  15. val rdd = sc.parallelize(List(1,4,3,2,8,9,5,7))
  16. //Spark算子里面的代码是在executor中执行的,算子外面的代码是在Driver中执行的
  17. //此时y是Int类型,底层实现了序列化
  18. val rdd2 = rdd.map(x => {
  19. x * y
  20. })
  21. println(rdd2.collect().toList)
  22. }
  23. }

2.Spark序列化

  1. 闭包使用未序列化的外部变量时
  1. @Test
  2. def n()={
  3. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  4. val rdd = sc.parallelize(List(1,4,3,2,8,9,5,7))
  5. val p = new Person
  6. //Spark算子里面的代码是在executor中执行的,算子外面的代码是在Driver中执行的
  7. //此时p是person类型,是在driver里面定义的,p并没有序列化
  8. val rdd2 = rdd.map(x => {
  9. x * p.y
  10. })
  11. println(rdd2.collect().toList)
  12. }
  13. class Person{
  14. var y = 10
  15. }

运行结果报错:

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图1

  1. driver中使用的变量可以序列化时
  1. @Test
  2. def p()={
  3. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  4. val rdd = sc.parallelize(List(1,4,3,2,8,9,5,7))
  5. val p = new Person1
  6. //Spark算子里面的代码是在executor中执行的,算子外面的代码是在Driver中执行的
  7. //此时a是局部变量,是Int类型,可以被序列化
  8. val rdd2 = p.cover(rdd)
  9. println(rdd2.collect().toList)
  10. }
  11. class Person1{
  12. var y = 10
  13. def cover(rdd:RDD[Int])={
  14. val a:Int = y
  15. rdd.map(x=>x*a)
  16. }
  17. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图2

  1. 使用样例类,样例类底层实现了序列化接口
  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.rdd.RDD
  4. object $03_closePackage {
  5. def main(args: Array[String]): Unit = {
  6. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  7. val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
  8. val p = new Person2
  9. //Spark算子里面的代码是在executor中执行的,算子外面的代码是在Driver中执行的
  10. //此时a是局部变量,是Int类型,可以被序列化
  11. val rdd2 = p.cover(rdd)
  12. println(rdd2.collect().toList)
  13. }
  14. case class Person2() {
  15. var y = 10
  16. def cover(rdd: RDD[Int]) = {
  17. val a: Int = y
  18. rdd.map(x => x * y)
  19. }
  20. }
  21. }
  1. Spark序列化方式

spark为什么要序列化?(重点)

  • spark算子里面的代码是在Executor中执行,算子外面的代码是在Driver中执行,如果算子里面有使用Driver定义的对象,此时需要将Driver定义的对象序列化之后传给Task使用

spark序列化分为两种

  • java序列化方式:spark默认使用
  • 使用java序列化时会将类的全类名,继承信息,属性信息,属性的类型信息,其他信息全部都会序列化
  • Kryo序列化方式
  • 使用Kryo序列化时只会序列化类的基本信息,比如类名,属性名,属性类型
  • kryo序列化性能上比java序列化高10倍左右
  • 工作中一般使用kryo序列化

spark如何使用序列化?(重点)

  • 需要在sparkConf中配置spark序列化的默认方式: new SparkConf().set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”) 【spark优化】
  • 配置哪些类的对象后续使用kryo序列化[可选]: conf.registerKryoClasses(Array(classOf[Student]))

验证kryo序列化比java序列化方式性能高

  1. java序列化
  1. package com.atguigu.spark.day05
  2. import org.junit.Test
  3. import java.io.{FileOutputStream, ObjectOutputStream}
  4. import scala.beans.BeanProperty
  5. class $03_Ser {
  6. /**
  7. * java序列化
  8. */
  9. @Test
  10. def javaSer():Unit={
  11. val student = new Student
  12. student.setName("lisi")
  13. student.setAge(20)
  14. val oos = new ObjectOutputStream(new FileOutputStream("d:/java.txt"))
  15. oos.writeObject(student)
  16. oos.flush()
  17. oos.close()
  18. }
  19. }
  20. class Student extends Serializable{
  21. @BeanProperty
  22. var name:String = _
  23. @BeanProperty
  24. var age:Int = _
  25. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图3

  1. kryo序列化
  1. package com.atguigu.spark.day05
  2. import com.esotericsoftware.kryo.Kryo
  3. import com.esotericsoftware.kryo.io.Output
  4. import org.junit.Test
  5. import java.io.{FileOutputStream, ObjectOutputStream}
  6. import scala.beans.BeanProperty
  7. class $04_Ser {
  8. /**
  9. * kryo序列化
  10. */
  11. @Test
  12. def kryoSer():Unit={
  13. val student = new Student
  14. student.setName("lisi")
  15. student.setAge(20)
  16. val kryo = new Kryo()
  17. val output = new Output(new FileOutputStream("d:/kryo.txt"))
  18. kryo.writeObject(output,student)
  19. output.flush()
  20. output.close()
  21. }
  22. }
  23. class Student extends Serializable{
  24. @BeanProperty
  25. var name:String = _
  26. @BeanProperty
  27. var age:Int = _
  28. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图4

  1. 读取kryo序列化的对象
  1. package com.atguigu.spark.day05
  2. import com.esotericsoftware.kryo.Kryo
  3. import com.esotericsoftware.kryo.io.{Input, Output}
  4. import org.junit.Test
  5. import java.io.{FileInputStream, FileOutputStream, ObjectOutputStream}
  6. import scala.beans.BeanProperty
  7. class $04_Ser {
  8. /**
  9. 读取kryo序列化的对象
  10. */
  11. @Test
  12. def kryoRead():Unit={
  13. val kryo = new Kryo()
  14. val input = new Input(new FileInputStream("d:/kryo.txt"))
  15. val student = kryo.readObject(input,classOf[Student])
  16. println(student.name)
  17. println(student.age)
  18. }
  19. }
  20. class Student extends Serializable{
  21. @BeanProperty
  22. var name:String = _
  23. @BeanProperty
  24. var age:Int = _
  25. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图5

Spark配置序列化

  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.rdd.RDD
  4. object $03_closePackage {
  5. def main(args: Array[String]): Unit = {
  6. //val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  7. /**
  8. * Spark配置序列化
  9. */
  10. val conf = new SparkConf().set("spark.serializer","org.apache.spark.serializer.KryoSerializer").setMaster("local[4]").setAppName("test")
  11. conf.registerKryoClasses(Array(classOf[Student]))
  12. val sc = new SparkContext(conf)
  13. val rdd = sc.parallelize(List(1, 4, 3, 2, 8, 9, 5, 7))
  14. val p = new Person2
  15. //Spark算子里面的代码是在executor中执行的,算子外面的代码是在Driver中执行的
  16. //此时a是局部变量,是Int类型,可以被序列化
  17. val rdd2 = p.cover(rdd)
  18. println(rdd2.collect().toList)
  19. }
  20. case class Person2() {
  21. var y = 10
  22. def cover(rdd: RDD[Int]) = {
  23. val a: Int = y
  24. rdd.map(x => x * y)
  25. }
  26. }
  27. }

第三章.RDD依赖关系

1.血统

  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * 血统:是指一个job中一系列RDD的依赖关系,是指一个家族之间的关系
  5. * 可以通过toDebugString查看RDD 的血统
  6. */
  7. object $05_Lineage {
  8. def main(args: Array[String]): Unit = {
  9. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  10. val rdd1 = sc.textFile("datas/wc.txt")
  11. val rdd2 = rdd1.flatMap(_.split(" "))
  12. val rdd3 = rdd2.map(x => (x, 1))
  13. val rdd4 = rdd3.reduceByKey(_ + _)
  14. println(rdd4.toDebugString)
  15. println(rdd4.collect().toList)
  16. }
  17. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图6

2.依赖关系

  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * 依赖关系:父子RDD的关系
  5. */
  6. object $04_Dependcy {
  7. def main(args: Array[String]): Unit = {
  8. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  9. println("-" * 100)
  10. val rdd1 = sc.textFile("datas/wc.txt")
  11. println(rdd1.dependencies)
  12. println("-" * 100)
  13. val rdd2 = rdd1.flatMap(_.split(" "))
  14. println(rdd2.dependencies)
  15. println("-" * 100)
  16. val rdd3 = rdd2.map(x => (x, 1))
  17. println(rdd3.dependencies)
  18. println("-" * 100)
  19. val rdd4 = rdd3.reduceByKey(_ + _)
  20. println(rdd4.dependencies)
  21. println("-" * 100)
  22. println(rdd4.collect().toList)
  23. }
  24. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图7

  • Spark之间的RDD依赖关系有两种

宽依赖:有shuffle操作的称之为宽依赖[父RDD一个分区的数据被子RDD多个分区所使用]

窄依赖:没有shuffle操作的称之为窄依赖[父RDD一个分区的数据只被子RDD一个分区所使用]

  • job进行stage切分的时候是根据最后一个RDD的依赖关系依次从后往前查询,遇到宽依赖就切分stage,然后再次向前查询,直到查询到第一个RDD为止
  • job中stage执行的时候是从前往后执行的,因为后面stage的输入数据是前面stage的输出数据
  • Application:应用[一个Sparkcontext为一个应用]
  • job :任务[一个action算子产生一个job]
    • stage:阶段[一个job中stage的个数=shuffle个数+1]
      • task:子任务[一个stage中task个数=该stage中最后一个RDD的分区数]
  • 一个Application中多个job之间并行
  • 一个job中多个stage串行
  • 一个stage中多个task是并行的

3.stage切分源码

  1. val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  2. //this是调用collect算子的rdd
  3. runJob(rdd, func, 0 until rdd.partitions.length)
  4. runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  5. runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  6. dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  7. //提交job
  8. val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  9. //将job信息放入消息池中,由专门的线程统一处理
  10. eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,Utils.cloneProperties(properties)))
  11. //处理job提交
  12. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  13. //切分stage并返回最后一个stage
  14. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  15. //切分stage
  16. val parents = getOrCreateParentStages(rdd, jobId)
  17. //获取最靠近当前rdd的shuffle依赖
  18. getShuffleDependencies(rdd)
  19. //创建stage
  20. getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  21. //根据shuffle依赖再次向前找其他shuffle
  22. getMissingAncestorShuffleDependencies(shuffleDep.rdd)
  23. //创建stage
  24. createShuffleMapStage(shuffleDep, firstJobId)
  25. //创建stage对象
  26. val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  27. //创建最后一个stage
  28. val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  29. //提交stage
  30. submitStage(finalStage)
  31. //获取当前stage的所有父stage,并根据id排序
  32. val missing = getMissingParentStages(stage).sortBy(_.id)
  33. //执行stage
  34. submitMissingTasks(stage, jobId.get)
  35. //将stage转成task
  36. val tasks: Seq[Task[_]] = ....
  37. //提交task
  38. taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  39. //调度执行
  40. backend.reviveOffers()
  41. //给Driver发送提交task的指令
  42. override def reviveOffers(): Unit = {
  43. driverEndpoint.send(ReviveOffers)
  44. }
  45. //Driver接受到提交task指令,开始提交task
  46. case ReviveOffers => makeOffers()
  47. //提交task
  48. launchTasks(taskDescs)
  49. //向executor发送执行task指令
  50. executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  51. //val rdd1 = sc.textFile("...")
  52. //val rdd2 = rdd1.flatMap(_.split(" "))
  53. //val rdd3 = rdd2.map(x=>(x,1))
  54. //val rdd4 = rdd3.reduceByKey(_+_)
  55. //val rdd5 = rdd4.coalesce(1)
  56. //rdd5.collect
  57. //获取最靠近当前rdd的shuffle依赖
  58. private[scheduler] def getShuffleDependencies(
  59. rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  60. //创建一个装载shuffle依赖的容器
  61. val parents = new HashSet[ShuffleDependency[_, _, _]]
  62. //创建一个装载访问过的rdd的容器
  63. val visited = new HashSet[RDD[_]]
  64. //创建一个状态待访问的RDD的容器
  65. val waitingForVisit = new ListBuffer[RDD[_]]
  66. //将当前rdd放入待访问容器中
  67. waitingForVisit += rdd
  68. //判断是否有带访问的RDD
  69. while (waitingForVisit.nonEmpty) {
  70. //从带访问容器中取出第一个rdd进行访问
  71. val toVisit = waitingForVisit.remove(0)
  72. //判断该rdd之前是否被访问过
  73. if (!visited(toVisit)) {
  74. //将目前待访问的rdd放入已访问的容器中,后续在遇到该rdd就不用再访问了
  75. visited += toVisit
  76. //获取当前待访问rdd的依赖
  77. toVisit.dependencies
  78. //遍历每个依赖
  79. .foreach {
  80. //shuffle依赖
  81. case shuffleDep: ShuffleDependency[_, _, _] =>
  82. parents += shuffleDep
  83. //窄依赖
  84. case dependency =>
  85. waitingForVisit.prepend(dependency.rdd)
  86. }
  87. }
  88. }
  89. parents
  90. }

第四章.RDD持久化

1.RDD持久化的原因

  1. 未使用RDD持久化
  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object $06_Cache {
  4. def main(args: Array[String]): Unit = {
  5. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  6. sc.setLogLevel("error")
  7. val rdd1 = sc.textFile("datas/wc.txt")
  8. val rdd2 = rdd1.flatMap(line=>{
  9. println(s"----------------------->${line}")
  10. line.split(" ")
  11. })
  12. val rdd3 = rdd2.map(x => (x, 1))
  13. val rdd4 = rdd2.map(x=>x.length)
  14. rdd3.collect()
  15. rdd4.collect()
  16. }
  17. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图8

  1. 使用RDD持久化
  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * RDD持久化的原因:如果同一个RDD在多个job中重复使用,那么默认情况下,该RDD之前的步骤会执行多次,会影响效率,所以需要在
  5. * 第一个job执行完成之后将该RDD数据持久化到磁盘,后续其他job需要该RDD数据的时候直接从磁盘获取数据,不用重复计算了
  6. *
  7. * RDD持久化使用场景:
  8. * 1.同一个RDD在job中重复使用的时候,可以减少该RDD重复计算的次数
  9. * 2.一个job中RDD依赖链条太长的时候,可以使用RDD持久化防止计算出错导致重新计算花费大量的时间
  10. *
  11. * RDD持久化方式
  12. * 1.缓存
  13. * 数据保存的位置: 内存/分区所在主机的本地磁盘
  14. * 如何使用缓存:
  15. * 1.rdd.cache()
  16. * 2.rdd.persist
  17. * cache与persist的区别:
  18. * cache的数据只保存在内存中
  19. * persist可以设置数据保存在哪里[内存/磁盘]
  20. * 存储级别
  21. * NONE: 代表不存储
  22. * DISK_ONLY: 只保存在磁盘中
  23. * DISK_ONLY_2: 只保存在磁盘中,数据保存两份
  24. * MEMORY_ONLY: 数据只保存在内存中
  25. * MEMORY_ONLY_2: 数据只保存在内存中,数据保存两份
  26. * MEMORY_ONLY_SER: 数据只保存在内存中以序列化形式存储
  27. * MEMORY_ONLY_SER_2 : 数据只保存在内存中以序列化形式存储,数据保存两份
  28. * MEMORY_AND_DISK : 数据一部分保存在内存一部分在磁盘
  29. * MEMORY_AND_DISK_2 : 数据一部分保存在内存一部分在磁盘,数据保存两份
  30. * MEMORY_AND_DISK_SER : 数据一部分保存在内存一部分在磁盘以序列化形式存储
  31. * MEMORY_AND_DISK_SER_2 : 数据一部分保存在内存一部分在磁盘以序列化形式存储,数据保存两份
  32. * OFF_HEAP: 数据保存在堆外内存中
  33. * 工作常用的存储级别: MEMORY_AND_DISK<用于大数据量场景>、MEMORY_ONLY<用于小数据量场景>
  34. * 2.chackpoint
  35. *
  36. *
  37. */
  38. object $06_Cache {
  39. def main(args: Array[String]): Unit = {
  40. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  41. sc.setLogLevel("error")
  42. val rdd1 = sc.textFile("datas/wc.txt")
  43. val rdd2 = rdd1.flatMap(line=>{
  44. println(s"----------------------->${line}")
  45. line.split(" ")
  46. })
  47. //将RDD数据缓存下来,供后续job直接使用
  48. //val rdd21 = rdd2.cache()
  49. val rdd21 = rdd2.persist()
  50. val rdd3 = rdd21.map(x => (x, 1))
  51. val rdd4 = rdd21.map(x=>x.length)
  52. rdd3.collect()
  53. rdd4.collect()
  54. }
  55. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图9

2.CheckPoint(检查点)

checkpoint

  • 原因:缓存数据是保存在分区所在机器的磁盘/内存中,数据可能丢失,如果数据丢失需要重新计算,影响效率,所以最好将数据保存在可靠存储介质[HDFS]中
  • 数据存储位置:数据存储在HDFS上
  • 如何使用checkpoint
  • 设置数据持久化路径: sc.setCheckpointDir(…)
  • 持久化RDD
  • checkpoint持久化时机:等到RDD所在的第一个job执行完成之后,会出发该RDDcheckpoint操作[该操作会重新出发一个新的job,该job执行当前RDD之前的数据处理步骤]
  • 可以在checkpoint之前将RDD缓存起来避免重复执行
  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object $07_CheckPoint {
  4. def main(args: Array[String]): Unit = {
  5. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  6. sc.setLogLevel("error")
  7. //设置数据持久化路径
  8. sc.setCheckpointDir("checkpoint")
  9. val rdd1 = sc.textFile("datas/wc.txt")
  10. val rdd2 = rdd1.flatMap(line => {
  11. println(s"----------------------->${line}")
  12. line.split(" ")
  13. })
  14. rdd2.cache()
  15. rdd2.checkpoint()
  16. val rdd3 = rdd2.map(x => (x, 1))
  17. val rdd4 = rdd2.map(x => x.length)
  18. rdd3.collect()
  19. rdd4.collect()
  20. }
  21. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图10

3.缓存与checkpoint的区别

缓存与checkpoint的区别

  1. 数据持久化的位置不一样
  • 缓存是将数据持久化到分区所在主机的内存/本地磁盘
  • checkpoint是将数据持久化到HDFS中
  1. 依赖关系是否切除不一样
  • 缓存是将数据持久化到分区所在主机的内存/本地磁盘,如果服务器宕机,数据丢失之后需要根据RDD的依赖关系重新计算得到数据
  • checkpoint是将数据持久化到HDFS中,数据不会丢失,此时RDD的依赖关系会切除

4.shuffle与缓存

  1. package com.atguigu.spark.day05
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. /**
  4. * shuffle算子的数据会落盘相当于自带缓存操作
  5. */
  6. object $08_Shuffle {
  7. def main(args: Array[String]): Unit = {
  8. val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
  9. sc.setLogLevel("error")
  10. val rdd1 = sc.textFile("datas/wc.txt")
  11. val rdd2 = rdd1.flatMap(line => {
  12. println(s"----------------------->${line}")
  13. line.split(" ")
  14. })
  15. val rdd3 = rdd2.map(x => (x, 1))
  16. val rdd4 = rdd3.reduceByKey(_+_)
  17. val rdd5 = rdd4.map(x => (x._1, x._2 + 100))
  18. val rdd6 = rdd4.map(x => (x._1, x._2 + 1000))
  19. rdd5.collect()
  20. rdd6.collect()
  21. }
  22. }

$05[SparkCore(Action_序列化_依赖关系_持久化)] - 图11