Lineage (the dependency chain)
rdd.toDebugString
package tcode.day05

import org.apache.spark.{SparkConf, SparkContext}

object $03_Lineage {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    // Print the lineage after each transformation to watch the dependency chain grow.
    val rdd1 = sc.textFile("datas/wc.txt")
    println(rdd1.toDebugString)
    println("-" * 100)

    val rdd2 = rdd1.flatMap(_.split(" "))
    println(rdd2.toDebugString)
    println("-" * 100)

    val rdd3 = rdd2.map((_, 1))
    println(rdd3.toDebugString)
    println("-" * 100)

    val rdd4 = rdd3.reduceByKey(_ + _)
    println(rdd4.toDebugString)
    println("-" * 100)

    println(rdd4.collect().toList)
  }
}
Dependencies
/*
Dependency: the relationship between a parent RDD and its child RDD.
Spark RDD dependencies come in two kinds:
    Wide dependency: there is a shuffle   [ one partition of the parent RDD is used by multiple partitions of the child RDD ]
    Narrow dependency: there is no shuffle [ one partition of the parent RDD is used by exactly one partition of the child RDD ]

Application: an application
Job: a job
Stage: a stage (a TaskSet)
Task: a single task

One SparkContext corresponds to one Application.
One Application contains multiple Jobs; each action produces one Job.
One Job contains multiple Stages; number of stages = number of shuffles + 1.
One Stage contains multiple Tasks; number of tasks = number of partitions of the last RDD in that stage.
Within one Application, the Jobs run serially.
Within one Job, the Stages run serially.
Within one Stage, the Tasks run in parallel.
A Spark job's stages are cut at shuffle boundaries (the listener sketch right after this block shows one way to check the counts).
*/
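One way to sanity-check the counting rules above is to register a SparkListener and print how many stages each submitted job was split into. This sketch is an addition to the notes, not part of them: the object name StageCountSketch is made up, and it simply reuses the word-count pipeline from the examples in this file.

package tcode.day05

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.{SparkConf, SparkContext}

object StageCountSketch {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    // Every submitted job reports which stages it was split into.
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        println(s"job ${jobStart.jobId} was split into ${jobStart.stageIds.size} stage(s)")
      }
    })

    sc.textFile("datas/wc.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)   // one shuffle
      .collect()            // one action => one job

    sc.stop()
  }
}

Running it should report a single job with two stages, since reduceByKey introduces the only shuffle in the pipeline.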
package tcode.day05

import org.apache.spark.{SparkConf, SparkContext}

object $04_RddDepen {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val rdd1 = sc.textFile("datas/wc.txt", 6)
    println("+++++++++++++++++++++++++++++")
    println(rdd1.dependencies)
    println("-" * 100)

    val rdd2 = rdd1.flatMap(_.split(" "))
    println(rdd2.dependencies)          // OneToOneDependency (narrow)
    println("-" * 100)

    val rdd3 = rdd2.map((_, 1))
    println(rdd3.dependencies)          // OneToOneDependency (narrow)
    println("-" * 100)

    val rdd4 = rdd3.coalesce(3)         // coalesce without shuffle => narrow dependency
    val rdd5 = rdd4.reduceByKey(_ + _)  // reduceByKey => ShuffleDependency (wide)
    println(rdd4.dependencies)
    println("-" * 100)

    println("+++++++++++++++++++++++++++++")
    println(rdd5.collect().toList)

    // Keep the application alive so the web UI (http://localhost:4040) can be inspected.
    Thread.sleep(1000000)
  }
}
Principle:
To split a job into stages, Spark starts from the last RDD and walks backwards through its parent RDDs. Whenever the relationship with a parent RDD is a wide dependency, a stage boundary is cut there; the walk then continues towards earlier parents, cutting a new stage at every wide dependency it meets, until the first RDD is reached. A worked example based on the pipeline above is sketched right after this paragraph.
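Applied to the $04_RddDepen pipeline above, the backwards walk meets exactly one wide dependency (the one introduced by reduceByKey), so the job is cut into two stages. The annotations below are my own reading of that example (the object name StageBoundariesSketch is made up), not output copied from Spark:

package tcode.day05

import org.apache.spark.{SparkConf, SparkContext}

object StageBoundariesSketch {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val rdd1 = sc.textFile("datas/wc.txt", 6) // stage 0 starts here
    val rdd2 = rdd1.flatMap(_.split(" "))     // narrow dependency -> still stage 0
    val rdd3 = rdd2.map((_, 1))               // narrow dependency -> still stage 0
    val rdd4 = rdd3.coalesce(3)               // narrow dependency -> still stage 0; its 3 partitions give stage 0 its 3 tasks
    val rdd5 = rdd4.reduceByKey(_ + _)        // wide dependency -> cut here: stage 1 (ResultStage) with 3 tasks

    println(rdd5.collect().toList)            // one action => one job with 2 stages

    sc.stop()
  }
}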
rdd.dependencies
Source-code call chain: collect() -> SparkContext.runJob -> DAGScheduler (stage splitting) -> TaskScheduler (task launch)

val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
runJob(rdd, func, 0 until rdd.partitions.length)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// submit the job
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, Utils.cloneProperties(properties)))
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// stage splitting
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// get or create the parent stages
val parents = getOrCreateParentStages(rdd, jobId)
getShuffleDependencies(rdd)
// loop to find all ancestor shuffle dependencies
getMissingAncestorShuffleDependencies(shuffleDep.rdd)
// create the stage
createShuffleMapStage
// submit the stage
submitStage(finalStage)
// once all parent stages have finished, the current stage runs
submitMissingTasks(stage, jobId.get)
// split the stage into tasks
val tasks: Seq[Task[_]] = try
// submit the tasks
taskScheduler.submitTasks
// create a TaskSetManager to manage the TaskSet
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// get ready to submit
backend.reviveOffers()
// send a message to the Driver telling it to launch the tasks
driverEndpoint.send(ReviveOffers)
// prepare the resource offers
makeOffers()
// launch the tasks
launchTasks(taskDescs)
// send each task to its executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
// run the task
executor.launchTask(this, taskDesc)
private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // the shuffle dependencies of the given RDD
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // RDDs waiting to be visited
  val waitingForVisit = new ListBuffer[RDD[_]]
  // start the traversal from the given RDD
  waitingForVisit += rdd
  // keep walking back while there are RDDs left to visit
  while (waitingForVisit.nonEmpty) {
    // the RDD currently being visited
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          // wide dependency: record it and do not walk past it
          parents += shuffleDep
        case dependency =>
          // narrow dependency: keep walking back through its parent RDD
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}
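To see the traversal on its own, here is a standalone sketch of the same walk over a toy dependency graph. It is plain Scala with made-up types (Node, NarrowDep, ShuffleDep), not Spark code: it collects the nearest shuffle edges and refuses to walk past them, which is exactly where the stage boundaries fall.

package tcode.day05

import scala.collection.mutable

object ShuffleDepsSketch {

  // Toy model of an RDD graph: nodes linked by narrow or shuffle edges (all names made up).
  sealed trait Dep { def parent: Node }
  case class NarrowDep(parent: Node) extends Dep
  case class ShuffleDep(parent: Node) extends Dep
  case class Node(name: String, deps: List[Dep] = Nil)

  // Mirrors getShuffleDependencies: walk back from `last` and collect the nearest shuffle edges.
  def shuffleDeps(last: Node): Set[ShuffleDep] = {
    val found   = mutable.HashSet[ShuffleDep]()
    val visited = mutable.HashSet[Node]()
    val waiting = mutable.ListBuffer(last)
    while (waiting.nonEmpty) {
      val cur = waiting.remove(0)
      if (!visited(cur)) {
        visited += cur
        cur.deps.foreach {
          case s: ShuffleDep => found += s                 // wide edge: record it, do not cross it
          case n: NarrowDep  => waiting.prepend(n.parent)  // narrow edge: keep walking back
        }
      }
    }
    found.toSet
  }

  def main(args: Array[String]): Unit = {
    // textFile -> flatMap -> map -> coalesce -(shuffle)-> reduceByKey
    val source  = Node("textFile")
    val mapped  = Node("map", List(NarrowDep(Node("flatMap", List(NarrowDep(source))))))
    val merged  = Node("coalesce", List(NarrowDep(mapped)))
    val reduced = Node("reduceByKey", List(ShuffleDep(merged)))

    println(shuffleDeps(reduced).size)  // prints 1: one shuffle => 1 + 1 = 2 stages
  }
}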