1) RDD 血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

2) RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

3) RDD 窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
class OneToOneDependencyT extends NarrowDependencyT

4) RDD 宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会 引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
class ShuffleDependencyK: ClassTag, V: ClassTag, C: ClassTag
extends Dependency[Product2[K, V]]

5) RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了 RDD 的转换过程和任务的阶段。
image.png

6) RDD 阶段划分源码

  1. try {
  2. // New stage creation may throw an exception if, for example, jobs are run on
  3. a
  4. // HadoopRDD whose underlying HDFS files have been deleted.
  5. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  6. } catch {
  7. case e: Exception =>
  8. logWarning("Creating new stage failed due to exception - job: " + jobId, e)
  9. listener.jobFailed(e)
  10. return
  11. }
  12. ……
  13. private def createResultStage(
  14. rdd: RDD[_],
  15. func: (TaskContext, Iterator[_]) => _,
  16. partitions: Array[Int],
  17. jobId: Int,
  18. callSite: CallSite): ResultStage = {
  19. val parents = getOrCreateParentStages(rdd, jobId)
  20. val id = nextStageId.getAndIncrement()
  21. val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  22. stageIdToStage(id) = stage
  23. updateJobIdStageIdMaps(jobId, stage)
  24. stage
  25. }
  26. ……
  27. private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage]
  28. = {
  29. getShuffleDependencies(rdd).map { shuffleDep =>
  30. getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  31. }.toList
  32. }
  33. ……
  34. private[scheduler] def getShuffleDependencies(
  35. rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  36. val parents = new HashSet[ShuffleDependency[_, _, _]]
  37. val visited = new HashSet[RDD[_]]
  38. val waitingForVisit = new Stack[RDD[_]]
  39. waitingForVisit.push(rdd)
  40. while (waitingForVisit.nonEmpty) {
  41. val toVisit = waitingForVisit.pop()
  42. if (!visited(toVisit)) {
  43. visited += toVisit
  44. toVisit.dependencies.foreach {
  45. case shuffleDep: ShuffleDependency[_, _, _] =>
  46. parents += shuffleDep
  47. case dependency =>
  48. waitingForVisit.push(dependency.rdd)
  49. }
  50. } }
  51. parents
  52. }

7) RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
image.png

8) RDD 任务划分源码

  1. val tasks: Seq[Task[_]] = try {
  2. stage match {
  3. case stage: ShuffleMapStage =>
  4. partitionsToCompute.map { id =>
  5. val locs = taskIdToLocations(id)
  6. val part = stage.rdd.partitions(id)
  7. new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
  8. taskBinary, part, locs, stage.latestInfo.taskMetrics, properties,
  9. Option(jobId),
  10. Option(sc.applicationId), sc.applicationAttemptId)
  11. }
  12. case stage: ResultStage =>
  13. partitionsToCompute.map { id =>
  14. val p: Int = stage.partitions(id)
  15. val part = stage.rdd.partitions(p)
  16. val locs = taskIdToLocations(id)
  17. new ResultTask(stage.id, stage.latestInfo.attemptId,
  18. taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
  19. Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
  20. }
  21. }
  22. ……
  23. val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  24. ……
  25. override def findMissingPartitions(): Seq[Int] = {
  26. mapOutputTrackerMaster
  27. .findMissingPartitions(shuffleDep.shuffleId)
  28. .getOrElse(0 until numPartitions)
  29. }