2.1.1 Shuffle 原理

**2.1 spark原理(1) - 图1

2.1.1.1 SortShuffle

  1. mapTask将map(聚合算子)或array(join算子)写入内存
  2. 达到阀值发生溢写,溢写前根据key排序,分批写入磁盘,最终将所有临时文件合并成一个最终文件,并建立一份索引记录分区信息。一个mapTask最终形成一个文件。
  3. reduceTask拉取各个task中自己的分区数据去计算。

    2.1.1.2 和hadoop shuffle的区别

  4. 两者触发shuffle的前提条件?

    1. MR没有所谓的DAG划分,一次MR任务就意味着一次shuffle
    2. spark则是RDD驱动的,行动算子触发时才会按宽窄依赖划分阶段,只有宽依赖才会发生shuffle
  5. 两者的排序区别?
    1. MR在reduce端还会进行一次合并排序
    2. spark则在map端就完成了排序,采用Tim-Sort排序算法
  6. 两者存数据规则?
    1. MR的reduce拉取的数据直接放磁盘再读
    2. spark则是先放内存,放不下才放磁盘
  7. 两者是什么时候开始计算的?

  8. MR在数据拉取完毕后才开始计算
    1. spark则是边拉边计算(reduceByKey原理)
  9. 基于以上种种原因,MR自定义分区器时往往还需要自定义分组,spark则不需要(或者说map结构已经是自定义分组了)。

    2.1.2 Spark Job 提交流程(重点)

    2.1.2.1 standalone

    image.png

  10. driver端:通过反射获取主类执行main方法 -> 创建sparkconf和sparkContext,创建通信环境、后端调度器(负责向master发送注册信息、向excutor发送task的调度器)、task调度器、DAG(根据宽窄依赖划分stage)调度器 ->封装任务信息提交给Master

  11. Master端:缓存任务信息并将其放入任务队列 -> 轮到该任务时,调用调度方法进行资源调度 ->发送调度信息给对应的worker
  12. Worker端:worker将调度信息封装成对象 -> 调用对象的start方法,启动excutor进程
  13. Excutor进程:启动后向driver端反向注册(driver端拿到信息后注册excutor,向其发送任务) -> 创建线程池,封装任务对象 -> 获取池中线程执行任务 -> 反序列化TastSet,执行给定的各种算子步骤

    2.1.2.2 yarn-client

    /??

    2.1.2.3 yarn-cluster

    /???

    2.1.3 Job 运行原理

    **
    阶段一:我们编写driver程序,定义RDD的action和transformation操作。这些依赖关系形成操作的DAG。
    阶段二:根据形成的DAG,DAGScheduler将其划分为不同的stage。
    阶段三:每一个stage中有一个TaskSet,DAGScheduler将TaskSet交给TaskScheduler去执行,TaskScheduler将任务执行完毕之后结果返回给DAGSCheduler。
    阶段四:TaskScheduler将任务分发到每一个Worker节点去执行,并将结果返回给TaskScheduler。image.png

    2.1.4 Task 重试与本地化级别

    TaskScheduler遍历taskSet,调用launchTask方法根据数据”本地化级别”发送task到指定的Executor
    task在选择Executor时,会优先第一级,如果该Executor资源不足则会等待一段时间(默认3s),然后逐渐降级。

    2.1.4.1 本地化级别

  • PROCESS_LOCAL 进程本地化
  • NODE_LOCAL 节点本地化
  • NO_PREF 非本地化
  • RACK_LOCAL 机架本地化
  • ANY 任意

    2.1.4.2 重试机制

    taskSet监视到某个task处于失败或挣扎状态时,会进行重试机制
    当某个task提交失败后,默认会重试3次,3次之后DAGScheduler会重新提交TaskSet再次尝试,总共提交4次,当12次之后判定job失败,杀死Executor
    挣扎状态:当75%的Task完成之后,每隔100s计算所有剩余task已执行时间的中位数,超过这个数的1.5倍的task判定为挣扎task。

    2.1.5 DAG原理(源码级)

  1. sparkContext创建DAGScheduler->创建EventProcessLoop->调用eventLoop.start()方法开启事件监听
  2. action调用sparkContext.runJob->eventLoop监听到事件,调用handleJobSubmitted开始划分stage
  3. 首先对触发job的finalRDD调用createResultStage方法,通过getOrCreateParentStages获取所有父stage列表,然后创建自己。
    如:父(stage1,stage2),再创建自己stage3
  4. getOrCreateParentStages内部会调用getShuffleDependencies获取所有直接宽依赖(从后往前推,窄依赖直接跳过)
    在这个图中G的直接宽依赖是A和F,B因为是窄依赖所以跳过,所以最后B和G属于同一个stage
  5. 接下来会循环宽依赖列表,分别调用getOrCreateShuffleMapStage:
    — 如果某个RDD已经被划分过会直接返回stageID;否则就执行getMissingAncestorShuffleDependencies方法,继续寻找该RDD的父宽依赖,窄依赖老规矩直接加入:
    — 如果返回的宽依赖列表不为空,则继续执行4,5的流程直到为空为止; — 如果返回的宽依赖列表为空,则说明它没有父RDD或者没有宽依赖,此时可以直接调用createShuffleMapStage将该stage创建出来
  6. 因此最终的划分结果是stage3(B,G)、stage2(C,D,E,F)、stage1(A)
  7. 创建ResultStage,调用submitStage提交这个stage
  8. submitStage会首先检查这个stage的父stage是否已经提交,如果没提交就开始递归调用submitStage提交父stage,最后再提交自己。
  9. 每一个stage都是一个taskSet,每次提交都会提交一个taskSet给TaskScheduler