相邻两个RDD之间的关系称之为依赖关系,新的RDD依赖旧的RDD;多个连续的RDD的依赖关系,称之为血缘关系。

一、RDD血缘关系

RDD 只支持粗粒度转换(RDD是不保存数据的),即在大量记录上执行的单个操作。为了提供容错性,将创建RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的Lineage 会记录RDD 的元数据信息和转换行为,当该RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

  1. val fileRDD: RDD[String] = ctx.textFile("./input/data.txt")
  2. println(fileRDD.toDebugString) //toDebugString可以打印血缘关系
  3. println(" ")
  4. val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
  5. println(wordRDD.toDebugString)
  6. println(" ")
  7. val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
  8. println(mapRDD.toDebugString)
  9. println(" ")
  10. val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
  11. println(resultRDD.toDebugString)
  12. 打印结果如下:(数字3表示分区为3的意思)
  13. (3) ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
  14. | ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
  15. (3) MapPartitionsRDD[2] at flatMap at ReduceRDDDemo.scala:28 []
  16. | ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
  17. | ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
  18. (3) MapPartitionsRDD[3] at map at ReduceRDDDemo.scala:32 []
  19. | MapPartitionsRDD[2] at flatMap at ReduceRDDDemo.scala:28 []
  20. | ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
  21. | ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
  22. (3) ShuffledRDD[4] at reduceByKey at ReduceRDDDemo.scala:36 []
  23. +-(3) MapPartitionsRDD[3] at map at ReduceRDDDemo.scala:32 []
  24. | MapPartitionsRDD[2] at flatMap at ReduceRDDDemo.scala:28 []
  25. | ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
  26. | ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []

二、RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻RDD 之间的关系。RDD 和它依赖的父RDD 的关系有两种不同的类型, 即窄依赖( narrowdependency)和宽依赖(wide dependency)。
image.png

  1. val sc: SparkContext = new SparkContext(conf)
  2. val fileRDD: RDD[String] = sc.textFile("input/1.txt") println(fileRDD.dependencies)
  3. println(" ")
  4. val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) println(wordRDD.dependencies)
  5. println(" ")
  6. val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1)) println(mapRDD.dependencies)
  7. println(" ")
  8. val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_) println(resultRDD.dependencies)
  9. resultRDD.collect()
  10. 输出结果:
  11. List(org.apache.spark.OneToOneDependency@3abfe845) //窄依赖
  12. ***********************
  13. List(org.apache.spark.OneToOneDependency@40e60ece)
  14. ***********************
  15. List(org.apache.spark.OneToOneDependency@53a665ad)
  16. ***********************
  17. List(org.apache.spark.ShuffleDependency@328d044f) //宽依赖

三、RDD 窄依赖

窄依赖表示每一个父(上游)RDD 的Partition 最多被子(下游)RDD 的一个Partition 使用,窄依赖我们形象的比喻为独生子女。
窄依赖分区数和Task之间的关系如下,上下游分区只需要同一个Task进行处理即可。
image.png

四、RDD 宽依赖

宽依赖表示同一个父(上游)RDD 的Partition 被多个子(下游)RDD 的Partition 依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为多生。
宽依赖分区数和Task之间的关系如下,上下游分区需要不同的Task进行处理。
image.png

五、RDD 阶段划分

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向, 不会闭环。例如,DAG 记录了RDD 的转换过程和任务的阶段。
图片1.png图片3.png
如下所示:一般只有产生了宽依赖的算子之间才会出现不同的Stage,而窄依赖的算子会在同一个Stage中。
默认会有一个ResultStage(只有一个、最后执行),当RDD中出现shuffle依赖时,Stage会自动增加一个。
image.png

六、*RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个Application;
  • Job:一个Action 算子就会生成一个Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
图片7.png