One reason Spark computes much faster than Hadoop is that intermediate results are cached in memory rather than written straight to disk. This article analyzes the structure of Spark's storage subsystem and, using data writes and data reads as examples, explains how the components of the storage subsystem interact.

6.11.1 Storage Subsystem Overview

The Storage module is divided into two layers:

  • Communication layer: the storage module uses a master-slave structure for its communication layer; control information and status information exchanged between master and slaves all travel through this layer.
  • Storage layer: the storage module needs to persist data to disk or memory, and possibly replicate it to remote nodes; the storage layer implements this and exposes the corresponding interfaces.

Other modules interact with the storage module through a single facade class, BlockManager; any external class that needs to work with the storage module does so by calling the corresponding BlockManager interfaces.

(Figure: the main modules of the Spark storage subsystem and their relationships)

The figure above shows how the main modules of the Spark storage subsystem relate to one another; a brief description follows:

  • CacheManager: when an RDD is computed, it fetches data through the CacheManager and also stores its computation results through it.
  • BlockManager: when reading and writing data, the CacheManager relies mainly on the BlockManager interface; the BlockManager decides whether data is fetched from memory (MemoryStore) or from disk (DiskStore).
  • MemoryStore: responsible for storing data to and reading data from memory.
  • DiskStore: responsible for writing data to and reading data from disk.
  • BlockManagerWorker: writing data to the local MemoryStore or DiskStore is a synchronous operation; for fault tolerance the data also needs to be replicated to other compute nodes so it can be recovered if it is lost. Replication is performed asynchronously, and BlockManagerWorker handles this part of the work.
  • ConnectionManager: establishes connections to other compute nodes and is responsible for sending and receiving data.
  • BlockManagerMaster: note that this module runs only on the node hosting the Driver Application. It records which slave worker stores each BlockId. For example, an RDD task running on machine A needs BlockId 3, but machine A does not hold BlockId 3; the slave worker then has to ask the BlockManagerMaster, via its BlockManager, where the data is stored, and afterwards fetch it through the ConnectionManager.
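To see how these modules cooperate, the quickest path is the ordinary RDD caching API: rdd.persist() is what eventually drives the CacheManager and BlockManager, which in turn place each block into the MemoryStore or DiskStore. A minimal user-level sketch (the local master and application name are assumptions chosen only for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("storage-demo").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 1000).map(_ * 2)

rdd.persist(StorageLevel.MEMORY_AND_DISK) // blocks go to MemoryStore first, spilling to DiskStore if needed
rdd.count()                               // first action: partitions are computed and cached via BlockManager
rdd.count()                               // second action: blocks are served from the storage subsystem
```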

6.11.2 Startup Process Analysis

The modules above are created by SparkEnv; the creation happens in SparkEnv.create, as shown below:

```scala
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
  "BlockManagerMaster",
  new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
```

The following code can be confusing: it looks as if a BlockManagerMasterActor were created on every cluster node. That is not the case; look carefully at the implementation of registerOrLookup. The actor is created only when the current node is the driver; otherwise a connection to the driver is established. The code is as follows:

```scala
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    actorSystem.actorOf(Props(newActor), name = name)
  } else {
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    logInfo(s"Connecting to $name: $url")
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
}
```

One of the main actions during initialization is that the BlockManager registers itself with the BlockManagerMaster.

6.11.3 Communication Layer

(Figure: Driver-side and Worker-side BlockManagers communicating through BlockManagerMaster)

BlockManager wraps the BlockManagerMaster, and the information it sends is packaged as BlockManagerInfo. Spark creates a BlockManager on both the Driver and each Worker; they communicate through the BlockManagerMaster, and the Storage module is operated through the BlockManager.

The BlockManager object is created in the SparkEnv.create function, as shown below:

```scala
def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
......
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
```

Before creating it, the code checks whether the current node is the Driver. If so, the Endpoint is created; otherwise, a reference to the Driver's endpoint is obtained.

After the BlockManager has been created, it calls its initialize method to initialize itself. During initialization it registers itself with the Driver through the BlockManagerMaster, and the slave Endpoint is started as part of that registration. In addition, if an external shuffle service exists, the Executor configuration is registered with the local shuffle server. The code is as follows:

```scala
def initialize(appId: String): Unit = {
  ......
  master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }
}
```

BlockManagerMaster wraps the registration request into a RegisterBlockManager message and sends it to the Driver. The Driver-side BlockManagerMasterEndpoint then calls its register method, which checks the BlockManagerInfo and completes the registration on the Driver, as shown below:

```scala
private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
  val time = System.currentTimeMillis()
  if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(oldId) =>
        logError("Got two different block manager registrations on same executor - "
          + s" will replace old one $oldId with new one $id")
        removeExecutor(id.executorId)
      case None =>
    }
    logInfo("Registering block manager %s with %s RAM, %s".format(
      id.hostPort, Utils.bytesToString(maxMemSize), id))
    blockManagerIdByExecutor(id.executorId) = id
    blockManagerInfo(id) = new BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
  }
  listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
```

It is easy to see that the BlockManagerInfo object is saved into a Map. In the communication layer the BlockManagerMaster controls the message flow; pattern matching is used here, and all message types are defined in BlockManagerMessage.
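As a rough illustration of that pattern matching, here is a simplified sketch (not the verbatim Spark source; the real BlockManagerMasterEndpoint handles many more message types than shown):

```scala
// Simplified sketch: the master endpoint dispatches on the storage message type.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
    register(blockManagerId, maxMemSize, slaveEndpoint)
    context.reply(true)

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)
}
```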

6.11.4 Storage Layer

The smallest storage unit in Spark Storage is the block; all operations are performed at block granularity. The MemoryStore and DiskStore objects are created when the BlockManager is created, as shown below:

```scala
val diskBlockManager = new DiskBlockManager(this, conf)
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
```

6.11.4.1 Disk Store

Current Spark versions divide the work of the Disk Store more finely: file handling has been extracted into DiskBlockManager, and DiskStore itself is only responsible for storing and reading the data.

DiskStore can be configured with multiple root directories; Spark creates folders under these directories, named spark-UUID (with a random UUID). The Disk Store creates these folders when data is stored. Following the principle of high cohesion and low coupling, this kind of utility code lives in Utils (call path: DiskStore.putBytes —> DiskBlockManager.createLocalDirs —> Utils.createDirectory), as shown below:

```scala
def createDirectory(root: String, namePrefix: String = "spark"): File = {
  var attempts = 0
  val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
  var dir: File = null
  while (dir == null) {
    attempts += 1
    if (attempts > maxAttempts) {
      throw new IOException("Failed to create a temp directory (under " + root + ") after " +
        maxAttempts + " attempts!")
    }
    try {
      dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
      if (dir.exists() || !dir.mkdirs()) {
        dir = null
      }
    } catch { case e: SecurityException => dir = null }
  }
  dir.getCanonicalFile
}
```

In DiskBlockManager, every block is stored as a file; the block is mapped to a file by computing a hash of its blockId.

```scala
def getFile(filename: String): File = {
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }
  new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)
```
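To make the mapping concrete, here is a tiny worked example with assumed numbers (2 local directories and 64 sub-directories per directory, the usual default):

```scala
// Hypothetical values, chosen only to illustrate the arithmetic used in getFile.
val hash = 137                // pretend Utils.nonNegativeHash(filename) returned 137
val numLocalDirs = 2          // number of configured local directories
val subDirsPerLocalDir = 64   // sub-directories per local directory

val dirId = hash % numLocalDirs                            // 137 % 2 = 1
val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir  // 68 % 64 = 4
println(s"localDirs($dirId)/" + "%02x".format(subDirId))   // prints: localDirs(1)/04
```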

Taking the hash modulo the directory counts yields dirId and subDirId. The subDir is then looked up in subDirs; if it does not exist yet, a new subDir is created. Finally, a file is created with subDir as the path and the blockId's name attribute as the filename. Once the file has been created, Spark writes the block mapped to it through DiskStore, as shown below:

```scala
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  val bytes = _bytes.duplicate()
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val channel = new FileOutputStream(file).getChannel
  Utils.tryWithSafeFinally {
    while (bytes.remaining > 0) {
      channel.write(bytes)
    }
  } {
    channel.close()
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
  PutResult(bytes.limit(), Right(bytes.duplicate()))
}
```

The read path is simpler: DiskStore reads the contents of the file mapped to the given blockId, fetching the file information from DiskBlockManager along the way. The code is as follows:

```scala
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    if (length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      Some(buf)
    } else {
      Some(channel.map(MapMode.READ_ONLY, offset, length))
    }
  } {
    channel.close()
  }
}

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  val file = diskManager.getFile(blockId.name)
  getBytes(file, 0, file.length)
}
```

6.11.4.2 Memory Store

Compared with the Disk Store, the Memory Store is much simpler. It is managed with a LinkedHashMap whose Key is the blockId and whose Value is the MemoryEntry case class; a MemoryEntry holds the data. The code is as follows:

```scala
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
```

The precondition for storing a block in the MemoryStore is that there is enough free memory for it. This is checked through a call to the tryToPut function. The code is as follows:

```scala
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
  lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
  val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
  val data =
    if (putAttempt.success) {
      assert(bytes.limit == size)
      Right(bytes.duplicate())
    } else {
      null
    }
  PutResult(size, data, putAttempt.droppedBlocks)
}
```

Inside tryToPut, ensureFreeSpace is called to determine whether there is enough free memory. If there is, the block is put into the LinkedHashMap; if not, the BlockManager is informed that memory is insufficient and, if the DiskStore is allowed by the storage level, the block is dropped to disk. The code is as follows:

```scala
private def tryToPut(blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = {
  var putSuccess = false
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  accountingLock.synchronized {
    val freeSpaceResult = ensureFreeSpace(blockId, size)
    val enoughFreeSpace = freeSpaceResult.success
    droppedBlocks ++= freeSpaceResult.droppedBlocks
    if (enoughFreeSpace) {
      val entry = new MemoryEntry(value(), size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      putSuccess = true
    } else {
      lazy val data = if (deserialized) {
        Left(value().asInstanceOf[Array[Any]])
      } else {
        Right(value().asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
    releasePendingUnrollMemoryForThisTask()
  }
  ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
```

Reading a block from the MemoryStore is just as simple: take the Value for the given blockId out of the LinkedHashMap. The code is as follows:

```scala
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[Array[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate()
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}
```

6.11.5 Data Write Path Analysis

(Figure: the data write path through the storage subsystem)

A brief outline of the data write path:

  1. RDD.iterator() is the entry point for interacting with the storage subsystem.
  2. CacheManager.getOrCompute calls BlockManager's put interface to write the data.
  3. Data is written to the MemoryStore (memory) first; when the MemoryStore is full, the least recently used blocks are written out to disk.
  4. The BlockManagerMaster is notified that new data has been written, and the metadata is recorded in the BlockManagerMaster.
  5. The written data is synchronized with other slave workers; generally, data written on the local machine is also backed up on one other machine, i.e., replica number = 1.

In practice, putting and getting blocks is not that complicated for callers: BlockManager wraps up all of the details above, and we only need to call BlockManager's put and get functions.
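For instance, a direct caller might look roughly like this (a hedged sketch assuming the Spark 1.x BlockManager API; TestBlockId and the block contents are made up for illustration):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{StorageLevel, TestBlockId}

val blockManager = SparkEnv.get.blockManager
val id = TestBlockId("demo-block")

// put: BlockManager decides between MemoryStore and DiskStore based on the StorageLevel
blockManager.putSingle(id, Array.fill(1000)(0.0), StorageLevel.MEMORY_AND_DISK)

// get: returns the value if the block is found locally or remotely
blockManager.getSingle(id).foreach { v =>
  println(s"read back ${v.asInstanceOf[Array[Double]].length} doubles")
}
```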

Internally, the putBytes and doPut code looks like this:

```scala
def putBytes(
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(bytes != null, "Bytes is null")
  doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach {
    level => require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      oldBlockOpt.get
    } else {
      tinfo
    }
  }
  val startTimeMs = System.currentTimeMillis
  var valuesAfterPut: Iterator[Any] = null
  var bytesAfterPut: ByteBuffer = null
  var size = 0L
  val putLevel = effectiveStorageLevel.getOrElse(level)
  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      val bufferView = b.buffer.duplicate()
      Future {
        replicate(blockId, bufferView, putLevel)
      }(futureExecutionContext)
    case _ => null
  }
  putBlockInfo.synchronized {
    logTrace("Put for block %s took %s to get into synchronized block".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    var marked = false
    try {
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) {
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          (false, externalBlockStore)
        } else if (putLevel.useDisk) {
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }
      val result = data match {
        case IteratorValues(iterator) => blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        case ArrayValues(array) => blockStore.putArray(blockId, array, putLevel, returnValues)
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
      size = result.size
      result.data match {
        case Left(newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
        case Right(newBytes) => bytesAfterPut = newBytes
        case _ =>
      }
      if (putLevel.useMemory) {
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }
      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        marked = true
        putBlockInfo.markReady(size)
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      if (!marked) {
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning(s"Putting block $blockId failed")
      }
    }
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  if (putLevel.replication > 1) {
    data match {
      case ByteBufferValues(bytes) =>
        if (replicationFuture != null) {
          Await.ready(replicationFuture, Duration.Inf)
        }
      case _ =>
        val remoteStartTime = System.currentTimeMillis
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, putLevel)
        logDebug("Put block %s remotely took %s".format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }
  BlockManager.dispose(bytesAfterPut)
  if (putLevel.replication > 1) {
    logDebug("Putting block %s with replication took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  } else {
    logDebug("Putting block %s without replication took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  }
  updatedBlocks
}
```

The doPut function mainly performs the following steps:

  1. Create a BlockInfo object to store the block's information.
  2. Lock the BlockInfo, then decide according to the Storage Level whether the block goes to Memory or to Disk; a BlockInfo that is already ready for reading is unlocked.
  3. Decide, based on the block's replication count, whether to send replicas to remote nodes.

6.11.5.1 Serialized or Not

The content being written can be either serialized bytes or unserialized values. Understanding this part requires understanding Scala's Either, Left, and Right; a small illustration follows.
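A minimal, self-contained illustration of Either (not Spark code): Left and Right are its two cases, and the storage code uses Left for deserialized values and Right for serialized bytes.

```scala
import java.nio.ByteBuffer

// Either models "one of two shapes of data": here, raw values vs. serialized bytes.
def describe(data: Either[Array[Any], ByteBuffer]): String = data match {
  case Left(values) => s"deserialized values, count = ${values.length}"
  case Right(bytes) => s"serialized bytes, ${bytes.remaining()} bytes"
}

println(describe(Left(Array[Any](1, 2, 3))))       // deserialized branch
println(describe(Right(ByteBuffer.allocate(128)))) // serialized branch
```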

6.11.6 Data Read Path Analysis

```scala
def get(blockId: BlockId): Option[Iterator[Any]] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo("Found block %s locally".format(blockId))
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo("Found block %s remotely".format(blockId))
    return remote
  }
  None
}
```

6.11.6.1 Local Read

The local MemoryStore and DiskStore are queried first to see whether the required block exists on this machine; if it does not, a remote fetch is initiated.

6.11.6.2 Remote Read

The remote fetch call path is getRemote —> doGetRemote; the key step in doGetRemote is calling BlockManagerWorker.syncGetBlock to obtain the data from the remote node.

The syncGetBlock code:

```scala
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
  val blockManager = blockManagerWorker.blockManager
  val connectionManager = blockManager.connectionManager
  val blockMessage = BlockMessage.fromGetBlock(msg)
  val blockMessageArray = new BlockMessageArray(blockMessage)
  val responseMessage = connectionManager.sendMessageReliablySync(
    toConnManagerId, blockMessageArray.toBufferMessage)
  responseMessage match {
    case Some(message) => {
      val bufferMessage = message.asInstanceOf[BufferMessage]
      logDebug("Response message received " + bufferMessage)
      BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
        logDebug("Found " + blockMessage)
        return blockMessage.getData
      })
    }
    case None => logDebug("No response message received")
  }
  null
}
```

The most interesting part of this code is sendMessageReliablySync. A remote data read is unquestionably an asynchronous I/O operation, so why does the code read as if it were synchronous? In other words, how does it know that the response has come back? Don't worry; keep going and look at sendMessageReliably, on which sendMessageReliablySync is built:

```scala
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
    : Future[Option[Message]] = {
  val promise = Promise[Option[Message]]
  val status = new MessageStatus(
    message, connectionManagerId, s => promise.success(s.ackMessage))
  messageStatuses.synchronized {
    messageStatuses += ((message.id, status))
  }
  sendMessage(connectionManagerId, message)
  promise.future
}
```

If I told you the secret is right here, you might think I'm making it up, but it really is. Notice the Promise and Future keywords? When the future completes, it yields s.ackMessage. So where does that ackMessage get written? Take a look at this snippet from ConnectionManager.handleMessage:

```scala
case bufferMessage: BufferMessage => {
  if (authEnabled) {
    val res = handleAuthentication(connection, bufferMessage)
    if (res == true) {
      logDebug("After handleAuth result was true, returning")
      return
    }
  }
  if (bufferMessage.hasAckId) {
    val sentMessageStatus = messageStatuses.synchronized {
      messageStatuses.get(bufferMessage.ackId) match {
        case Some(status) => {
          messageStatuses -= bufferMessage.ackId
          status
        }
        case None => {
          throw new Exception("Could not find reference for received ack message " +
            message.id)
          null
        }
      }
    }
    sentMessageStatus.synchronized {
      sentMessageStatus.ackMessage = Some(message)
      sentMessageStatus.attempted = true
      sentMessageStatus.acked = true
      sentMessageStatus.markDone()
    }
  }
}
```

Note: the call to sentMessageStatus.markDone here ends up invoking the promise.success callback registered in sendMessageReliably above. Here is the definition of MessageStatus:

```scala
class MessageStatus(
    val message: Message,
    val connectionManagerId: ConnectionManagerId,
    completionHandler: MessageStatus => Unit) {
  var ackMessage: Option[Message] = None
  var attempted = false
  var acked = false
  def markDone() { completionHandler(this) }
}
```

6.11.7 How a Partition Becomes a Block

All operations in the storage module are block-based, while all computation in an RDD is partition-based, so how does a partition map onto a block? The core function of RDD computation is iterator():

```scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```

If the current RDD's storage level is not NONE, the RDD has its data stored in the BlockManager, so CacheManager.getOrCompute() is called to compute the RDD; this is the function in which partition and block become related:

  1. First, a block id of the form rdd_xx_xx is constructed from the RDD id and the partition index.
  2. Then the corresponding block is fetched from the BlockManager:
    • If the block exists, this RDD has already been computed and stored in the BlockManager, so it is simply retrieved and no recomputation is needed.
    • If the block does not exist, the RDD's computeOrReadCheckpoint() function is called to compute a new block, which is then stored in the BlockManager.

Note that computing and storing a block is a blocking operation: if another thread also needs this block, it has to wait until the thread loading the block has finished.

The key code:

```scala
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
  val key = "rdd_%d_%d".format(rdd.id, split.index)
  logDebug("Looking for partition " + key)
  blockManager.get(key) match {
    case Some(values) =>
      return values.asInstanceOf[Iterator[T]]
    case None =>
      loading.synchronized {
        if (loading.contains(key)) {
          logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
          while (loading.contains(key)) {
            try {
              loading.wait()
            } catch {
              case _: Throwable =>
            }
          }
          logInfo("Finished waiting for %s".format(key))
          blockManager.get(key) match {
            case Some(values) =>
              return values.asInstanceOf[Iterator[T]]
            case None =>
              logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
              loading.add(key)
          }
        } else {
          loading.add(key)
        }
      }
      try {
        logInfo("Partition %s not found, computing it".format(key))
        val computedValues = rdd.computeOrReadCheckpoint(split, context)
        if (context.runningLocally) {
          return computedValues
        }
        val elements = new ArrayBuffer[Any]
        elements ++= computedValues
        blockManager.put(key, elements, storageLevel, true)
        return elements.iterator.asInstanceOf[Iterator[T]]
      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
```

In this way RDD transformations and actions become connected to block data: although conceptually we operate at the partition level, partitions are ultimately mapped to blocks, so in practice every operation is a read or write of blocks.

6.11.8 The Correspondence Between partition and block

The core function in RDD is iterator():

```scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```

If the current RDD's storage level is not NONE, the RDD has its data stored in the BlockManager, so CacheManager.getOrCompute is called to compute the RDD; in this function partition and block are tied together: getOrCompute first constructs an RDDBlockId, and it is the RDDBlockId that links a block to a partition. The name produced by the RDDBlockId becomes the BlockId's name attribute, of the form rdd_<rdd.id>_<partition.index>.
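For example, assuming the org.apache.spark.storage.RDDBlockId case class, partition 3 of the RDD with id 0 maps to the block named rdd_0_3:

```scala
import org.apache.spark.storage.RDDBlockId

val key = RDDBlockId(0, 3)   // RDD id 0, partition index 3
println(key.name)            // prints: rdd_0_3
```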

The getOrCompute code:

```scala
def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {
  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {
    case Some(blockResult) =>
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(blockResult.readMethod)
      existingMetrics.incBytesRead(blockResult.bytes)
      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }
    case None =>
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }
      try {
        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)
        if (context.isRunningLocally) {
          return computedValues
        }
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)
      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
```

getOrCompute also checks the block:

  • If the block exists, this RDD has already been computed and stored in the BlockManager, so it is simply retrieved and no recomputation is needed.
  • If the block does not exist, the RDD's computeOrReadCheckpoint() function is called to compute a new block, which is then stored in the BlockManager.

Note that computing and storing a block is a blocking operation: another thread that needs the same block must wait until the loading of that block has finished.