存储系统为谁服务?

RDD 缓存、Shuffle 中间文件、广播变量

RDD 缓存

RDD 缓存 —-> RDD 以缓存的形式物化到内存或磁盘的过程。

RDD 缓存的好处

对于一些计算成本和访问频率都比较高的 RDD 来说,缓存有两个好处:

  1. 是通过截断 DAG,可以降低失败重试的计算开销;
  2. 是通过对缓存内容的访问,可以有效减少从头计算的次数,从整体上提升作业端到端的执行性能。

    shuffle 中间文件

map阶段(shuffle write):按照 Reducer 的分区规则将中间数据写入本地磁盘
Reduce阶段(shuffle read):从各个节点下载数据分片,并根据需要进行聚合计算

广播变量(memorystore管理)

利用存储系统,广播变量可以在 Executors 进程范畴内保存全量数据。这样一来,对于同一 Executors 内的所有计算任务就能够以 Process local (还有node,rack,any?)的本地性级别,来共享广播变量中携带的全量数据了。

存储系统都有哪些组件

BlockManager(Executor端)

在 Executors 端负责统一管理和协调数据的本地存取跨节点传输
对外:Executor 向 BlockManagerMaster 汇报自身元信息,不定时按需拉取其它Executor数据存储状态
不同 Executors 的 BlockManager 之间也会以 Server/Client 模式跨节点推送和拉取数据块。
对内:数据的存储和收发

内存存储抽象:MemoryStore(Executor端)

广播变量的全量数据存储在 Executors 进程中,因此它由 MemoryStore 管理。

MemoryEntry

两个实现类:DeserializedMemoryEntry 和 SerializedMemoryEntry
分别用于封装原始对象值和序列化之后的字节数组。DeserializedMemoryEntry 用 Array[T]来存储对象值序列,其中 T 是对象类型,而 SerializedMemoryEntry 使用 ByteBuffer 来存储序列化后的字节序列。

底层是一个LinkedHashMap[BlockId, MemoryEntry],即 Key 为 BlockId,Value 是 MemoryEntry 的链式哈希字典。在这个字典中,一个 Block 对应一个 MemoryEntry。显然,这里的 MemoryEntry 既可以是 DeserializedMemoryEntry,也可以是 SerializedMemoryEntry。有了这个字典,我们通过 BlockId 即可方便地查找和定位 MemoryEntry,实现数据块的快速存取。

磁盘存储抽象:DiskStore(Executor端)

Shuffle 中间文件往往会落盘到本地节点,所以这些文件的落盘和访问就要经由 DiskStore
RDD 缓存会稍微复杂一些,由于 RDD 缓存支持内存缓存和磁盘缓存两种模式,因此我们要视情况而定,缓存在内存中的数据会封装到 MemoryStore,缓存在磁盘上的数据则交由 DiskStore 管理。

存储的数据对象类型

对象值(Object Values)和字节数组(Byte Array)
对象值压缩为字节数组的过程叫做序列化,而字节数组还原成原始对象值的过程就叫做反序列化
ps:对象就是组装好的家具,拿来即用,字节数组就是未组装的家具,还得先装好再用
核心原则就是:如果想省地儿,你可以优先考虑字节数组;如果想以最快的速度访问对象,还是对象值更直接一些

RDD 缓存的本质

缓存 RDD 的过程,就是将 RDD 计算数据的迭代器(Iterator)进行物化的过程,可以分成三步走。
image.png

既然要把数据内容缓存下来,自然得先把 RDD 的迭代器展开成实实在在的数据值才行。因此,第一步就是通过调用 putIteratorAsValues 或是 putIteratorAsBytes 方法,把 RDD 迭代器展开为数据值,然后把这些数据值暂存到一个叫做 ValuesHolder 的数据结构里。这一步,我们通常把它叫做“Unroll”。

第二步,为了节省内存开销,我们可以在存储数据值的 ValuesHolder 上直接调用 toArray 或是 toByteBuffer 操作,把 ValuesHolder 转换为 MemoryEntry 数据结构。这一步的转换不涉及内存拷贝,也不产生额外的内存开销从 Unroll memory 到 Storage memory 的 Transfer(转移)”。

第三步,这些包含 RDD 数据值的 MemoryEntry 和与之对应的 BlockId,会被一起存入 Key 为 BlockId、Value 是 MemoryEntry 引用的链式哈希字典中。因此,LinkedHashMap[BlockId, MemoryEntry]缓存的是元数据,MemoryEntry 才是真正保存 RDD 数据实体的存储单元。

当所有的 RDD 数据分片都物化为 MemoryEntry,并且所有的(Block ID, MemoryEntry)对都记录到 LinkedHashMap 字典之后,RDD 就完成了数据缓存到内存的过程。

如果内存空间不足以容纳整个 RDD 怎么办?很简单,强行把大 RDD 塞进有限的内存空间肯定不是明智之举,所以 Spark 会按照 LRU 策略逐一清除字典中最近、最久未使用的 Block(再次注意,memorystore是内存资源),以及其对应的 MemoryEntry。

透过 Shuffle 看 DiskStore

DiskStore 中数据的存取本质上就是字节序列与磁盘文件之间的转换,它通过 putBytes 方法把字节序列存入磁盘文件,再通过 getBytes 方法将文件内容转换为数据块。

DiskStore 不像 MemoryStore 采用链式哈希字典来维护元数据,它并没有亲自维护这些元数据,而是交给 DiskBlockManager

DiskBlockManager

记录逻辑数据块 Block 与磁盘文件系统中物理文件的对应关系,每个 Block 都对应一个磁盘文件。同理,每个磁盘文件都有一个与之对应的 Block ID,这就好比货架上的每一件货物都有唯一的 ID 标识。

DiskBlockManager 在初始化的时候,首先根据配置项 spark.local.dir 在磁盘的相应位置创建文件目录。然后,在 spark.local.dir 指定的所有目录下分别创建子目录,子目录的个数由配置项 spark.diskStore.subDirectories 控制,它默认是 64。所有这些目录均用于存储通过 DiskStore 进行物化的数据文件,如 RDD 缓存文件、Shuffle 中间结果文件等。

image.png

Spark 默认采用 SortShuffleManager 来管理 Stages 间的数据分发,在 Shuffle write 过程中,有 3 类结果文件:temp_shuffle_XXX、shuffle_XXX.data 和 shuffle_XXX.index。Data 文件存储分区数据,它是由 temp 文件合并而来的,而 index 文件记录 data 文件内不同分区的偏移地址。Shuffle 中间文件具体指的就是 data 文件和 index 文件,temp 文件作为暂存盘文件最终会被删除。

Shuffle write 的不同阶段,Shuffle manager 通过 BlockManager 调用 DiskStore 的 putBytes 方法将数据块写入文件。文件由 DiskBlockManager 创建,文件名就是 putBytes 方法中的 Block ID,这些文件会以“temp_shuffle”或“shuffle”开头,保存在 spark.local.dir 目录下的子目录里。

Shuffle read 阶段,Shuffle manager 再次通过 BlockManager 调用 DiskStore 的 getBytes 方法,读取 data 文件和 index 文件,将文件内容转化为数据块,最终这些数据块会通过网络分发到 Reducer 端进行聚合计算。

通过 MemoryStore 使用 getValues/getBytes 方法去访问 RDD 缓存的过程

getBytes/getValues 的实现都比较简单,

  1. 先对LinkedHashMap加锁,通过blockId取出对应的MemoryEntry,然后通过模式匹配,
    1. getBytes负责处理序列化的SerializedMemoryEntry,并返回Option[ChunkedByteBuffer],ChunkedByteBuffer是一个只读字节缓冲区,物理上存储为多个块而不是单个连续数组;
    2. getValues负责处理对象值序列DeserializedMemoryEntry,返回一个Iterator

广播变量存入 MemoryStore 的流程

我们在对一个RDD进行广播之后,返回的是一个Broadcast[T],而Broadcast是一个抽象类,它目前(Spark 2.4.5的源码)只有一个实现类TorrentBroadcast。TorrentBroadcast的机制:driver将序列化的对象划分为多个小的chunks(chunks为ByteBuffer类型的数组,即Array[ByteBuffer]),然后将这些chunks存储到driver的BlockManager中。

调用链路:
TorrentBroadcast 的 writeBlocks -> BlockManager 的 putBytes -> BlockManager 的 doPutBytes()
到这一步会判断存储级别

通过调用 doPutBytes() 方法将序列化的 bytes(ChunkedByteBuffer)通过指定的 StorageLevel 保存到 memoryStore 中;
3. 接下来,重点就在doPutBytes() 方法的实现,首先它会根据传入此方法中的 StorageLevel (广播变量driver端默认是 MEMORY_AND_DISK)来判断缓存是写入内存还是磁盘,也就是用 MemoryStore 还是 DiskStore 来进行缓存;

如果 useMemory&&deserialized,会走这条链路:
BlockManager 的 saveDeserializedValuesToMemoryStore -> (这里看英文就很好懂了,就是保存非序列化值到memoryStore)
MemoryStore 的 putIteratorAsValues ->
MemoryStore的 putIterator
这一步尝试将给定的块作为值或字节放入内存存储。 但是迭代器可能太大,为了避免OOM异常,这里会逐渐展开迭代器,同时定期检查是否有足够的可用内存。如果块被成功物化,那么物化过程中使用的临时展开内存就被“转移”到存储内存中;再回到上面存储级别的判断
如果 使用内存并且序列化 ,则走下面的调用链路:
BlockManager 的 saveSerializedValuesToMemoryStore(保存序列化值到memoryStore) -> MemoryStore 的 putBytes
这里会测试 MemoryStore 中是否有足够的空间。如果空间足够,则创建 ByteBuffer 并将其放入MemoryStore。否则就不会创建 ByteBuffer。最终会用 SerializedMemoryEntry 将 ByteBuffer 封装起来,放到老师在文中提到的 LinkedHashMap。