6 内存管理

在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程:

  • Driver为主控进程,负责创建 Spark 上下文,提交 Spark 作业,将作业转化为 Task,并在各个 Executor 进程间协调任务的调度
  • Executor负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能

Driver 的内存管理(缺省值 1G)相对来说较为简单,这里主要针对 Executor 的内存管理进行分析,下文中提到的Spark 内存均特指 Executor 的内存。

6.1 堆内内存与堆外内存

  • 作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
  • 堆内内存受到 JVM 统一管理,堆外内存是直接向操作系统进行内存的申请和释放。

image.png

1 、堆内内存

  • 堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或spark.executor.memory 参数配置。Executor内运行的并发任务共享 JVM 堆内内存。
    • 缓存 RDD 数据和广播变量占用的内存被规划为存储内存
    • 执行 Shuffle 时占用的内存被规划为执行内存
    • Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间
  • Spark 对堆内内存的管理是一种逻辑上 ”规划式” 的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark只能在申请后和释放前记录这些内存。
  • 虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

    2 、堆外内存

  • 为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。

  • 堆外内存意味着把内存对象分配在 Java 虚拟机的堆以外的内存,这些内存直接受操作系统管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • 利用 JDK Unsafe API,Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放(堆外内存之所以能够被精确的申请和释放,是由于内存的申请和释放不再通过 JVM 机制,而是直接向操作系统申请,JVM 对于内存的清理是无法准确指定时间点的,因此无法实现精确的释放),而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
  • 在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享 存储内存 和 执行内存。

    6.2 静态内存管理

  • Spark 2.0 以前版本采用静态内存管理机制。存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如下图所示:

image.png

  • 可用的存储内存 = systemMaxMemory*spark.storage.memoryFraction *spark.storage.safetyFraction
  • 可用的执行内存 = systemMaxMemory*spark.shuffle.memoryFraction *spark.shuffle.safetyFraction
  • systemMaxMemory 为当前 JVM 堆内内存的大小
  • 这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和”其它内存”一样交给了 JVM去管理。

  • 堆外内存分配较为简单,只有存储内存和执行内存。可用的执行内存和存储内存占用的空间大小直接由参数spark.memory.storageFraction 决定。由于堆外内存占用的空间可以被精确计算,无需再设定保险区域。

image.png

  • 静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成”一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

    6.3 统一内存管理

    Spark 2.0 之后引入统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构如下图所示:
    image.png

  • 统一内存管理的堆外内存结构如下图所示:

image.png

  • 其中最重要的优化在于动态占用机制,其规则如下:
    • 设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
    • 双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
    • 执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
    • 存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂

image.png

  • 在执行过程中:执行内存的优先级 > 存储内存的优先级(理解)

    • 凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

      6.4 存储内存管理

      1 、RDD 持久化机制

  • RDD作为 Spark 最根本的数据抽象,是只读的分区记录的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系。凭借Lineage,Spark保证了每一个 RDD 都可以被重新恢复。但 RDD 的所有转换都是惰性的,即只有当一个返回结果给Driver 的Action发生时,Spark 才会创建任务读取 RDD,然后真正触发转换的执行。

  • Task 在启动之初读取一个分区时:

    • 先判断这个分区是否已经被持久化
    • 如果没有则需要检查 Checkpoint 或按照血统重新计算。如果一个 RDD 上要执行多次Action,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在执行后面的Action时提升计算速度。
  • RDD 的持久化由 Spark 的 Storage【BlockManager】 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver 端和 Executor 端 的 Storage 模块构成了主从式架构,即 Driver 端 的 BlockManager 为Master,Executor 端的 BlockManager 为 Slave。

  • Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个Partition 经过处理后唯一对应一个Block。Driver 端的 Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Executor 端的 Slave 需要将 Block 的更新等状态上报到 Master,同时接收Master 的命令,如新增或删除一个 RDD。

  • 在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等存储级别 ,这些存储级别是以下 5个变量的组合:

image.png

  • Spark中存储级别如下:

image.png

  • 存储级别从三个维度定义了 RDD Partition 的存储方式:
    • 存储位置:磁盘/堆内内存/堆外内存
    • 存储形式:序列化方式 / 反序列化方式
    • 副本数量: 1 份 / 2份

      2 、RDD 缓存过程

  1. File => RDD1 => RDD2 =====> RDD3 => RDD4 =====> RDD5 => Action
  2. RDD缓存的源头:Other (Iterator / 内存空间不连续)
  3. RDD缓存的目的地:存储内存(内存空间连续)
  • RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的存储空间并不连续。

  • RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为展开(Unroll)。

  • Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别:

    • 非序列化的 Block 以 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例
    • 序列化的 Block 以 SerializedMemoryEntry 的数据结构定义,用字节缓冲区(ByteBuffer)存储二进制数据

image.png

  • 每个 Executor 的 Storage 模块用 LinkedHashMap 来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个HashMap 新增和删除间接记录了内存的申请和释放。

image.png
备注:MemoryStroe => BlockManager

因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。

  • 序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请
  • 非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间
  • 如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间

image.png

  • 在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。

    3 、淘汰与落盘

  • 由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该 Block。

    Memory_And_Disk => cache => Memory
    淘汰:从内存空间中清除
    落盘:将存储内存中的数据(RDD缓存的数据)写到磁盘上
    
  • 存储内存的淘汰规则为:

    • 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
    • 新旧 Block 不能属于同一个 RDD,避免循环淘汰
    • 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
    • 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中LRU 是 LinkedHashMap 的特性。
  • 落盘的流程则比较简单,如果其存储级别符合_useDisk 为 true 的条件,再根据其 _deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。

    6.5 执行内存管理

  • 执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程,Shuffle 的 Write 和 Read 两阶段对执行内存的使用:

    • Shuffle Write
      • 在 map 端会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
    • Shuffle Read
      • 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间
      • 如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间
  • 在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并。

image.png

  • Spark 的存储内存和执行内存有着截然不同的管理方式:
    • 对存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的Partition 转化而成;
    • 对执行内存来说,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。