Lineage and the dependency chain

rdd.toDebugString prints an RDD's lineage: the full chain of dependencies back to the source RDD.

    package tcode.day05

    import org.apache.spark.{SparkConf, SparkContext}

    object $03_Lineage {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        // Print the lineage after each transformation to watch the chain grow.
        val rdd1 = sc.textFile("datas/wc.txt")
        println(rdd1.toDebugString)
        println("-" * 100)
        val rdd2 = rdd1.flatMap(_.split(" "))
        println(rdd2.toDebugString)
        println("-" * 100)
        val rdd3 = rdd2.map((_, 1))
        println(rdd3.toDebugString)
        println("-" * 100)
        // reduceByKey introduces a shuffle, which shows up as an
        // indentation step in the toDebugString output.
        val rdd4 = rdd3.reduceByKey(_ + _)
        println(rdd4.toDebugString)
        println("-" * 100)
        println(rdd4.collect().toList)
      }
    }
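For reference, the final println(rdd4.toDebugString) prints something of the following shape (illustrative only: RDD ids, partition counts, and call sites depend on the run; the +- indentation step marks the shuffle boundary):

    (2) ShuffledRDD[4] at reduceByKey at $03_Lineage.scala:... []
     +-(2) MapPartitionsRDD[3] at map at $03_Lineage.scala:... []
        |  MapPartitionsRDD[2] at flatMap at $03_Lineage.scala:... []
        |  datas/wc.txt MapPartitionsRDD[1] at textFile at $03_Lineage.scala:... []
        |  datas/wc.txt HadoopRDD[0] at textFile at $03_Lineage.scala:... []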

Dependencies

/*
Dependency: the relationship between a parent RDD and its child RDD.
Spark RDD dependencies come in two kinds:
Wide dependency: involves a shuffle [one partition of the parent RDD is consumed by multiple partitions of the child RDD]
Narrow dependency: no shuffle [one partition of the parent RDD is consumed by exactly one partition of the child RDD]

Application: the application
Job: a job
Stage: a stage (a TaskSet)
Task: a single unit of work
One SparkContext corresponds to one Application.
One Application contains multiple Jobs; each action produces one Job.
One Job contains multiple Stages; number of stages = number of shuffles + 1.
One Stage contains multiple Tasks; number of tasks = number of partitions of the last RDD in that stage.

Within an Application, the Jobs run serially.
Within a Job, the Stages run serially.
Within a Stage, the Tasks run in parallel.

In short, a Spark job's stages are cut at shuffle boundaries (see the sketch after this block).
*/
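A minimal sketch of these counting rules, assuming the same sc and datas/wc.txt input as the examples in this section:

    // One action -> one job; stages per job = shuffles + 1;
    // tasks per stage = partitions of that stage's last RDD.
    val wordCount = sc.textFile("datas/wc.txt") // stage 0: textFile + flatMap + map (all narrow)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)                       // the only shuffle -> 2 stages in total
    wordCount.collect()                         // action #1 -> job 0
    wordCount.count()                           // action #2 -> job 1 (jobs run one after another)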

    package tcode.day05

    import org.apache.spark.{SparkConf, SparkContext}

    object $04_RddDepen {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))
        val rdd1 = sc.textFile("datas/wc.txt", 6)
        println("+++++++++++++++++++++++++++++")
        println(rdd1.dependencies)
        println("-" * 100)
        val rdd2 = rdd1.flatMap(_.split(" "))
        println(rdd2.dependencies)
        println("-" * 100)
        val rdd3 = rdd2.map((_, 1))
        println(rdd3.dependencies)
        println("-" * 100)
        // coalesce(3) without shuffle is still a narrow dependency;
        // reduceByKey introduces the only wide (shuffle) dependency.
        val rdd4 = rdd3.coalesce(3)
        val rdd5 = rdd4.reduceByKey(_ + _)
        println(rdd4.dependencies)
        println("-" * 100)
        println("+++++++++++++++++++++++++++++")
        println(rdd5.collect().toList)
        // Keep the app alive so the Spark UI (http://localhost:4040) can be inspected.
        Thread.sleep(1000000)
      }
    }
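For orientation: rdd1 through rdd3 each print a OneToOneDependency (narrow); rdd4, the shuffle-free coalesce, prints a NarrowDependency subclass; and had we printed rdd5.dependencies, the reduceByKey RDD would report a ShuffleDependency (wide).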

How it works:
To split a job into stages, the scheduler starts from the last RDD and walks backwards through its parents. Whenever the relationship to a parent RDD is a wide dependency, it cuts a stage there; it then keeps walking up the lineage, cutting another stage at each wide dependency it finds, until it reaches the first RDD.
rdd.dependencies returns an RDD's dependencies on its direct parents.
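Before reading the source trace below, here is a hedged user-level sketch (a hypothetical helper, not Spark API) of that backwards walk, using rdd.dependencies to recurse from the last RDD to the sources:

    import org.apache.spark.rdd.RDD

    // Print the lineage tree: each RDD, then its parents, one indent per hop.
    def printLineage(rdd: RDD[_], depth: Int = 0): Unit = {
      println(("  " * depth) + rdd)
      rdd.dependencies.foreach(dep => printLineage(dep.rdd, depth + 1))
    }

    printLineage(rdd5) // e.g. on rdd5 from $04_RddDepen above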

Tracing an action (e.g. collect) through the Spark source, the call chain from job submission to task launch:

    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    runJob(rdd, func, 0 until rdd.partitions.length)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    // submit the job
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, Utils.cloneProperties(properties)))
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    // stage splitting
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    // get or create the parent stages
    val parents = getOrCreateParentStages(rdd, jobId)
    getShuffleDependencies(rdd)
    // loop to find all ancestor shuffle dependencies
    getMissingAncestorShuffleDependencies(shuffleDep.rdd)
    // create the stage
    createShuffleMapStage
    // submit the stage
    submitStage(finalStage)
    // once all parent stages have completed, run the current stage
    submitMissingTasks(stage, jobId.get)
    // split the stage into tasks
    val tasks: Seq[Task[_]] = try
    // submit the tasks
    taskScheduler.submitTasks
    // create a TaskSetManager to manage the task set
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // get ready to submit
    backend.reviveOffers()
    // tell the Driver to launch the tasks
    driverEndpoint.send(ReviveOffers)
    // prepare resource offers
    makeOffers()
    // launch the tasks
    launchTasks(taskDescs)
    // ship each serialized task to its executor
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    // run the task on the executor
    executor.launchTask(this, taskDesc)
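In short, the trace shows three layers: the DAGScheduler turns the action into a job, splits it into stages at shuffle boundaries, and submits each stage's tasks as a TaskSet; the TaskScheduler wraps each TaskSet in a TaskSetManager and hands tasks to the SchedulerBackend; the backend ships serialized tasks to executors via LaunchTask messages, and each executor runs them with launchTask.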
    // DAGScheduler.getShuffleDependencies: returns the *direct* shuffle
    // dependencies of the given RDD, i.e. its parent-stage boundaries.
    private[scheduler] def getShuffleDependencies(
        rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
      // shuffle dependencies found for the current RDD
      val parents = new HashSet[ShuffleDependency[_, _, _]]
      // RDDs that have already been visited
      val visited = new HashSet[RDD[_]]
      // RDDs waiting to be visited
      val waitingForVisit = new ListBuffer[RDD[_]]
      // start the walk from the given RDD
      waitingForVisit += rdd
      while (waitingForVisit.nonEmpty) {
        // the next RDD to visit
        val toVisit = waitingForVisit.remove(0)
        if (!visited(toVisit)) {
          visited += toVisit
          toVisit.dependencies.foreach {
            case shuffleDep: ShuffleDependency[_, _, _] =>
              // a shuffle dependency is a stage boundary: record it and
              // stop walking past it
              parents += shuffleDep
            case dependency =>
              // narrow dependency: keep walking up the lineage
              waitingForVisit.prepend(dependency.rdd)
          }
        }
      }
      parents
    }
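To connect this back to the stage-count rule, here is a hedged user-level sketch (a hypothetical countStages helper, not Spark API) that reuses the same traversal but keeps walking past each shuffle, counting every ShuffleDependency in the lineage and applying stages = shuffles + 1:

    import scala.collection.mutable
    import org.apache.spark.ShuffleDependency
    import org.apache.spark.rdd.RDD

    // Hypothetical helper: estimate the number of stages for a job run on `rdd`.
    def countStages(rdd: RDD[_]): Int = {
      val visited = mutable.Set[RDD[_]]()
      val waiting = mutable.Stack[RDD[_]](rdd)
      var shuffles = 0
      while (waiting.nonEmpty) {
        val toVisit = waiting.pop()
        if (visited.add(toVisit)) {          // skip RDDs we have already seen
          toVisit.dependencies.foreach {
            case shuffleDep: ShuffleDependency[_, _, _] =>
              shuffles += 1                  // each shuffle adds one stage boundary
              waiting.push(shuffleDep.rdd)   // keep walking to find earlier shuffles
            case dep =>
              waiting.push(dep.rdd)          // narrow dependency: same stage
          }
        }
      }
      shuffles + 1                           // plus the final ResultStage
    }

Unlike getShuffleDependencies, which stops at the first shuffle on each path (those become the direct parent stages), this sketch crosses shuffle boundaries so it can count all of them; countStages(rdd5) for $04_RddDepen above should return 2, since reduceByKey is the only shuffle.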