相邻两个RDD之间的关系称之为依赖关系,新的RDD依赖旧的RDD;多个连续的RDD的依赖关系,称之为血缘关系。
一、RDD血缘关系
RDD 只支持粗粒度转换(RDD是不保存数据的),即在大量记录上执行的单个操作。为了提供容错性,将创建RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的Lineage 会记录RDD 的元数据信息和转换行为,当该RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
val fileRDD: RDD[String] = ctx.textFile("./input/data.txt")
println(fileRDD.toDebugString) //toDebugString可以打印血缘关系
println(" ")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
println(wordRDD.toDebugString)
println(" ")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
println(mapRDD.toDebugString)
println(" ")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
println(resultRDD.toDebugString)
打印结果如下:(数字3表示分区为3的意思)
(3) ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
| ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
(3) MapPartitionsRDD[2] at flatMap at ReduceRDDDemo.scala:28 []
| ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
| ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
(3) MapPartitionsRDD[3] at map at ReduceRDDDemo.scala:32 []
| MapPartitionsRDD[2] at flatMap at ReduceRDDDemo.scala:28 []
| ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
| ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
(3) ShuffledRDD[4] at reduceByKey at ReduceRDDDemo.scala:36 []
+-(3) MapPartitionsRDD[3] at map at ReduceRDDDemo.scala:32 []
| MapPartitionsRDD[2] at flatMap at ReduceRDDDemo.scala:28 []
| ./input/data.txt MapPartitionsRDD[1] at textFile at ReduceRDDDemo.scala:24 []
| ./input/data.txt HadoopRDD[0] at textFile at ReduceRDDDemo.scala:24 []
二、RDD 依赖关系
这里所谓的依赖关系,其实就是两个相邻RDD 之间的关系。RDD 和它依赖的父RDD 的关系有两种不同的类型, 即窄依赖( narrowdependency)和宽依赖(wide dependency)。
val sc: SparkContext = new SparkContext(conf)
val fileRDD: RDD[String] = sc.textFile("input/1.txt") println(fileRDD.dependencies)
println(" ")
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) println(wordRDD.dependencies)
println(" ")
val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1)) println(mapRDD.dependencies)
println(" ")
val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_) println(resultRDD.dependencies)
resultRDD.collect()
输出结果:
List(org.apache.spark.OneToOneDependency@3abfe845) //窄依赖
***********************
List(org.apache.spark.OneToOneDependency@40e60ece)
***********************
List(org.apache.spark.OneToOneDependency@53a665ad)
***********************
List(org.apache.spark.ShuffleDependency@328d044f) //宽依赖
三、RDD 窄依赖
窄依赖表示每一个父(上游)RDD 的Partition 最多被子(下游)RDD 的一个Partition 使用,窄依赖我们形象的比喻为独生子女。
窄依赖分区数和Task之间的关系如下,上下游分区只需要同一个Task进行处理即可。
四、RDD 宽依赖
宽依赖表示同一个父(上游)RDD 的Partition 被多个子(下游)RDD 的Partition 依赖,会引起Shuffle,总结:宽依赖我们形象的比喻为多生。
宽依赖分区数和Task之间的关系如下,上下游分区需要不同的Task进行处理。
五、RDD 阶段划分
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向, 不会闭环。例如,DAG 记录了RDD 的转换过程和任务的阶段。
如下所示:一般只有产生了宽依赖的算子之间才会出现不同的Stage,而窄依赖的算子会在同一个Stage中。
默认会有一个ResultStage(只有一个、最后执行),当RDD中出现shuffle依赖时,Stage会自动增加一个。
六、*RDD 任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个Application;
- Job:一个Action 算子就会生成一个Job;
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
- Task:一个 Stage 阶段中,最后一个RDD 的分区个数就是Task 的个数。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。