ShuffleMapTask.scala
// ShuffleMapTask.runTask returns a MapStatus
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L

  // Deserialize the (RDD, ShuffleDependency) pair this task works on.
  /*
   * Question: many tasks run concurrently on the executors, the data is spread across machines,
   * and every task of a stage processes the same RDD. How does each task get hold of the
   * RDD and dependency it needs in order to process its own partition?
   * Answer: through the broadcast variable (taskBinary).
   */
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Get the ShuffleManager
    val manager = SparkEnv.get.shuffleManager
    // Get the ShuffleWriter for this shuffle and partition
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // First, call the RDD's iterator() method with the partition this task should process;
    // the core computation happens inside rdd.iterator(). The resulting partition data is
    // then handed to the ShuffleWriter, which routes each record through the partitioner
    // (e.g. a HashPartitioner) and writes it to the corresponding shuffle output partition.
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // Return a MapStatus, which records where this ShuffleMapTask's output is stored,
    // essentially the information of the BlockManager that holds it.
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
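To make the "how does a task get the RDD it has to process" answer concrete, here is a minimal, standalone sketch (not from the original walkthrough) of the same serialize-on-the-driver, deserialize-on-the-executor pattern, using JavaSerializer and plain strings in place of the real (RDD, ShuffleDependency) pair:

import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object TaskBinarySketch {
  def main(args: Array[String]): Unit = {
    val ser = new JavaSerializer(new SparkConf()).newInstance()

    // "Driver side": serialize the task payload once; in Spark this byte array is what
    // gets wrapped in a broadcast variable (taskBinary) and shipped to every executor.
    val taskBinary: Array[Byte] = ser.serialize(("rdd description", "shuffle dependency")).array()

    // "Executor side": each task deserializes its own copy, mirroring
    // ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](ByteBuffer.wrap(taskBinary.value), ...)
    val (rddDesc, depDesc) = ser.deserialize[(String, String)](ByteBuffer.wrap(taskBinary))
    println(s"got $rddDesc and $depDesc")
  }
}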
ShuffledRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // When a ResultTask or ShuffleMapTask reaches a ShuffledRDD and computes one of its partitions,
  // it calls ShuffleManager.getReader() to obtain a shuffle reader (here, a BlockStoreShuffleReader)
  // and then calls read() to pull the output written by the previous stage's ShuffleMapTasks.
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
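For context, a small illustrative job (names are my own, not from the original text) that exercises this path: reduceByKey builds a ShuffledRDD, so the map-side stage runs the ShuffleMapTask.runTask shown above, and each reduce-side partition is produced by this compute() method.

import org.apache.spark.{SparkConf, SparkContext}

object ShuffledRddWalkthrough {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-walkthrough").setMaster("local[2]"))
    val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
    // reduceByKey introduces a ShuffledRDD: upstream ShuffleMapTasks write partitioned map output,
    // and the reduce side computes ShuffledRDD partitions via shuffleManager.getReader().read()
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}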
BlockStoreShuffleReader.scala
override def read(): Iterator[Product2[K, C]] = {
  // When a ResultTask reads its data, it builds a ShuffleBlockFetcherIterator: it asks the
  // MapOutputTracker (which the DAGScheduler registers map outputs with) where each map output
  // lives, and then fetches the blocks from the corresponding BlockManagers.
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

  // Wrap the streams for compression based on configuration
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    blockManager.wrapForCompression(blockId, inputStream)
  }
  // ... (rest of read() omitted)
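The 48m default above comes from spark.reducer.maxSizeInFlight. A quick way to change it; the 96m value is purely illustrative, not a recommendation from this walkthrough:

import org.apache.spark.SparkConf

// Illustrative tuning only: raise the per-reducer fetch buffer from the 48m default.
val conf = new SparkConf()
  .setAppName("shuffle-tuning")
  .set("spark.reducer.maxSizeInFlight", "96m")

// or equivalently on the command line:
//   spark-submit --conf spark.reducer.maxSizeInFlight=96m ...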
ShuffleBlockFetcherIterator.scala
private[this] def initialize(): Unit = {
  // Add a task completion callback (called in both success case and failure case) to cleanup.
  context.addTaskCompletionListener(_ => cleanup())

  // Split local and remote blocks.
  val remoteRequests = splitLocalRemoteBlocks()
  // Add the remote requests into our queue in a random order
  fetchRequests ++= Utils.randomize(remoteRequests)

  // Send out initial requests for blocks, up to our maxBytesInFlight
  fetchUpToMaxBytes()

  val numFetches = remoteRequests.size - fetchRequests.size
  logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

  // Get Local Blocks
  fetchLocalBlocks()
  logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}
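As a rough, standalone sketch (assumed names, not the real splitLocalRemoteBlocks implementation) of what "split local and remote blocks" plus the random ordering amounts to:

import scala.util.Random

object SplitBlocksSketch {
  final case class BlockLoc(executorId: String, blockId: String, size: Long)

  // Blocks that live on this executor's own BlockManager can be read locally;
  // everything else becomes a remote fetch. Shuffling the remote requests spreads
  // the load so that all reducers do not hit the same map-output node first
  // (this mirrors Utils.randomize(remoteRequests) above).
  def split(blocks: Seq[BlockLoc], localExecutorId: String): (Seq[BlockLoc], Seq[BlockLoc]) = {
    val (local, remote) = blocks.partition(_.executorId == localExecutorId)
    (local, Random.shuffle(remote))
  }
}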
private def fetchUpToMaxBytes(): Unit = {
  // Send fetch requests up to maxBytesInFlight
  // The key knob here is spark.reducer.maxSizeInFlight (maxBytesInFlight): it caps how much
  // shuffle data may be pulled to this node at a time; as data arrives, the user-defined
  // reduce-side operators start processing it.
  while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    // Send a request to fetch the blocks from a remote executor
    sendRequest(fetchRequests.dequeue())
  }
}
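A simplified, standalone model of this gating loop (assumed types and sizes, not Spark code): requests are only dequeued while the next one still fits under maxBytesInFlight, or when nothing is in flight yet, so a single oversized request can still make progress.

import scala.collection.mutable

object FetchGatingSketch {
  final case class FetchRequest(address: String, size: Long)

  val maxBytesInFlight: Long = 48L * 1024 * 1024   // mirrors the 48m default of spark.reducer.maxSizeInFlight
  var bytesInFlight: Long = 0L
  val fetchRequests: mutable.Queue[FetchRequest] = mutable.Queue.empty

  def sendRequest(req: FetchRequest): Unit = {
    // In the real iterator, bytesInFlight is decremented again once the fetched blocks are consumed.
    bytesInFlight += req.size
    println(s"fetching ${req.size} bytes from ${req.address}")
  }

  def fetchUpToMaxBytes(): Unit = {
    while (fetchRequests.nonEmpty &&
        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
      sendRequest(fetchRequests.dequeue())
    }
  }

  def main(args: Array[String]): Unit = {
    fetchRequests ++= Seq(
      FetchRequest("executor-1", 20L * 1024 * 1024),
      FetchRequest("executor-2", 20L * 1024 * 1024),
      FetchRequest("executor-3", 20L * 1024 * 1024))  // this one waits: it would exceed the cap
    fetchUpToMaxBytes()
  }
}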
