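One reason Spark computes so much faster than Hadoop is that intermediate results are cached in memory instead of being written straight to disk. This article examines how Spark's storage subsystem is put together, and uses the data write and data read paths to show how its components interact.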

6.11.1 Storage Subsystem Overview

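The storage module is divided into two layers:

  • Communication layer: the storage module uses a master-slave structure for communication. The control and status information exchanged between master and slave all travels through this layer.
  • Storage layer: the storage module has to store data on disk or in memory, and may also need to replicate it to a remote node. The storage layer implements this and exposes the corresponding interfaces.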

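Other modules interact with the storage module through a single unified class, BlockManager. Any outside class that deals with the storage module does so by calling the corresponding BlockManager interface.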


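The figure above sketches the relationships among the main modules of the Spark storage subsystem. In brief:

  • CacheManager: when an RDD is computed, it fetches data through CacheManager and stores its results through CacheManager.
  • BlockManager: CacheManager relies on the BlockManager interface to read and write data. BlockManager decides whether data is fetched from memory (MemoryStore) or from disk (DiskStore).
  • MemoryStore: saves data to memory and reads it back from memory.
  • DiskStore: writes data to disk and reads it back from disk.
  • BlockManagerWorker: writing data to the local MemoryStore or DiskStore is a synchronous operation. For fault tolerance the data also has to be copied to other compute nodes so that it can be recovered if it is lost. This replication is asynchronous and is handled by BlockManagerWorker.
  • ConnectionManager: establishes connections to other compute nodes and sends and receives data.
  • BlockManagerMaster: note that this module runs only in the executor that hosts the driver application. It records which slave worker stores each BlockId. For example, suppose an RDD task runs on machine A and needs BlockId 3, but machine A holds no data for BlockId 3. The slave worker then asks BlockManagerMaster, via BlockManager, where the data is stored, and fetches it through ConnectionManager.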
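
The bookkeeping role described for BlockManagerMaster can be pictured with a small sketch. It is not Spark's implementation: `BlockLocationRegistry`, `register`, and `locate` are hypothetical names, and block IDs and worker names are reduced to plain strings.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the driver-side bookkeeping:
// it remembers which workers hold a copy of each block.
object BlockLocationRegistry {
  private val locations = mutable.Map.empty[String, mutable.Set[String]]

  // Record that `worker` now stores `blockId`.
  def register(blockId: String, worker: String): Unit = synchronized {
    locations.getOrElseUpdate(blockId, mutable.Set.empty[String]) += worker
  }

  // Return the workers known to hold `blockId` (empty if none).
  def locate(blockId: String): Set[String] = synchronized {
    locations.get(blockId).map(_.toSet).getOrElse(Set.empty[String])
  }
}
```

In the machine A example, a lookup for the missing block would return the set of workers to fetch from; an empty set means no worker has reported the block.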

6.11.2 Startup Process

The modules above are created by SparkEnv, inside SparkEnv.create:

    // The master handle wraps an actor that is created on the driver
    // and merely looked up on every other node (see registerOrLookup).
    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf)), conf)
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
    val connectionManager = blockManager.connectionManager
    val broadcastManager = new BroadcastManager(isDriver, conf)
    val cacheManager = new CacheManager(blockManager)

The code above can be misleading: it looks as if a BlockManagerMasterActor were created on every cluster node. It is not. Look closely at the implementation of registerOrLookup: if the current node is the driver it creates the actor, otherwise it establishes a connection to the driver:

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        // Driver: create the actor locally.
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        // Any other node: resolve a reference to the actor running on the driver.
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }

A key step during initialization is that each BlockManager registers itself with BlockManagerMaster.

6.11.3 Communication Layer


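BlockManager wraps BlockManagerMaster, and the information it sends is packaged as BlockManagerInfo. Spark creates a BlockManager on the driver and on every worker. These instances communicate through BlockManagerMaster, and the storage module is operated through BlockManager.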

The BlockManager object is created in SparkEnv.create:

    def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }
    ......
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
      numUsableCores)

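Before the endpoint is created, the code checks whether the current node is the driver. If it is, the endpoint is created. Otherwise a reference to the driver's endpoint is created.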

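After a BlockManager has been created, its initialize method is called. During initialization it registers itself with the driver through BlockManagerMaster, and the slave endpoint is started as part of that registration. In addition, if an external shuffle service is present, the executor's configuration is registered with the local shuffle server:

    def initialize(appId: String): Unit = {
      ......
      // Register this block manager, together with its slave endpoint, on the driver.
      master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
      // Executors also register with the external shuffle service when it is enabled.
      if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
        registerWithExternalShuffleServer()
      }
    }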

BlockManagerMaster wraps the registration request in a RegisterBlockManager message and sends it to the driver. The driver's BlockManagerMasterEndpoint then calls its register method, which checks blockManagerInfo and records the new block manager:

    private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
      val time = System.currentTimeMillis()
      if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId) match {
          case Some(oldId) =>
            // A block manager for this executor is already known: replace it.
            logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
            removeExecutor(id.executorId)
          case None =>
        }
        logInfo("Registering block manager %s with %s RAM, %s".format(
          id.hostPort, Utils.bytesToString(maxMemSize), id))
        blockManagerIdByExecutor(id.executorId) = id
        blockManagerInfo(id) = new BlockManagerInfo(
          id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
      }
      listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
    }

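As the code shows, each BlockManagerInfo object is saved in a map. In the communication layer, BlockManagerMaster controls the flow of messages. Dispatch is done by pattern matching, and all message types are defined in BlockManagerMessages.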
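
A minimal sketch of that dispatch style follows. The message types here are hypothetical and heavily simplified (Spark's own messages carry more fields), and `MessageDispatch.describe` is an illustrative name, not a Spark method.

```scala
// Hypothetical, simplified message types for illustration only.
sealed trait StorageMessage
final case class RegisterBlockManager(executorId: String, maxMemSize: Long) extends StorageMessage
final case class GetLocations(blockId: String) extends StorageMessage
case object StopMaster extends StorageMessage

object MessageDispatch {
  // One `case` per message type, in the style of an endpoint's receive method.
  def describe(msg: StorageMessage): String = msg match {
    case RegisterBlockManager(executorId, maxMem) =>
      s"register $executorId with $maxMem bytes"
    case GetLocations(blockId) =>
      s"look up locations of $blockId"
    case StopMaster =>
      "stop"
  }
}
```

Because the trait is sealed, the compiler can warn when a message type is left unhandled, which is one reason this style suits a closed message protocol.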

6.11.4 Storage Layer

The smallest unit of storage in Spark is the block, and every operation works on blocks. The MemoryStore and DiskStore objects are created when the BlockManager is created:

    val diskBlockManager = new DiskBlockManager(this, conf)
    private[spark] val memoryStore = new MemoryStore(this, maxMemory)
    private[spark] val diskStore = new DiskStore(this, diskBlockManager)

Disk Store

The current Spark version divides the Disk Store's work more finely: file handling has been pulled out into DiskBlockManager, and DiskStore is responsible only for storing and reading data.

DiskStore can be configured with several directories. Under each of them Spark creates a folder named spark-UUID, where UUID is a random UUID. The folder is created when the Disk Store stores data. Following the principle of high cohesion and low coupling, this utility code lives in Utils (call path: DiskStore.putBytes -> DiskBlockManager.createLocalDirs -> Utils.createDirectory):

    def createDirectory(root: String, namePrefix: String = "spark"): File = {
      var attempts = 0
      val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
      var dir: File = null
      // Retry with a fresh random name until a directory is created or attempts run out.
      while (dir == null) {
        attempts += 1
        if (attempts > maxAttempts) {
          throw new IOException("Failed to create a temp directory (under " + root + ") after " +
            maxAttempts + " attempts!")
        }
        try {
          dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
          if (dir.exists() || !dir.mkdirs()) {
            dir = null
          }
        } catch { case e: SecurityException => dir = null }
      }
      dir.getCanonicalFile
    }

In DiskBlockManager each block is stored as one file. A block is mapped to its file by hashing the blockId's name:

    def getFile(filename: String): File = {
      // Pick a local directory and a sub-directory from the hash of the file name.
      val hash = Utils.nonNegativeHash(filename)
      val dirId = hash % localDirs.length
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

      // Create the sub-directory on first use.
      val subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          if (!newDir.exists() && !newDir.mkdir()) {
            throw new IOException(s"Failed to create local dir in $newDir.")
          }
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
      new File(subDir, filename)
    }

    def getFile(blockId: BlockId): File = getFile(blockId.name)
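
The arithmetic that getFile uses to pick a directory can be traced by hand. The sketch below repeats only that arithmetic on an already non-negative hash; `DirMapping`, `locate`, and `subDirName` are hypothetical helper names, and the directory counts in the example are made-up values.

```scala
object DirMapping {
  // Same arithmetic as getFile, applied to an already non-negative hash.
  // Returns (dirId, subDirId).
  def locate(hash: Int, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
    require(hash >= 0 && numLocalDirs > 0 && subDirsPerLocalDir > 0)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, subDirId)
  }

  // Sub-directory names are two-digit lowercase hex, as in "%02x".format(subDirId).
  def subDirName(subDirId: Int): String = "%02x".format(subDirId)
}
```

For a hash of 1000 with 3 local directories and 64 sub-directories each, `locate` yields dirId 1 (1000 % 3) and subDirId 13 ((1000 / 3) % 64), so the file would live in sub-directory "0d" of the second local directory.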

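Taking the hash modulo the directory counts yields dirId and subDirId. The sub-directory is then looked up in subDirs, and if it does not exist yet a new one is created. Finally, a File is built with subDir as its directory and the blockId's name as its file name.

Once the file has been obtained, DiskStore writes the block mapped to it: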

    override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
      // Work on a duplicate so the caller's buffer position is left untouched.
      val bytes = _bytes.duplicate()
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      val file = diskManager.getFile(blockId)
      val channel = new FileOutputStream(file).getChannel
      Utils.tryWithSafeFinally {
        while (bytes.remaining > 0) {
          channel.write(bytes)
        }
      } {
        channel.close()
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
        file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
      PutResult(bytes.limit(), Right(bytes.duplicate()))
    }

Reading is simpler: DiskStore reads the contents of the file mapped to the blockId, after first obtaining the file from DiskBlockManager:

    private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
        if (length < minMemoryMapBytes) {
          // Small files are copied into a heap buffer.
          val buf = ByteBuffer.allocate(length.toInt)
          channel.position(offset)
          while (buf.remaining() != 0) {
            if (channel.read(buf) == -1) {
              throw new IOException("Reached EOF before filling buffer\n" +
                s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
            }
          }
          buf.flip()
          Some(buf)
        } else {
          // Larger files are memory-mapped instead.
          Some(channel.map(MapMode.READ_ONLY, offset, length))
        }
      } {
        channel.close()
      }
    }

    override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
      val file = diskManager.getFile(blockId.name)
      getBytes(file, 0, file.length)
    }

Memory Store

Compared with the Disk Store, the Memory Store is much simpler. It manages its blocks with a LinkedHashMap whose key is the blockId and whose value is a MemoryEntry case class that holds the data:

    private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
    private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
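
The third constructor argument, `true`, matters here. Assuming the map is `java.util.LinkedHashMap` (the three-argument constructor matches it), that flag selects access order, so the entry read least recently comes first during iteration. The sketch below shows the effect with plain strings; `AccessOrderDemo` is a hypothetical name.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

object AccessOrderDemo {
  // Returns the keys in iteration order after touching "a".
  def keysAfterTouch(): List[String] = {
    // Third argument `true` selects access order instead of insertion order.
    val entries = new JLinkedHashMap[String, Int](32, 0.75f, true)
    entries.put("a", 1)
    entries.put("b", 2)
    entries.put("c", 3)
    entries.get("a") // reading "a" moves it to the end (most recently used)
    val it = entries.keySet().iterator()
    val keys = List.newBuilder[String]
    while (it.hasNext) keys += it.next()
    keys.result()
  }
}
```

After the read of "a", iteration visits "b", "c", "a". A store that evicts from the front of such a map therefore drops the least recently used block first.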

A block can be stored in MemoryStore only if there is currently enough memory to hold it. putBytes checks this by calling tryToPut:

    def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
      // The bytes are produced lazily, only if tryToPut actually needs them.
      lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
      val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
      val data =
        if (putAttempt.success) {
          assert(bytes.limit == size)
          Right(bytes.duplicate())
        } else {
          null
        }
      PutResult(size, data, putAttempt.droppedBlocks)
    }

Inside tryToPut, a call to ensureFreeSpace determines whether enough memory is available. If there is, the block is put into the LinkedHashMap. If there is not, BlockManager is told that memory is insufficient, and if the storage level allows disk, the block is placed on disk instead:

    private def tryToPut(
        blockId: BlockId,
        value: () => Any,
        size: Long,
        deserialized: Boolean): ResultWithDroppedBlocks = {
      var putSuccess = false
      val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
      accountingLock.synchronized {
        // May evict other blocks to make room; those are reported as dropped.
        val freeSpaceResult = ensureFreeSpace(blockId, size)
        val enoughFreeSpace = freeSpaceResult.success
        droppedBlocks ++= freeSpaceResult.droppedBlocks
        if (enoughFreeSpace) {
          val entry = new MemoryEntry(value(), size, deserialized)
          entries.synchronized {
            entries.put(blockId, entry)
            currentMemory += size
          }
          val valuesOrBytes = if (deserialized) "values" else "bytes"
          logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
            blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
          putSuccess = true
        } else {
          // Not enough memory: let BlockManager drop the block (to disk if allowed).
          lazy val data = if (deserialized) {
            Left(value().asInstanceOf[Array[Any]])
          } else {
            Right(value().asInstanceOf[ByteBuffer].duplicate())
          }
          val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
          droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
        releasePendingUnrollMemoryForThisTask()
      }
      ResultWithDroppedBlocks(putSuccess, droppedBlocks)
    }
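
The idea of freeing space before a put can be sketched in isolation. The class below is a hypothetical miniature, not Spark's ensureFreeSpace: `TinyMemoryStore` simply evicts least recently used blocks until the new block fits, and it assumes evicted blocks are discarded rather than moved to disk.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// Hypothetical miniature of a memory store with a byte budget.
final class TinyMemoryStore(maxBytes: Long) {
  // Access-ordered map: iteration starts at the least recently used entry.
  private val entries = new JLinkedHashMap[String, Array[Byte]](32, 0.75f, true)
  private var used = 0L

  def contains(id: String): Boolean = synchronized { entries.containsKey(id) }

  // Stores the block, evicting least recently used blocks until it fits.
  // Returns the evicted IDs, or None when the block exceeds the whole budget.
  def put(id: String, bytes: Array[Byte]): Option[List[String]] = synchronized {
    if (bytes.length > maxBytes) {
      None
    } else {
      val previous = entries.remove(id) // replacing a block frees its old bytes
      if (previous != null) used -= previous.length
      val evicted = List.newBuilder[String]
      val it = entries.entrySet().iterator()
      while (used + bytes.length > maxBytes && it.hasNext) {
        val victim = it.next()
        used -= victim.getValue.length
        evicted += victim.getKey
        it.remove()
      }
      entries.put(id, bytes)
      used += bytes.length
      Some(evicted.result())
    }
  }
}
```

With a budget of 10 bytes, storing blocks of 6 and 4 bytes fills the store, and a further 5-byte block evicts the 6-byte one. The returned list plays the role that droppedBlocks plays in tryToPut.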

Reading a block from MemoryStore is just as simple: look up the value for the blockId in the LinkedHashMap:

    override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
      val entry = entries.synchronized {
        entries.get(blockId)
      }
      if (entry == null) {
        None
      } else if (entry.deserialized) {
        // Stored as objects: iterate over them directly.
        Some(entry.value.asInstanceOf[Array[Any]].iterator)
      } else {
        // Stored as bytes: deserialize from a duplicate of the buffer.
        val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate()
        Some(blockManager.dataDeserialize(blockId, buffer))
      }
    }

6.11.5 Data Write Path



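  1. RDD.iterator() is the entry point for interaction with the storage subsystem.
  2. CacheManager.getOrCompute calls BlockManager's put interface to write the data.
  3. Data is written to MemoryStore (memory) first. If MemoryStore is full, the least recently used data is written out to disk.
  4. BlockManagerMaster is notified that new data has been written and saves the metadata.
  5. The written data is synchronized with other slave workers. Data written locally is normally backed up on one other machine, that is, with one additional replica (replicanumber=1). In doPut this step runs only when putLevel.replication > 1.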

In practice, putting and getting a block is not that complicated for the caller. BlockManager hides the details described above, and we only need to call its put and get functions.


    def putBytes(
        blockId: BlockId,
        bytes: ByteBuffer,
        level: StorageLevel,
        tellMaster: Boolean = true,
        effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
      require(bytes != null, "Bytes is null")
      doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
    }

    private def doPut(
        blockId: BlockId,
        data: BlockValues,
        level: StorageLevel,
        tellMaster: Boolean = true,
        effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
      require(blockId != null, "BlockId is null")
      require(level != null && level.isValid, "StorageLevel is null or invalid")
      effectiveStorageLevel.foreach { level =>
        require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
      }

      // Blocks stored or dropped as a result of this put.
      val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

      // Register a BlockInfo for this block, or reuse the one another thread registered.
      val putBlockInfo = {
        val tinfo = new BlockInfo(level, tellMaster)
        val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
        if (oldBlockOpt.isDefined) {
          if (oldBlockOpt.get.waitForReady()) {
            logWarning(s"Block $blockId already exists on this machine; not re-adding it")
            return updatedBlocks
          }
          oldBlockOpt.get
        } else {
          tinfo
        }
      }

      val startTimeMs = System.currentTimeMillis
      var valuesAfterPut: Iterator[Any] = null
      var bytesAfterPut: ByteBuffer = null
      var size = 0L
      val putLevel = effectiveStorageLevel.getOrElse(level)

      // Bytes are already serialized, so replication can start in parallel with the local put.
      val replicationFuture = data match {
        case b: ByteBufferValues if putLevel.replication > 1 =>
          val bufferView = b.buffer.duplicate()
          Future {
            replicate(blockId, bufferView, putLevel)
          }(futureExecutionContext)
        case _ => null
      }

      putBlockInfo.synchronized {
        logTrace("Put for block %s took %s to get into synchronized block".format(
          blockId, Utils.getUsedTimeMs(startTimeMs)))
        var marked = false
        try {
          // Choose the store from the storage level: memory first, then off-heap, then disk.
          val (returnValues, blockStore: BlockStore) = {
            if (putLevel.useMemory) {
              (true, memoryStore)
            } else if (putLevel.useOffHeap) {
              (false, externalBlockStore)
            } else if (putLevel.useDisk) {
              (putLevel.replication > 1, diskStore)
            } else {
              assert(putLevel == StorageLevel.NONE)
              throw new BlockException(
                blockId, s"Attempted to put block $blockId without specifying storage level!")
            }
          }

          val result = data match {
            case IteratorValues(iterator) =>
              blockStore.putIterator(blockId, iterator, putLevel, returnValues)
            case ArrayValues(array) =>
              blockStore.putArray(blockId, array, putLevel, returnValues)
            case ByteBufferValues(bytes) =>
              bytes.rewind()
              blockStore.putBytes(blockId, bytes, putLevel)
          }
          size = result.size
          result.data match {
            case Left(newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
            case Right(newBytes) => bytesAfterPut = newBytes
            case _ =>
          }

          // Record blocks that were evicted from memory to make room.
          if (putLevel.useMemory) {
            result.droppedBlocks.foreach { updatedBlocks += _ }
          }

          val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
          if (putBlockStatus.storageLevel != StorageLevel.NONE) {
            // The block is stored: mark it ready so waiting readers can proceed.
            marked = true
            putBlockInfo.markReady(size)
            if (tellMaster) {
              reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
            }
            updatedBlocks += ((blockId, putBlockStatus))
          }
        } finally {
          if (!marked) {
            blockInfo.remove(blockId)
            putBlockInfo.markFailure()
            logWarning(s"Putting block $blockId failed")
          }
        }
      }
      logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

      if (putLevel.replication > 1) {
        data match {
          case ByteBufferValues(bytes) =>
            // Replication was started earlier: wait for it to finish.
            if (replicationFuture != null) {
              Await.ready(replicationFuture, Duration.Inf)
            }
          case _ =>
            // Values must be serialized before they can be replicated.
            val remoteStartTime = System.currentTimeMillis
            if (bytesAfterPut == null) {
              if (valuesAfterPut == null) {
                throw new SparkException(
                  "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
              }
              bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
            }
            replicate(blockId, bytesAfterPut, putLevel)
            logDebug("Put block %s remotely took %s".format(
              blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }

      BlockManager.dispose(bytesAfterPut)

      if (putLevel.replication > 1) {
        logDebug("Putting block %s with replication took %s".format(
          blockId, Utils.getUsedTimeMs(startTimeMs)))
      } else {
        logDebug("Putting block %s without replication took %s".format(
          blockId, Utils.getUsedTimeMs(startTimeMs)))
      }

      updatedBlocks
    }

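The doPut function mainly performs the following steps:

  1. Creates a BlockInfo object to hold information about the block.
  2. Locks the BlockInfo, then decides from the storage level whether to store the block in memory or on disk. Once the block is ready to be read, the BlockInfo is marked ready so that waiting readers are released.
  3. Decides from the block's replica count whether to send copies to remote nodes.

Serialized or not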

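The content written can be either serialized bytes or unserialized values. Reading this code requires an understanding of Scala's Either type and its two cases, Left and Right (these are library types, not keywords).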
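
As a minimal illustration of that idiom, the sketch below uses Left for unserialized values and Right for serialized bytes, the same convention that appears in the put code. `EitherDemo.describe` is a hypothetical helper, not part of Spark.

```scala
import java.nio.ByteBuffer

object EitherDemo {
  // Left carries deserialized values, Right carries serialized bytes.
  def describe(data: Either[Iterator[Any], ByteBuffer]): String = data match {
    case Left(values) => s"values: ${values.size} element(s)"
    case Right(bytes) => s"bytes: ${bytes.remaining()} byte(s)"
  }
}
```

A caller receives one value of type Either and pattern matches on it, so a single return type can cover both representations without casts.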

6.11.6 Data Read Path

    def get(blockId: BlockId): Option[Iterator[Any]] = {
      // Try the local stores first.
      val local = getLocal(blockId)
      if (local.isDefined) {
        logInfo("Found block %s locally".format(blockId))
        return local
      }
      // Fall back to fetching the block from another node.
      val remote = getRemote(blockId)
      if (remote.isDefined) {
        logInfo("Found block %s remotely".format(blockId))
        return remote
      }
      None
    }

Local read

The local MemoryStore and DiskStore are checked first for the required block. If it is not found there, a remote fetch is started.

Remote read

The call path for a remote fetch is getRemote -> doGetRemote. The key step in doGetRemote is the call to BlockManagerWorker.syncGetBlock, which obtains the data from the remote node.

Code of syncGetBlock:

    def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
      val blockManager = blockManagerWorker.blockManager
      val connectionManager = blockManager.connectionManager
      val blockMessage = BlockMessage.fromGetBlock(msg)
      val blockMessageArray = new BlockMessageArray(blockMessage)
      // Blocks until the remote side has answered (or no answer is available).
      val responseMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
      responseMessage match {
        case Some(message) => {
          val bufferMessage = message.asInstanceOf[BufferMessage]
          logDebug("Response message received " + bufferMessage)
          // Return the data of the first block message in the response.
          BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
            logDebug("Found " + blockMessage)
            return blockMessage.getData
          })
        }
        case None => logDebug("No response message received")
      }
      null
    }

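The most interesting part of this code is sendMessageReliablySync. A remote read is clearly an asynchronous I/O operation, so why does the code read as if it were synchronous? In other words, how does the caller learn that the other side has sent its response? The answer lies in sendMessageReliably, on which sendMessageReliablySync is built: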

    def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
        : Future[Option[Message]] = {
      val promise = Promise[Option[Message]]()
      // The completion handler fulfils the promise once the ack has arrived.
      val status = new MessageStatus(
        message, connectionManagerId, s => promise.success(s.ackMessage))
      messageStatuses.synchronized {
        messageStatuses += ((message.id, status))
      }
      sendMessage(connectionManagerId, message)
      promise.future
    }

It may sound far-fetched, but the secret really is right here. Notice Promise and Future? When the future completes, it yields s.ackMessage. So where is ackMessage written? Look at this fragment of ConnectionManager.handleMessage:

    case bufferMessage: BufferMessage => {
      if (authEnabled) {
        val res = handleAuthentication(connection, bufferMessage)
        if (res == true) {
          logDebug("After handleAuth result was true, returning")
          return
        }
      }
      if (bufferMessage.hasAckId) {
        // Find (and forget) the status of the message this ack belongs to.
        val sentMessageStatus = messageStatuses.synchronized {
          messageStatuses.get(bufferMessage.ackId) match {
            case Some(status) => {
              messageStatuses -= bufferMessage.ackId
              status
            }
            case None => {
              throw new Exception("Could not find reference for received ack message " +
                message.id)
              null
            }
          }
        }
        // Store the ack and fire the completion handler.
        sentMessageStatus.synchronized {
          sentMessageStatus.ackMessage = Some(message)
          sentMessageStatus.attempted = true
          sentMessageStatus.acked = true
          sentMessageStatus.markDone()
        }
      }
    }

Note that the call to sentMessageStatus.markDone here invokes the completion handler, which is the promise.success call defined in sendMessageReliably. The definition of MessageStatus makes this clear:

    class MessageStatus(
        val message: Message,
        val connectionManagerId: ConnectionManagerId,
        completionHandler: MessageStatus => Unit) {
      var ackMessage: Option[Message] = None
      var attempted = false
      var acked = false
      // Runs the handler supplied at construction time.
      def markDone(): Unit = { completionHandler(this) }
    }
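
The whole mechanism, a synchronous call layered on a Promise that another thread completes, fits in a few lines. The sketch below is a simplified model under stated assumptions: `SyncOverAsync`, `Status`, `sendReliably`, and `sendReliablySync` are hypothetical names, the reply is a plain string, and a short-lived thread stands in for the network receiver.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object SyncOverAsync {
  // Hypothetical miniature of MessageStatus: markDone fires the completion handler.
  final class Status(onDone: Status => Unit) {
    @volatile var ack: Option[String] = None
    def markDone(): Unit = onDone(this)
  }

  // Asynchronous half: returns a Future plus the Status that a
  // receiver thread would later complete.
  def sendReliably(): (Future[Option[String]], Status) = {
    val promise = Promise[Option[String]]()
    val status = new Status(s => promise.success(s.ack))
    (promise.future, status)
  }

  // Synchronous half: block until the promise has been fulfilled.
  def sendReliablySync(reply: String): Option[String] = {
    val (future, status) = sendReliably()
    // Simulate the network thread delivering the acknowledgement.
    val receiver = new Thread(() => {
      status.ack = Some(reply)
      status.markDone()
    })
    receiver.start()
    Await.result(future, 5.seconds)
  }
}
```

The caller of `sendReliablySync` simply blocks in `Await.result`, while the receiving side only has to call `markDone`. Neither side needs to know about the other's threading.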

6.11.7 How a Partition Becomes a Block

Every operation in the storage module deals with blocks, yet every computation on an RDD is expressed in terms of partitions. So how do partitions map to blocks? The core function of RDD computation is iterator():

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        // Cached RDD: go through the cache manager.
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

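If the RDD's storage level is not NONE, the RDD is meant to be stored in BlockManager, so CacheManager's getOrCompute() function is called to compute it. This is where partition and block meet:

  1. First, a block id of the form rdd_xx_xx is built from the RDD id and the partition index.
  2. Next, the corresponding block is requested from BlockManager:
    • If the block exists, this RDD partition was computed earlier and stored in BlockManager, so it is simply returned without recomputation.
    • If the block does not exist, the RDD's **computeOrReadCheckpoint()** function is called to compute a new block, which is then stored in BlockManager.

Note that computing and storing a block is blocking: if another thread needs the same block, it has to wait until the thread that is loading the block has finished.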


    def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
        storageLevel: StorageLevel): Iterator[T] = {
      val key = "rdd_%d_%d".format(rdd.id, split.index)
      logDebug("Looking for partition " + key)
      blockManager.get(key) match {
        case Some(values) =>
          // The partition has already been computed and cached.
          return values.asInstanceOf[Iterator[T]]
        case None =>
          // Make sure only one thread computes a given partition at a time.
          loading.synchronized {
            if (loading.contains(key)) {
              logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
              while (loading.contains(key)) {
                try {
                  loading.wait()
                } catch {
                  case _: Throwable =>
                }
              }
              logInfo("Finished waiting for %s".format(key))
              blockManager.get(key) match {
                case Some(values) =>
                  return values.asInstanceOf[Iterator[T]]
                case None =>
                  logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                  loading.add(key)
              }
            } else {
              loading.add(key)
            }
          }
          try {
            logInfo("Partition %s not found, computing it".format(key))
            val computedValues = rdd.computeOrReadCheckpoint(split, context)
            if (context.runningLocally) {
              return computedValues
            }
            val elements = new ArrayBuffer[Any]
            elements ++= computedValues
            blockManager.put(key, elements, storageLevel, true)
            return elements.iterator.asInstanceOf[Iterator[T]]
          } finally {
            // Wake up any threads waiting for this partition.
            loading.synchronized {
              loading.remove(key)
              loading.notifyAll()
            }
          }
      }
    }

In this way RDD transformations and actions are tied to block data. Although in the abstract we operate on partitions, each partition is ultimately mapped to a block, so in practice all of our operations process and access blocks.

6.11.8 How Partitions Correspond to Blocks

In RDD, the core function is iterator():

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

If the RDD's storage level is not NONE, the RDD is meant to be stored in BlockManager, so CacheManager's getOrCompute function is called to compute it. In this function partition and block are matched up: getOrCompute first constructs an RDDBlockId, which ties the block to the partition. The name produced by RDDBlockId is the name attribute of the BlockId and has the form rdd_<rdd.id>_<partition.index>.
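
The naming scheme is easy to reproduce. The sketch below is a hypothetical miniature, not Spark's RDDBlockId: `BlockNaming.RddBlockId` only builds the name from the two IDs, and `parse` is an illustrative helper that recovers them.

```scala
object BlockNaming {
  // Hypothetical miniature of RDDBlockId: the name encodes both IDs.
  final case class RddBlockId(rddId: Int, splitIndex: Int) {
    def name: String = "rdd_" + rddId + "_" + splitIndex
  }

  // Recover (rddId, partitionIndex) from a block name, if it has that shape.
  private val RddName = "rdd_([0-9]+)_([0-9]+)".r
  def parse(name: String): Option[(Int, Int)] = name match {
    case RddName(rddId, index) => Some((rddId.toInt, index.toInt))
    case _ => None
  }
}
```

Partition 2 of the RDD with id 7 thus maps to the block named "rdd_7_2", and that name is all the storage layer needs in order to find the data again.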

Code of getOrCompute:

    def getOrCompute[T](
        rdd: RDD[T],
        partition: Partition,
        context: TaskContext,
        storageLevel: StorageLevel): Iterator[T] = {
      val key = RDDBlockId(rdd.id, partition.index)
      logDebug(s"Looking for partition $key")
      blockManager.get(key) match {
        case Some(blockResult) =>
          // The partition is already cached: return it and update the input metrics.
          val existingMetrics = context.taskMetrics
            .getInputMetricsForReadMethod(blockResult.readMethod)
          existingMetrics.incBytesRead(blockResult.bytes)
          val iter = blockResult.data.asInstanceOf[Iterator[T]]
          new InterruptibleIterator[T](context, iter) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        case None =>
          // Wait if another thread is loading this partition; otherwise take the lock.
          val storedValues = acquireLockForPartition[T](key)
          if (storedValues.isDefined) {
            return new InterruptibleIterator[T](context, storedValues.get)
          }
          try {
            logInfo(s"Partition $key not found, computing it")
            val computedValues = rdd.computeOrReadCheckpoint(partition, context)
            if (context.isRunningLocally) {
              return computedValues
            }
            // Cache the computed values and record which blocks changed.
            val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
            val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
            val metrics = context.taskMetrics
            val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
            metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
            new InterruptibleIterator(context, cachedValues)
          } finally {
            loading.synchronized {
              loading.remove(key)
              loading.notifyAll()
            }
          }
      }
    }

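getOrCompute also checks whether the block exists:

  • If the block exists, this RDD partition was computed earlier and stored in BlockManager, so it is simply returned without recomputation.
  • If the block does not exist, the RDD's computeOrReadCheckpoint() function is called to compute a new block, which is then stored in BlockManager.

Note that computing and storing a block is blocking: if another thread needs the same block, it has to wait until the thread that is loading the block has finished.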