Lineage (the dependency chain)
rdd.toDebugString
package tcode.day05

import org.apache.spark.{SparkConf, SparkContext}

object $03_Lineage {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val rdd1 = sc.textFile("datas/wc.txt")
    println(rdd1.toDebugString)
    println("-" * 100)

    val rdd2 = rdd1.flatMap(_.split(" "))
    println(rdd2.toDebugString)
    println("-" * 100)

    val rdd3 = rdd2.map((_, 1))
    println(rdd3.toDebugString)
    println("-" * 100)

    val rdd4 = rdd3.reduceByKey(_ + _)
    println(rdd4.toDebugString)
    println("-" * 100)

    println(rdd4.collect().toList)
  }
}
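Each toDebugString call prints the lineage accumulated so far. As an illustration only (exact RDD ids, partition counts, and call sites depend on the input file and the run), the final lineage for rdd4 has roughly this shape; the indented "+-" line marks the shuffle boundary introduced by reduceByKey, and the number in parentheses is that RDD's partition count:

(2) ShuffledRDD[4] at reduceByKey at ...
 +-(2) MapPartitionsRDD[3] at map at ...
    |  MapPartitionsRDD[2] at flatMap at ...
    |  datas/wc.txt MapPartitionsRDD[1] at textFile at ...
    |  datas/wc.txt HadoopRDD[0] at textFile at ...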
Dependencies
/*
Dependency: the relationship between a parent RDD and its child RDD.

Spark RDD dependencies come in two kinds:
    Wide dependency:   involves a shuffle  [one partition of the parent RDD is used by multiple partitions of the child RDD]
    Narrow dependency: no shuffle          [one partition of the parent RDD is used by only one partition of the child RDD]

Application: the application
Job:         a job
Stage:       a stage (a TaskSet)
Task:        a single task

One SparkContext corresponds to one Application.
One Application contains multiple Jobs; each action produces one Job.
One Job contains multiple Stages; number of stages = number of shuffles + 1 (see the sketch right after this comment block).
One Stage contains multiple Tasks; number of tasks = number of partitions of the last RDD in that stage.

Within an Application, the Jobs run serially.
Within a Job, the Stages run serially.
Within a Stage, the Tasks run in parallel.

A Spark job is split into stages at shuffle boundaries.
*/
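The counting rules above can be made concrete with a small, self-contained sketch. It is not part of the course code; it assumes datas/wc.txt exists and the app runs with master local[4], and the object name StageCountDemo is made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object StageCountDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("stage-count-demo"))

    val counts = sc.textFile("datas/wc.txt", 4)   // minPartitions hint, typically 4 partitions here
      .flatMap(_.split(" "))                      // narrow: stays in the same stage
      .map((_, 1))                                // narrow: stays in the same stage
      .reduceByKey(_ + _)                         // shuffle => one wide dependency

    counts.collect()   // action #1 => Job 0: 1 shuffle + 1 = 2 stages
    counts.count()     // action #2 => Job 1; the two jobs run one after another

    // In Job 0, the stage before the shuffle has as many tasks as its last RDD has
    // partitions (about 4 here); the final stage has reduceByKey's partition count.
    sc.stop()
  }
}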
package tcode.day05

import org.apache.spark.{SparkConf, SparkContext}

object $04_RddDepen {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("test"))

    val rdd1 = sc.textFile("datas/wc.txt", 6)
    println("+++++++++++++++++++++++++++++")
    println(rdd1.dependencies)
    println("-" * 100)

    val rdd2 = rdd1.flatMap(_.split(" "))
    println(rdd2.dependencies)
    println("-" * 100)

    val rdd3 = rdd2.map((_, 1))
    println(rdd3.dependencies)
    println("-" * 100)

    val rdd4 = rdd3.coalesce(3)
    val rdd5 = rdd4.reduceByKey(_ + _)
    println(rdd4.dependencies)
    println("-" * 100)

    println("+++++++++++++++++++++++++++++")
    println(rdd5.collect().toList)

    Thread.sleep(1000000)
  }
}
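To make the printed Dependency objects easier to read, one could label them by their concrete subclass. The helper below is hypothetical (describeDeps is not part of the course code), but it only uses the public Spark API: rdd.dependencies, ShuffleDependency and NarrowDependency.

import org.apache.spark.rdd.RDD
import org.apache.spark.{NarrowDependency, ShuffleDependency}

// Hypothetical helper: label each parent dependency of an RDD as wide or narrow.
def describeDeps(rdd: RDD[_]): Seq[String] = rdd.dependencies.map {
  case _: ShuffleDependency[_, _, _] => "wide dependency (shuffle)"
  case _: NarrowDependency[_]        => "narrow dependency"
}

In the example above, describeDeps(rdd2) should report a narrow dependency (flatMap is map-like), while describeDeps(rdd5) should report a wide dependency (reduceByKey shuffles).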
How stages are split:
To split a job into stages, the scheduler starts from the last RDD and walks backward to its parent RDDs. Whenever the relationship with a parent RDD is a wide dependency, a stage is cut there; the scheduler then keeps walking backward through the parents, cutting one more stage for every wide dependency it finds, until it reaches the first RDD.
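The same backward walk can be sketched in user code. The version below is simplified from the DAGScheduler logic shown in the source excerpt further down: countStages is a made-up name, it only counts stages instead of building them, and it assumes every shuffle dependency in the graph contributes exactly one extra stage.

import scala.collection.mutable
import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Walk backward from the final RDD; every wide (shuffle) dependency is a stage boundary.
def countStages(finalRdd: RDD[_]): Int = {
  val visited = mutable.HashSet[RDD[_]]()
  val waiting = mutable.ListBuffer[RDD[_]](finalRdd)
  var shuffles = 0
  while (waiting.nonEmpty) {
    val rdd = waiting.remove(0)
    if (visited.add(rdd)) {              // skip RDDs that have already been walked through
      rdd.dependencies.foreach {
        case s: ShuffleDependency[_, _, _] =>
          shuffles += 1                  // wide dependency => cut a stage here
          waiting.prepend(s.rdd)         // keep walking on the far side of the shuffle
        case d =>
          waiting.prepend(d.rdd)         // narrow dependency => same stage, keep walking
      }
    }
  }
  shuffles + 1                           // + 1 for the final ResultStage
}

For the pipeline in $04_RddDepen above, countStages(rdd5) would return 2: one shuffle from reduceByKey plus the result stage.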
rdd.dependencies
// Call chain in the Spark source: collect() -> SparkContext.runJob -> DAGScheduler (job submission, stage splitting) -> TaskScheduler (task launch)
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
runJob(rdd, func, 0 until rdd.partitions.length)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
// submit the job
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, Utils.cloneProperties(properties)))
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// stage splitting
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// get or create the parent stages
val parents = getOrCreateParentStages(rdd, jobId)
getShuffleDependencies(rdd)
// loop to find all ancestor shuffle dependencies
getMissingAncestorShuffleDependencies(shuffleDep.rdd)
// create the stage
createShuffleMapStage
// submit the stage
submitStage(finalStage)
// once all parent stages have finished, the current stage runs
submitMissingTasks(stage, jobId.get)
// the stage is split into tasks
val tasks: Seq[Task[_]] = try
// submit the tasks
taskScheduler.submitTasks
// create a TaskSetManager to manage the tasks
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// get ready to submit
backend.reviveOffers()
// send a message to the Driver asking it to submit the tasks
driverEndpoint.send(ReviveOffers)
// get ready to submit
makeOffers()
// launch the tasks
launchTasks(taskDescs)
// send each task to an executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
// execute the task
executor.launchTask(this, taskDesc)

private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  // holds the shuffle dependencies of the given RDD
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  // holds the RDDs that have already been visited
  val visited = new HashSet[RDD[_]]
  // the RDDs waiting to be visited
  val waitingForVisit = new ListBuffer[RDD[_]]
  // start the walk from the given RDD
  waitingForVisit += rdd
  // keep walking until nothing is left to visit
  while (waitingForVisit.nonEmpty) {
    // the RDD currently being visited
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}
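Note how getShuffleDependencies stops at the first shuffle it meets on each path: for a ShuffleDependency it only records the dependency and does not push shuffleDep.rdd back onto waitingForVisit, so it returns just the nearest shuffle ancestors of the given RDD. Walking further back past those shuffles is what getMissingAncestorShuffleDependencies in the call chain above is for.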
