• 为什么不使用原生的而是要自己造轮子

    • 有效数据密度低
    • 垃圾回收长
    • OOM问题影响稳定性
    • CPU 缓存利用率不高 (缓存未命中问题)
  • 为什么自己造的轮子效果要比前者好

    • 内存安全执行和高效的核外算法 (分配的内存块数量是固定的,监控起来比较方便 还有 核外算法)
    • 减少垃圾收集压力 (所有数据以二进制的形式存在 ,可重用/可变/生命周期短暂 降低了垃圾收集的压力)
    • 节省空间的数据存储 (数据以二进制的形式存储, 不需要像 Java 对象一样有额外的存储开销)
    • 高效的二进制操作和缓存敏感性 (二进制数据相邻的存储在一起,可以提高CPU 的缓存命中率)
  • 轮子的制造过程(整链路的执行过程)

  • 制造前与制造后效果的对比


为什么不使用原生的而是要自己造轮子

  • 如今,许多用于分析大型数据集的开源系统都是用 Java 或者是基于 JVM 的编程语言实现的。比如 : Apache Spark、Apache Drill、Apache Flink ..
  • 基于 JVM 的数据分析引擎面临的一个常见挑战就是如何在内存中存储大量的数据(包括缓存和高效处理)。合理的管理好 JVM 内存可以将难以配置且不可预测的系统与少量配置且稳定运行的系统区分开来。

    使用Java 管理的JVM 有一些缺陷

  1. 有效数据密度低
    1. JAVA 对象包含 (对象头,实例数据,对其填充部分)

      1. 32 位虚拟机 32bit | 64位虚拟机 64bit
    2. 为了提高效率,内存中的数据存储不是连续的,而是按照 8 byte 整数倍 进行存储

    3. 这样就会导致为了达到 8byte 的整数倍条件 而填充了很多数据byte 进去 浪费了资源 (有效信息存储密度)
  1. 垃圾回收
    1. JVM 默认垃圾回收

      1. 优点: 开发者无序资源回收,可以提高开发效率,减少了内存泄露的可能
      2. 缺点: 内存回收时不可控的,一旦出现 Full GC,GC 会达到秒级甚至分钟级,严重影响效率
    2. 将数据作为对象大都是生存在新生代,当 JVM 进行垃圾回收时,垃圾收集的开销很容易达到 50% 甚至更多

    3. GC 中断会使集群中的心跳信息超时,导致节点被踢出集群,集群进入不稳定状态
  1. OutOfMemoryError 代价比较高

    当 JVM 中所有对象大小 超过分配给 JVM 大小时,就会出现 OutOfMemory,导致系统崩溃,分布式框架的健壮性和性能都会有影响

  1. CPU 缓存利用率不高
    1. Cpu 进行计算时,是从 CPU 缓存中获取数据,而不是直接从内存中获取数据,缓存有一个程序局部性原理

      1. 时间局部性: 最近被 CPU 访问的数据,短期内 CPU 还要访问
      2. 空间局部性: 被 CPU 访问的数据附近的数据, 短期内 CPU 还要访问
    2. JAVA 在堆上的数据不是连续的,所以从内存中读 JAVA 对象时,缓存附近的内存区域的数据往往不是 CPU 计算所需要的 (这就是缓存未命中)

      1. CPU 需要等待 从内存中重新读取数据
      2. CPU 利用率不高 (白等)
      3. CPU 的速度 和 内存 的速度之间差好几个数量级

image.png
image.png

  1. 操作(监控/控制)内存中的数据比较困难

    在 JVM 中处理大量数据最直接的方式就是将这些数据做为对象存储在堆内存中,然后直接在内存中操作这些数据,如果想进行排序则就是对对象列表进行排序。然而这种方法有一些明显的缺点,首先,在频繁的创建和销毁大量对象的时候,监视和控制堆内存的使用并不是一件很简单的事情。如果对象分配过多的话,那么会导致内存过度使用,从而触发 OutOfMemoryError,导致 JVM 进程直接被杀死。


为什么自己造的轮子效果要比前者好

  • Flink 将对象序列化为固定数量的预先分配的内存段,而不是直接把对象放在堆内存上。

    它的 DBMS 风格的排序和连接算法尽可能多地对这个二进制数据进行操作,以此将序列化和反序列化开销降到最低。

  • 如果需要处理的数据多于可以保存在内存中的数据,Flink 的运算符会将部分数据溢出到磁盘。

    事实上,很多Flink 的内部实现看起来更像是 C / C ++,而不是普通的 Java。下图概述了 Flink 如何在内存段中存储序列化数据并在必要时溢出到磁盘:

  • 操作二进制数据的优势

image.png

Flink 的主动内存管理和操作二进制数据有几个好处:

→ 1、内存安全执行和高效的核外算法

由于分配的内存段的数量是固定的,因此监控剩余的内存资源是非常简单的。在内存不足的情况下,处理操作符可以有效地将更大批的内存段写入磁盘,后面再将它们读回到内存。因此,OutOfMemoryError 就有效的防止了。

核外算法 指的是 : 处理太大而无法放入计算机主内存的数据

→ 2、减少垃圾收集压力

因为所有长生命周期的数据都是在 Flink 的管理内存中以二进制表示的,所以所有数据对象都是短暂的,甚至是可变的,并且可以重用。短生命周期的对象可以更有效地进行垃圾收集,这大大降低了垃圾收集的压力。现在,预先分配的内存段是 JVM 堆上的长期存在的对象,为了降低垃圾收集的压力,Flink 社区正在积极地将其分配到堆外内存。这种努力将使得 JVM 堆变得更小,垃圾收集所消耗的时间将更少。(已经做到了)

→ 3、节省空间的数据存储

Java 对象具有存储开销,如果数据以二进制的形式存储,则可以避免这种开销。

→ 4、高效的二进制操作和缓存敏感性

在给定合适的二进制表示的情况下,可以有效地比较和操作二进制数据。此外,二进制表示可以将相关值、哈希码、键和指针等相邻地存储在内存中。这使得数据结构通常具有更高效的缓存访问模式。

评语
  • 主动内存管理的这些特性在用于大规模数据分析的数据处理系统中是非常可取的,但是要实现这些功能的代价也是高昂的。

    要实现对二进制数据的自动内存管理和操作并非易事,使用 java.util.HashMap 比实现一个可溢出的 hash-table (由字节数组和自定义序列化支持)。当然,Apache Flink 并不是唯一一个基于 JVM 且对二进制数据进行操作的数据处理系统。

  • 例如 Apache Drill、Apache Ignite、Apache Spark、Apache Geode 也有应用类似技术。


轮子的制造过程(整链路的执行过程)

1. Flink 如何分配内存 (MemorySegment| MemorySegment)

1.1 Flink TaskManager 组成

  1. actor 系统(负责与 Flink master 协调)
  2. IOManager(负责将数据溢出到磁盘并将其读取回来)
  3. MemoryManager(负责协调内存使用)

**

  • 在Flink 中,Java 的对象信息被序列化为二进制数据流,在内存中连续存储,保存在预分配的内存块上(固定长度为32k)的 MemorySegment
    1. Flink 对其提供了高效的读写,很多运算可以直接操作二进制,都不需要反序列化

→ 1.2 MemoryManager

MemoryManager负责将 MemorySegment分配、计算和分发给数据处理操作符,例如 sort 和 join 等操作符。

在初始化 TaskManager 的所有内部数据结构并且已启动所有核心服务之后,MemoryManager开始创建 MemorySegments

  • 默认情况下,服务初始化后,70% 可用的 JVM 堆内存由 MemoryManager分配(也可以配置全部)。

剩余的 JVM 堆内存用于在任务处理期间实例化的对象,包括由用户定义的函数创建的对象。下图显示了启动后 TaskManager JVM 中的内存分布
image.png

→ 1.3 MemorySegment

MemorySegment在 FLink 中 叫做 MemorySegment,是Flink 内存抽象最小单元, 默认情况下,一个 MemorySegment对应一个 32k 内存块,这个内存块可以是

  • 堆上内存 (JVAV Byte 数组)
  • 内外内存 (基于 Netty 的DirctByteBuffer)

MemorySegment可以保存在堆上,其内部存储为一个 Byte数组,也可以保留在堆外的ByteBuffer 上,在堆上管理序列化之后的数据,如果超过内存的限制,可以将部分数据保留在磁盘上

  • 操作磁盘 就像操作一大块连续的内存一样
    • 为了在更大的连续内存块上操作多个 MemorySegment,Flink 使用了实现 Java 的 java.io.DataOutput 和 java.io.DataInput 接口的逻辑视图。
  • 提供了 AbstractPagedInputView

MemorySegment通过使用 Java 的 unsafe 方法对其支持的字节数组提供非常有效的读写访问

  • 你可以将 MemorySegment看作是 Java 的 NIO ByteBuffer 的定制版本
  • 对于基本类型: MemorySegment 内置了方法,可以直接返回和写入数组
  • 对于其他类型: MemorySegment 读取二进制数组(byte[]) 后进行反序列化,序列化为二进制数组 (byte[]) 后写入

  • MemorySegmentTaskManager 启动时分配一次,并在 TaskManager 关闭时销毁。

  • 因此,在 TaskManager 的整个生命周期中,MemorySegment是重用的,而不会被垃圾收集的。

2. Flink 如何对对象进行序列化和反序列化

📘


3. Flink 如何对二进制数据进行操作

与其他的数据处理框架的 API(包括 SQL)类似,Flink 的 API 也提供了对数据集进行分组、排序和连接等转换操作。这些转换操作的数据集可能非常大。关系数据库系统具有非常高效的算法,比如 merge-sort、merge-join 和 hash-join。Flink 建立在这种技术的基础上,但是主要分为使用自定义序列化和自定义比较器来处理任意对象。在下面文章中我们将通过 Flink 的内存排序算法示例演示 Flink 如何使用二进制数据进行操作。

Flink 为其数据处理操作符预先分配内存,初始化时,排序算法从 MemoryManager请求内存预算,并接收一组相应的 MemorySegments。这些 MemorySegments 变成了缓冲区的内存池,缓冲区中收集要排序的数据。下图说明了如何将数据对象序列化到排序缓冲区中:
image.png
排序缓冲区在内部分为两个内存区域:第一个区域保存所有对象的完整二进制数据,第二个区域包含指向完整二进制对象数据的指针(取决于 key 的数据类型)。将对象添加到排序缓冲区时,它的二进制数据会追加到第一个区域,指针(可能还有一个 key)被追加到第二个区域。分离实际数据和指针以及固定长度的 key 有两个目的:它可以有效的交换固定长度的 entries(key 和指针),还可以减少排序时需要移动的数据。如果排序的 key 是可变长度的数据类型(比如 String),则固定长度的排序 key 必须是前缀 key,比如字符串的前 n 个字符。请注意:并非所有数据类型都提供固定长度的前缀排序 key。将对象序列化到排序缓冲区时,两个内存区域都使用内存池中的 MemorySegments进行扩展。一旦内存池为空且不能再添加对象时,则排序缓冲区将会被完全填充并可以进行排序。Flink 的排序缓冲区提供了比较和交换元素的方法,这使得实际的排序算法是可插拔的。默认情况下, Flink 使用了 Quicksort(快速排序)实现,可以使用 HeapSort(堆排序)。下图显示了如何比较两个对象:
image.png
排序缓冲区通过比较它们的二进制固定长度排序 key 来比较两个元素。如果元素的完整 key(不是前缀 key) 或者二进制前缀 key 不相等,则代表比较成功。如果前缀 key 相等(或者排序 key 的数据类型不提供二进制前缀 key),则排序缓冲区遵循指向实际对象数据的指针,对两个对象进行反序列化并比较对象。根据比较结果,排序算法决定是否交换比较的元素。排序缓冲区通过移动其固定长度 key 和指针来交换两个元素,实际数据不会移动,排序算法完成后,排序缓冲区中的指针被正确排序。下图演示了如何从排序缓冲区返回已排序的数据:
image.png
通过顺序读取排序缓冲区的指针区域,跳过排序 key 并按照实际数据的排序指针返回排序数据。此数据要么反序列化并作为对象返回,要么在外部合并排序的情况下复制二进制数据并将其写入磁盘。


Other

Flink 内存模型

在 1.10 以前的版本中,内存存在一些缺陷,导致资源充分利用率比较困难

  1. 流批配置上使用的模型不同,配置参数多,关系乱
  2. 流中 RocksDBStateBackend 配置参数多,需要依赖用户进行复杂的配置

1.x Flink 内存布局

📘


2.x 内存计算

  • JVM 启动之前就需要确定各个内存的大小,一旦JVM 启动,在TaskManager进程内部就不在启动了
  • 进行内存大小计算有两种情况

    • Standloan : 内存的计算在启动脚本中实现
    • 容器(Kubernetes | Yarn | Mesos): 在ResourceManager中进行


      3.x 启动方式

  • 在启动脚本与容器环境下的内存大小计算都调用了FLink 的Java 代码,保证了所有模式下的统一,计算好的参数使用-D 参数 交给 Java 进程

必须使用如下三种方式之一来配置Flink的内存(本地执行除外),否则Flink启动将失败。这意味着必须显式配置以下没有默认值的选项子集之一:

注意: 不建议同时配置总进程内存和Flink总内存。由于潜在的内存配置冲突,可能会导致部署失败。其他内存组件的其他配置也需要谨慎,因为它可能会导致进一步的配置冲突。
image.png


内存段 MemorySegment

1. MemorySegment 介绍

: -> 上面介绍过了 看上面

2. MemorySegment 结构

  • BYTE_ARRAY_BASE_OFFSET
    • 二进制字节数组的起始索引,相对于字节数组对象而言
  • LITTLE_ENDIAN
    • 判断是否为 Little Endian 模式的字节存储顺序,若不是,就是 Big Endian 模式
  • HeapMemory
    • MemorySegment使用的是 堆上内存还是堆外内存 (堆上内存为堆上字节数组,堆外内存为 null)
  • address
    • HeapMemory 不为 null 则为 字节数组相应的相对地址
    • HeapMemory 为 null, 则为堆外内存的绝对地址
  • addressLimit
    • 标识地址结束位置 (address Size)
  • size

    内存段的大小(以字节为单位)

3. 字节顺序 Big Endian 与 Little Endian

字节顺序

  • 字节顺序是指占内存多于一个字节类型的数据在内存中的存放顺序,通常有小端、大端两种字节顺序。
    • 小端字节序指低字节数据存放在内存低地址处,高字节数据存放在内存高地址处
    • 大端字节序是高字节数据存放在低地址处,低字节数据存放在高地址处。

不同 Cpu 架构使用不同存储

4. MemorySegment 实现

image.png


内存页

1. 内存页 介绍

  1. MemorySegment 是内存分配最小单元,但太偏向于底层操作,对于上层使用者来说,需要考虑所有细节,非常繁琐,所有Flink 对 MemorySegment 又向上 抽象了一层

  2. 内存页是 MemorySegment 之上的数据访问视图 两层抽象会跨 MemorySegment 读取写入

    1. 数据读取抽象为 DataInputView
    2. 数据写入抽象为 DataOutPutView

2. DataInputView

  1. MemorySegment 数据读取抽象视图,继承自 Java.Io.DataInput , 提供了从二进制流中读取不同数据类型方法

  2. InputView 中持有多个 MemorySegment 的引用 (MemorySegment[]),这一组MemorySegment 被视为一个 内存页(Page), 可以顺序读取 MemorySegment 中的数据

  3. 基本上所有的 InputView 实现子类都继承了 AbstractPageInputView,所有InputView 都支持 Page(内存页)

    3. DataOutPutView

  4. MemorySegment 数据写入抽象视图,继承自 Java.Io.DataOutput, 提供了将不同数据写入到二进制流的方法

  5. OutPutView 中持有多个 MemorySegment 的引用 (MemorySegment[]),这一组MemorySegment 被视为一个 内存页(Page), 可以顺序的向 MemorySegment 写数据

  6. 基本上所有的 OutPutView 实现子类都继承了AbstractPageOutPutView,所有OutPutView 都支持 Page(内存页) 都支持 跨 MemorySegment 写入

4. 内存页的使用

  1. 对于内存的读写请求是很底层的行为,只要涉及到对二进制读写都使用到了 Data Input/OutPut View,而不是直接操作 MemorySegment

  2. 例如: flink-table-runtime-blink 中

    1. BinaryRowSerializable中使用 AbstractPageInputView 从内存中读取二进制数据并转换成 BinaryRow
    2. 使用 AbstractPageOutPutViewBinaryRow 写入到 MemorySegment

Buffer

  1. Buffer底层是: MemorySegment,Buffer申请和释放由 Flink 管理
  2. Flink 引入 “数” 的概念
    1. 当有新的Buffer消费时,引用数+1
    2. 当消费者消费完 Buffer时引用数-1
    3. 最终当引用变为0时 Buffer释放重用

NetWorkBuffer同时继承了 AbstractReferenceCountedByteBuf, 具备了计数的能力,并实现了对 MemorySegment的读写


Buffer 资源池

image.png

  1. Buffer 资源池在 Flink 中 叫做 BufferPool

    • BufferPool 用来管理 Buffer包含以下:

      • Buffer申请资源
      • 释放
      • 销毁
      • 可用Buffer通知等
    • 每个 Task 拥有自己的 LocalBufferPool

  2. 为了方便 BufferPool 管理,Flink 设计了 BufferPoolFactory,提供了 Buffer 的创建和销毁,其唯一的实现类是 NetWorkBufferPool

  3. 每个TaskManager 上只有一个 NetWorkBufferPool, 同一个 TaskManager上的Task 共享 NetWorkBufferPool,在TaskManager启动时就会创建 NetWorkBufferPool,为其分配内存,

  4. NetWorkBufferPool持有该 TaskManager在进行数据传输时,所能使用的所有资源 (功能: )

    1. BufferPool的工厂外
    2. Task 所需 MemorySegment的提供者
    3. 每个 Task 的 LocalBufferPool所需的内存也是从 NetWorkBufferPool中申请而来的

内存管理器 MemoryManager

MemoryManager是 Flink 中托管内存的组件,其管理的托管内存只使用堆外内部

  • 在批处理中用在 排序,Hash 表 和中间结果的缓存中
  • 在流中 作为 RocksDBStabackend的内存

需要注意的是

  • 在1.10 之前的版本中, MemoryManager负责 TaskManager的所有内存
  • 在1.10 之后的版本,MemoryManager的管理范围缩小 为 Slot 级别为Task 管理内容

    • TaskManager为每个 Solt 分配相同的内容,Task 不能使用超过其Slot 分配的资源, 目的是为了能够更好的隔离任务,让系统更加稳定
    • 未来的版本可能回采取更好的额Slot 资源使用策略,在资源空闲的时候,允许 Task 中的StreamOperator 申请超过预先分配的资源
  • MemoryManager 主要通过内部接口 MemoryPool 来管理所有的 MemorySegment, 托管内存的管理相比于 Network Buffers 的管理更为简单,而不需要 Buffer这一层的封装

内存申请

  • 在批处理计算中,MemoryManager负责为算子申请堆外内存,最终实际申请的是堆外的 ByteBuffer
    • 申请MemorySegment虽然只有两种类型,实际只使用了 OFF_HEAP
  • 在流处理计算中,MemoryManager更多的是管理

    • 控制RocksDB的内存使用量
    • 通过RocksDB,的Block Cache和 WriteBufferManager 参数来限制
    • 参数的具体值从TaskManager内存配置参数中计算而来
    • RocksDB 自己来负责运行过程中的内存申请和内存释放

      • ROCKSDB 申请内存资源 Code ```java /* 获取由类型字符串标识的共享资源 如果资源已经存在,则返回资源的描述符 如果资源尚不存在,则方法将初始化 使用初始值设定项函数和给定大小的新资源 / public OpaqueMemoryResource getExternalSharedMemoryResource(

        1. String type,
        2. LongFunctionWithException<T, Exception> initializer,
        3. long numBytes) throws Exception {

        // This object identifies the lease in this request. It is used only to identify the release operation. // Using the object to represent the lease is a bit nicer safer than just using a reference counter. final Object leaseHolder = new Object();

        final SharedResources.ResourceAndSize resource =

            sharedResources.getOrAllocateSharedResource(type, leaseHolder, initializer, numBytes);
        

        final ThrowingRunnable disposer = () -> sharedResources.release(type, leaseHolder);

        return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer); }


<a name="7JZXK"></a>
#### 内存释放
Flink 自己管理内存,也就意味着内存的申请和释放由Flink 来负责,触发Java堆外内存释放的行为一般有两种

- 内存使用完毕
- Task 停止(正常或异常)执行

在Flink 中 实现了一个`JavaGcCleanerWrapper` 进行堆外内存的释放,提供了两个`JavaCleaner`

- **LegacyCleanerProvider**
   - 该 CleanerProvider 提供了1.8以下版本的JDK 的Flink 管理的内存的垃圾回收,使用sun.misc.Cleaner来释放内存
```java
private static CleanerProvider createLegacyCleanerProvider() .../
  • Java9CleanerProvider

    • 该 CleanerProvider 提供了1.9及以上版本的JDK 的Flink 管理的内存的垃圾回收,使用java.lang.ref.CLeaner 来释放内存
      private static CleanerProvider createJava9CleanerProvider()
      
  • JavaGcCleanerWrapper会为每个 Owner 创建一个包含 CleanerRunnable 对象,在每个 MemorySegment释放内存的时候,调用此 Cleaner 进行内存的释放

  • MemoryManager 关闭的时候会对所申请的 MemoryManager进行释放,交还给操作系统

  • 在流计算中,当Task 停止执行的时候 RocksDBStateBackend 负责释放物理内存,并将资源归还给 MemoryManager,
  • 资源归还给 MemoryManager只是更新其可用资源的大小数值,并不是对内存的物理操作
      @Override
      public void free() {
          super.free();
          offHeapBuffer = null; // to enable GC of unsafe memory
      }
    

网络缓冲器

NetworkBuffer 是网络交换数据的包装 其对应于 MemorySegment内存段

  • 当结果分区(ResultPartition) 开始写出数据的时候,需要向 LocalBufferPool申请 Buffer资源
  • 使用 BufferBuilder将数据写入 MemorySegment
  • MemorySegment分配完毕后 则会持续等待 Buffer的释放

    申请内存

  1. Buffer 申请

结果分区 ResultPartitioni 申请Buffer进行数据写入

  • LocalBufferPool首先从自身持有的 MemorySegment中分配可用的 Buffer
  • 如果没有则从 TaskManagerNetworkBuffer 中申请
  • 如果没有 则阻塞等待可用的 MemorySegment
  1. MemorySegment 申请

申请 Buffer 本质就是申请 MemorySegment,如果在LocalBufferPool 中,则申请新的堆外内存 MemorySegment


内存回收

Buffer 使用了引用计数机制来判断什么时候可以释放 Buffer 到可用资源池

  • 每创建一个 BufferConsumer 就会对Buffer 的引用计数+1
  • 每个Buffer 被消费完就会对 Buffer 的引用计数-1
  • Buffer 的引用计数为0的时候,就可以回收
  1. Buffer 回收

Buffer 回收之后

  • 并不会释放 MemorySegment
  • 此时 MemorySegment 依然在 LocalBufferPool 资源池中
  • 除非 TaskManager 级别内存不足时,才会释放回 TaskManager 持有的全局资源中

    释放 MemorySegment 的时候,同样要根据 MemorySegment 的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中,变为可用内存,后续申请 MemorySegment 的时候,可以重复利用该内存片段

  1. MemorySegment 释放

NetworkBufferPool 关闭的时候进行内存释放,交还给操作系统

  • 其实就是 通过 JavaGcCleanerWrapper 方式 进行内存释放