学习目标:
①从宏观上了解Job(Spark on YARN)提交的流程。(画图)
②了解Job在提交之后,进行任务的划分,Stage的划分,任务的调度的过程!
结合: 宽依赖,窄依赖,Stage,task , job
③了解整个Job在执行期间Driver和Executor之间的通信方式
④Shuffle (区别不同的shuffle)
Spark是如何实现Shuffle!1
不同的Shuffle的效率影响!
⑤Spark的内存管理 (只有统一内存管理,用什么GC回收器)


spark的通信框架

spark每个终端(endpoint)都会有以后收信箱(inbox),还可以选择启动多个发信信箱(outbox),具体由需要发送给什么终端决定。

image.png

每个终端通过dispatcher,将信息发送给本地或者其他终端,
如果发送个其他终端则先将信息发送到outbox,之后通过transportClient将数据发送出去,由别的终端的transportServer接收后传输到inbox,被其他终端消费,依据是指向inbox的标识nettyRpcEndpointRef
image.png

spark的app部署模式

SparkOnYarnClient提交流程:

image.png

client模式的源码解析:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-yarn_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. spark-submit --master yarn --xxx xx com.atguigu.spark.WordCount /input
  7. -----------------------------------
  8. 第一部分: 客户端和YARN通信
  9. org.apache.spark.deploy.SparkSubmit.main
  10. -- submit.doSubmit(args)
  11. --super.doSubmit(args)
  12. // 解析spark-submit后传入的参数,加载spark默认的参数
  13. -- val appArgs = parseArguments(args)
  14. --submit(appArgs, uninitLog)
  15. --doRunMain()
  16. //运行spark-submit 提交的主类 中的main方法
  17. --runMain(args, uninitLog)
  18. // 如果deployMode == CLIENT,此时childMainClass=自己提交的全类名
  19. // 如果deployMode == CLUSTER,是YARN集群 childMainClass=org.apache.spark.deploy.yarn.YarnClusterApplication
  20. --val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  21. //创建childMainClass的Class类型
  22. --mainClass = Utils.classForName(childMainClass)
  23. --val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  24. mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  25. } else {
  26. //如果是client 模式,此时运行下一行
  27. new JavaMainApplication(mainClass)
  28. }
  29. //JavaMainApplication.start
  30. -- app.start(childArgs.toArray, sparkConf)
  31. //获取用户编写的 app类的main
  32. -- val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
  33. //执行main方法,Driver启动
  34. --mainMethod.invoke(null, args)
  35. ------------------------------------------------------------------------
  36. 第二部分: WordCount.main
  37. SparkContext sc=new SparkContext
  38. SparkContext的重要组件:
  39. private var _taskScheduler: TaskScheduler = _ //负责Task的调度
  40. private var _dagScheduler: DAGScheduler = _ // Job中DAG Stage的切分
  41. private var _env: SparkEnv = _ // RpcEnv(通信环境) BlockManager(存数据)
  42. --------------
  43. _taskScheduler = ts
  44. _taskScheduler.start()
  45. //YarnClientSchedulerBackend.start()
  46. --backend.start()
  47. -- client = new Client(args, conf, sc.env.rpcEnv)
  48. //提交应用程序到YARN
  49. -- bindToYarn(client.submitApplication(), None)
  50. //参考cluster模式
  51. --......
  52. --在容器中运行的AM的全类名: org.apache.spark.deploy.yarn.ExecutorLauncher
  53. -----------------------------
  54. 第二部分:启动AM
  55. org.apache.spark.deploy.yarn.ExecutorLauncher.main
  56. //ExecutorLauncher 是对AM的封装,也是AM的实现
  57. ApplicationMaster.main(args)
  58. -- --master.run()
  59. if (isClusterMode) {
  60. --runDriver()
  61. } else {
  62. //client模式运行
  63. runExecutorLauncher() //申请Container启动Executor
  64. }
  65. ----------------------------------------------------------------------------
  66. // YarnClusterApplication.start() Client是可以和yarn进行通信的一个客户端
  67. --new Client(new ClientArguments(args), conf, null).run()
  68. --this.appId = submitApplication()
  69. // 提交应用程序到YARN上,获取YARN的返回值
  70. -- val newApp = yarnClient.createApplication()
  71. //确定app保存临时数据的作业目录,通常是在hdfs中的/tmp目录中生成一个子目录
  72. -- val appStagingBaseDir = sparkConf.get(STAGING_DIR)
  73. //确保YARN有足够的资源运行当前app的 AM
  74. // 如果集群资源不足,此时会阻塞,一直阻塞到超时,会FAILD
  75. -- verifyClusterResources(newAppResponse)
  76. -- val containerContext = createContainerLaunchContext(newAppResponse)
  77. // 确定AM的主类名
  78. // AM是YARN提供的一个接口
  79. // 任何的应用程序如果希望提交APP到YARN,实现AM的接口
  80. // MR写的app,MR实现AM
  81. // spark写的App,spark实现AM
  82. --val amClass =
  83. if (isClusterMode) {
  84. //集群模式AM的全类名是org.apache.spark.deploy.yarn.ApplicationMaster
  85. Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  86. } else {
  87. Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  88. }
  89. -- val appContext = createApplicationSubmissionContext(newApp, containerContext)
  90. //提交运行AM
  91. -- yarnClient.submitApplication(appContext)

SparkOnYarnCluster提交流程架构图示:

image.png

image.png

流程源码概述:

1、客户端与yarn通信:
1)通过:spark-submit --master yarn --deploy-mode cluster --xxx xx com.atguigu.spark.WordCount /input指令
2)启动执行org.apache.spark.deploy.SparkSubmit.main进程,启動client线程,在客户端中提交申请应用给resouremanage
3)之后resourceManager找到一台nodemanager
2、AM进程启动
image.png
1)創建amContainer并在其中执行ApplicationMaster.main.run 方法,启动一个线程来运行用户自己编写的app应用程序,并起名为“Driver”
2)等用户的app运行完毕,获取用户自己创建的sparkContext,通過獲取sparkContext的參數向RM汇报AM启动成功(registerAM,其实是注册)并申请Executor的资源:container
3)之后RM返回其资源列表给appmaster,master获取到已经分配的container,在其中运行进程,每个container都会创建一个 ExecutorRunnable,交给线程池运行
4)org.apache.spark.executor.YarnCoarseGrainedExecutorBackend就是那个运行的进程,這個進程就是我們的executor
3、启动YarnCoarseGrainedExecutorBackend进程(executor,负责通信):
1)进程中向driver请求spark配置信息:new SparkConf,与driver产生通信
2)并且给当前进程在网络中起了一个“Executor”别名,这里的负责通信的executor和下面负责计算的executor都是图中所说的executor,只是负责的功能不一样
其中:进程YarnCoarseGrainedExecutorBackend别名就是executor,负责通信
而这个进程类中,就有个属性 val executor =null ;负责计算
下图所框出来的起始就是负责通信的executor
image.pngimage.png
3)之后该进程向Dirver发送RegisterExecutor的反向注册信息,Driver接收到注册信息后返回注册成功信息
4)注册成功过后,YarnCoarseGrainedExecutorBackend.receive方法接收到返回信息,开始调用父类GrainedExecutorBackend的Executor,(spark的计算者,他维护 了一个线程池,用来计算各种Task,但需要等待driver发送task过来才能运行。)之后返回executor启动成功的信息给driver
5)之后driver线程调用DriverEndPoint.receive方法处理过来的信息,并向Executor调度工作任务makeOffers(上图的发offers,就是指派任务的意思):(Task)
task如何发送:
//从调度池(TaskSet)中,根据TaskSet的优先级,调度其中的Task。
// 按照轮流发送的原则将Task调度到所有的Executor,保证负载均衡。
//调度应该发往次Executor的Task
6)之后反序列化task,然后创建task运行线程new TaskRunner(context, taskDescription)执行task:
ResultTask
ShufflerTask

cluster模式的源码解析:

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-yarn_2.12</artifactId>
  4. <version>3.0.0</version>
  5. </dependency>
  6. spark-submit --master yarn --deploy-mode cluster --xxx xx com.atguigu.spark.WordCount /input
  7. -----------------------------------
  8. 第一部分: 客户端和YARN通信
  9. org.apache.spark.deploy.SparkSubmit.main
  10. -- submit.doSubmit(args)
  11. --super.doSubmit(args)
  12. // 解析spark-submit后传入的参数,加载spark默认的参数
  13. -- val appArgs = parseArguments(args)
  14. --submit(appArgs, uninitLog)
  15. --doRunMain()
  16. //运行spark-submit 提交的主类 中的main方法
  17. --runMain(args, uninitLog)
  18. // 如果deployMode == CLIENT,此时childMainClass=自己提交的全类名
  19. // 如果deployMode == CLUSTER,是YARN集群 childMainClass=org.apache.spark.deploy.yarn.YarnClusterApplication
  20. --val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  21. //创建childMainClass的Class类型
  22. --mainClass = Utils.classForName(childMainClass)
  23. --val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  24. //如果是cluster 模式,此时运行下一行
  25. mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  26. } else {
  27. new JavaMainApplication(mainClass)
  28. }
  29. -- app.start(childArgs.toArray, sparkConf)
  30. // YarnClusterApplication.start() Client是可以和yarn进行通信的一个客户端
  31. --new Client(new ClientArguments(args), conf, null).run()
  32. --this.appId = submitApplication()
  33. // 提交应用程序到YARN上,获取YARN的返回值
  34. -- val newApp = yarnClient.createApplication()
  35. //确定app保存临时数据的作业目录,通常是在hdfs中的/tmp目录中生成一个子目录
  36. -- val appStagingBaseDir = sparkConf.get(STAGING_DIR)
  37. //确保YARN有足够的资源运行当前app的 AM
  38. // 如果集群资源不足,此时会阻塞,一直阻塞到超时,会FAILD
  39. -- verifyClusterResources(newAppResponse)
  40. -- val containerContext = createContainerLaunchContext(newAppResponse)
  41. // 确定AM的主类名
  42. // AM是YARN提供的一个接口
  43. // 任何的应用程序如果希望提交APP到YARN,实现AM的接口
  44. // MR写的app,MR实现AM
  45. // spark写的App,spark实现AM
  46. --val amClass =
  47. if (isClusterMode) {
  48. //集群模式AM的全类名是org.apache.spark.deploy.yarn.ApplicationMaster
  49. Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  50. } else {
  51. Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  52. }
  53. -- val appContext = createApplicationSubmissionContext(newApp, containerContext)
  54. //提交运行AM
  55. -- yarnClient.submitApplication(appContext)
  56. ----------------------------------------
  57. 第二部分 AM启动
  58. ApplicationMaster.main
  59. --master.run()
  60. --if (isClusterMode) {
  61. //cluster模式运行
  62. --runDriver()
  63. } else {
  64. runExecutorLauncher()
  65. }
  66. // 启动一个线程,运行用户自己编写的app应用程序的main方法
  67. --userClassThread = startUserApplication()
  68. //为线程起名Driver
  69. --userThread.setName("Driver")
  70. //等待用户自己编写的app的main方法运行结束后,获取用户自己创建的SparkContext
  71. --val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
  72. Duration(totalWaitTime, TimeUnit.MILLISECONDS))
  73. // 向RM回报,AM已经启动成功了
  74. --registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
  75. // 创建一个可以向RM申请资源的对象,申请资源启动Executor
  76. -- createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
  77. //allocator: YarnAllocator 作用就是向RM申请Container,决定获取到COntainer后,使用Container干什么
  78. --allocator = client.createAllocator()
  79. --allocator.allocateResources()
  80. //发送申请请求
  81. -- val allocateResponse = amClient.allocate(progressIndicator)
  82. //从YARN的响应中获取已经分配的Container
  83. --val allocatedContainers = allocateResponse.getAllocatedContainers()
  84. //处理申请到的Container
  85. -- handleAllocatedContainers(allocatedContainers.asScala)
  86. //在Container中运行进程
  87. -- runAllocatedContainers(containersToUse)
  88. //每个Container都会创建一个ExecutorRunnable,交给线程池运行
  89. --new ExecutorRunnable.run
  90. -- nmClient = NMClient.createNMClient()
  91. nmClient.init(conf)
  92. nmClient.start()
  93. //准备容器中的启动进程
  94. startContainer()
  95. --org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 容器中启动的进程
  96. --val commands = prepareCommand()
  97. org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
  98. ①本身就是一个通信端点
  99. ②var driver: Option[RpcEndpointRef] = None
  100. RpcEndpointRef: 某个通信端点的引用
  101. 类比为网络中的一个通讯设备都有唯一的电话号
  102. ------------------
  103. 第一部分: YarnCoarseGrainedExecutorBackend进程启动
  104. --YarnCoarseGrainedExecutorBackend.main
  105. -- CoarseGrainedExecutorBackend.run(backendArgs, createFn)
  106. //向Driver请求spark的配置信息
  107. --val executorConf = new SparkConf
  108. // 当前进程在网络中有一个通信的别名称为Executor
  109. -- env.rpcEnv.setupEndpoint("Executor",
  110. backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
  111. //阻塞,一直运行,除非Driver发送停止命令
  112. --env.rpcEnv.awaitTermination()
  113. constructor -> onStart -> receive* -> onStop
  114. --------------------------------------
  115. 第二部分: YarnCoarseGrainedExecutorBackend进程向Driver发送注册请求
  116. onStart
  117. // ref:RpcEndpointRef 代表Driver的通信端点引用(电话号)
  118. // 向Driver发RegisterExecutor消息,请求答复一个Boolean的值
  119. -- ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
  120. extractAttributes, _resources, resourceProfile.id))
  121. }(ThreadUtils.sameThread).onComplete {
  122. case Success(_) =>
  123. //成功,自己给自己发 RegisteredExecutor消息
  124. self.send(RegisteredExecutor)
  125. //失败,进程就退出
  126. case Failure(e) =>
  127. exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  128. -------------------------------------
  129. 第三部分: Driver处理注册请求
  130. DriverEndPoint.receiveAndReply
  131. --case RegisterExecutor
  132. -- ①判断是否已经注册过了,如果注册过了,回复失败; 判断是否拉黑,拉黑,注册失败,
  133. -- 否则注册,注册完成后,回复true
  134. context.reply(true)
  135. ----------------------------------------
  136. 第四部分: YarnCoarseGrainedExecutorBackend 注册成功后
  137. YarnCoarseGrainedExecutorBackend.receive
  138. -- case RegisteredExecutor =>
  139. logInfo("Successfully registered with driver")
  140. try {
  141. // Spark的计算者,维护了一个线程池,用来计算各种Task
  142. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
  143. resources = _resources)
  144. // 给Driver发LaunchedExecutor消息
  145. driver.get.send(LaunchedExecutor(executorId))
  146. -----------------------------------------------------
  147. 第五部分: Driver处理LaunchedExecutor请求
  148. DriverEndPoint.receive
  149. -- case LaunchedExecutor(executorId) =>
  150. executorDataMap.get(executorId).foreach { data =>
  151. data.freeCores = data.totalCores
  152. }
  153. //Driver向Executor发送工作任务(Task)
  154. makeOffers(executorId)
  155. //从调度池中,根据TaskSet的优先级,调度其中的Task。
  156. // 按照轮流发送的原则将Task调度到哦所有的Executor,保证负载均衡。
  157. //调度应该发往次Executor的Task
  158. --scheduler.resourceOffers(workOffers)
  159. -- launchTasks(taskDescs)
  160. -- val serializedTask = TaskDescription.encode(task)
  161. //为要发送的Executor打个招呼,发送Task
  162. --executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  163. --------------------------------------------------
  164. 第六部分: Executor收到Task
  165. -- case LaunchTask(data) =>
  166. if (executor == null) {
  167. exitExecutor(1, "Received LaunchTask command but executor was null")
  168. } else {
  169. // 反序列化得到Task的描述(TaskDescription)
  170. val taskDesc = TaskDescription.decode(data.value)
  171. logInfo("Got assigned task " + taskDesc.taskId)
  172. taskResources(taskDesc.taskId) = taskDesc.resources
  173. //启动Task的运算
  174. executor.launchTask(this, taskDesc)
  175. //创建一个Task运行的线程
  176. -- val tr = new TaskRunner(context, taskDescription)
  177. runningTasks.put(taskDescription.taskId, tr)
  178. // 启动线程
  179. threadPool.execute(tr)
  180. --TaskRunner.run()
  181. //构造可以运行的Task对象
  182. --task = ser.deserialize[Task[Any]]
  183. //运行Task获取结果
  184. -- val res = task.run()
  185. // ShuffleMapTask 或 ResultTask
  186. --runTask(context)
  187. }

job的stage的task的调度

  • Application: 应用
    Job: 任务 .—>一个行动算子一个job
    stage: 阶段
    task: 单个任务
    一个SparkContext对应一个Application
    一个Application对应多个Job,一个action(行动算子)产生一个job
    image.png
    一个job中对应多个stage,stage的个数 = shuffle个数(shufflerMapStage) + 1(ResultStage),也就是说一个action对应一个resultStage
    * 一个stage对应多个task,task的个数 = 当前stage中最后一个rdd的分区数
    一个task也对应多个stage
    只有Executor的代码会产生job,Driver端执行的代码不走job
    stage与task是交叉关系
    下图是一个job:
    image.png

    task源码流程概述

    image.png
    在driver中:
    rdd.collect处理各分区的task汇聚成job
    通过dagScheduler.handleJobSubmitted方法,开始创建stage
    递归依次从后往前找,将ResultStage的所有祖先 ShuffleMapStage都创建完毕,最后才创建最后一个Stage:finalStage = createResultStage
    然后提交ResultStage、ShuffleMapStage。
    之后根据Stage类型创建对应的Task,(ShuffleMapStage =>new ShuffleMapTask,ResultStage =>new ResultTask)
    task个数根据当前Stage要计算的分区数(最后一个stage的分区数)确定

task在hadoop的mr里和在spark的区别

hadoop中:maptask、reducetask:进程级别
spark中:ShuffleMapTask 或ResultTask:线程级别

task调度源码解释

  1. Task的产生
  2. --------------
  3. RDD.collect
  4. --val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  5. //判断是否存在闭包,闭包变量是否可以序列化
  6. -- val cleanedFunc = clean(func)
  7. -- dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  8. -- val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  9. // event处理循环,放入一个事件 JobSubmitted ,提交job
  10. // eventProcessLoop = new DAGSchedulerEventProcessLoop
  11. -- eventProcessLoop.post(JobSubmitted())
  12. --EventLoop.onReceive(event)
  13. -- doOnReceive(event)
  14. --case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
  15. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  16. ----------------------
  17. dagScheduler.handleJobSubmitted
  18. // Job的最后一个阶段stage
  19. --var finalStage: ResultStage = null
  20. -- finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  21. -- 递归依次从后往前找,将ResultStage的所有祖先 ShuffleMapStage都创建完毕,最后才创建最后一个Stage
  22. -- val parents = getOrCreateParentStages(rdd, jobId)
  23. --val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  24. // ResultStage创建完毕后,才会生成Job
  25. -- val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  26. //提交ResultStage
  27. --submitStage(finalStage)
  28. //查看当前阶段有没有未提交的父阶段,按照父阶段的ID排序
  29. --val missing = getMissingParentStages(stage).sortBy(_.id)
  30. //如果父阶段未提交,按照顺序依次提交,最后提交自己
  31. --if (missing.isEmpty) {
  32. logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
  33. submitMissingTasks(stage, jobId.get)
  34. } else {
  35. for (parent <- missing) {
  36. submitStage(parent)
  37. }
  38. waitingStages += stage
  39. }
  40. // 根据Stage类型创建对应的Task,创建时,根据当前Stage要计算的分区数确定
  41. // 如何知道每一个Stage要计算的partition的数量? 取决于Stage最后一个RDD的分区数
  42. --val tasks: Seq[Task[_]] = try {
  43. val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  44. stage match {
  45. case stage: ShuffleMapStage =>
  46. stage.pendingPartitions.clear()
  47. partitionsToCompute.map { id =>
  48. val locs = taskIdToLocations(id)
  49. val part = partitions(id)
  50. stage.pendingPartitions += id
  51. new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
  52. taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
  53. Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
  54. }
  55. case stage: ResultStage =>
  56. partitionsToCompute.map { id =>
  57. val p: Int = stage.partitions(id)
  58. val part = partitions(p)
  59. val locs = taskIdToLocations(id)
  60. new ResultTask(stage.id, stage.latestInfo.attemptNumber,
  61. taskBinary, part, locs, id, properties, serializedTaskMetrics,
  62. Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
  63. stage.rdd.isBarrier())
  64. }
  65. }
  66. //将每个Stage的task放入调度池,通知Driver,去调度
  67. // TaskSet: 包含了一个Stage所有的Tasks, 一个Task的集合
  68. --taskScheduler.submitTasks(new TaskSet(
  69. tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  70. //TaskSetManager: 追踪每一个TaskSet中所有Task的运行情况,在它们失败时重启。通过延迟调度
  71. // 处理位置敏感的调度,调度每个Task
  72. --val manager = createTaskSetManager(taskSet, maxTaskFailures)
  73. //将TaskSetManager加入调度池(FIFO | FAIR),不同的调度池,会决定不同的TaskSet被调度的优先级
  74. -- schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  75. -- backend.reviveOffers()
  76. //给Driver发消息,告诉Driver有新的TaskSetManager入池了,可以开始发送了
  77. -- driverEndpoint.send(ReviveOffers)
  78. -- case ReviveOffers =>
  79. makeOffers()

shuffler

什么是shuffler

在Spark中,很多算子都会引起RDD中数据的重分区!新的分区被创建,旧的分区被合并或数据被重新分配!在重分区的过程中,如果数据发生了跨节点的移动,就称为shuffle!

shuffle的作用就是为了在不同的task中交换数据!

hashshuffler(过时)

对应hadoop中的MR来解释

每个maptask(spark task)的数据会配分配到多个reducetask中,
N个maptask(spark task)分配到M个reducetask中就是生成:M*N个小文件
image.png

优化后的hashshuffler

原理:
core number: M
将所有task的内容写进一个core中
reduceTask个数:N
M*N个小文件

应用:
启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。

image.png

SortShuffle(sort-based shuffle)

随着Hadoop2.0的发布,Spark借鉴了Hadoop2.0中的shuffle过程(sort-based shuffle)!

本质:
有由索引实现排序

sort-based shuffle核心要义:
1)一个MapTask最终只生成一个数据文件,
2)这一个数据文件中包含2个文件:
一个是包含有若干分区的数据文件,
另一个是包含索引的元数据文件,其中记录了分区的边界!
在产生文件时,默认SortShuffleWriter会进行排序!

3)并不是所有的sort-based shuffle都会对shuffle写出的数据进行排序!:


对比Hadoop2.0的shuffle:
在MapTask上,先分区———>溢写前,进行排序——->溢写为一个片段
所有片段全部溢写完成后———->merge———->合并为一个总的文件!

区别
hadoop: MapTask: map————->sort————->merge
ReduceTask: sort————->reduce

spark : MapTask: map(各种算子)————->sort————->merge
ReduceTask: merge——->算子(reduce阶段不排序,算子中排序)

spark的shuffle在ReduceTask端不排序!
原因:
1)hadoop的reduce是一个迭代器,且只有一个map和reduce
如果有序:
(a,1),(a,1),(b,1)则输出(a,2),(b,1)
如果无序如:
(a,1),(b,1),(a,1) 则输出(a,1),(b,1),(a,1)

2)而spark会有一些算子实现排序:

  1. - `mapPartitions` to sort each partition using, for example, `.sorted`
  2. - `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
  3. - `sortBy` to make a globally ordered RDD

普通SortShuffle

在该模式下,数据会先写入一个数据结构,reduceByKey写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。
最后在每个Task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
image.png

bypass SortShuffle

bypass运行机制的触发条件如下:
1) shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200。
2) 不是聚合类的shuffle算子(比如reduceByKey)。
此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟优化后的的HashShuffleManager是一模一样的,只多了创建的索引文件,通过索引文件,节省了排序功能,提升性能
而该机制与普通SortShuffleManager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
image.png