配置Flink 进程的内存

Flink JVM 进程的总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。 其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。
Flink 内存配置 - 图1
配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项 TaskManager 配置参数 JobManager 配置参数
Flink 总内存 taskmanager.memory.flink.size jobmanager.memory.flink.size
进程总内存 taskmanager.memory.process.size jobmanager.memory.process.size

配置 TaskManager 内存

Flink 的 TaskManager 负责执行用户代码。 根据实际需求为 TaskManager 配置内存将有助于减少 Flink 的资源占用,增强作业运行的稳定性。

内存模型

Flink 内存配置 - 图2
如上图所示,下表中列出了 Flink TaskManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分 配置参数 描述
框架堆内存(Framework Heap Memory) taskmanager.memory.framework.heap.size 用于 Flink 框架的 JVM 堆内存(进阶配置)。
任务堆内存(Task Heap Memory) taskmanager.memory.task.heap.size 用于 Flink 应用的算子及用户代码的 JVM 堆内存。
托管内存(Managed memory) taskmanager.memory.managed.size

taskmanager.memory.managed.fraction | 由 Flink 管理的用于排序、哈希表、缓存中间结果及 RocksDB State Backend 的本地内存。 | | 框架堆外内存(Framework Off-heap Memory) | taskmanager.memory.framework.off-heap.size | 用于 Flink 框架的堆外内存(直接内存或本地内存)
(进阶配置)。 | | 任务堆外内存(Task Off-heap Memory) | taskmanager.memory.task.off-heap.size | 用于 Flink 应用的算子及用户代码的堆外内存(直接内存或本地内存)
。 | | 网络内存(Network Memory) | taskmanager.memory.network.min

taskmanager.memory.network.max

taskmanager.memory.network.fraction | 用于任务之间数据传输的直接内存(例如网络传输缓冲)。该内存部分为基于 Flink 总内存
受限的等比内存部分
。 | | JVM Metaspace | taskmanager.memory.jvm-metaspace.size | Flink JVM 进程的 Metaspace。 | | JVM 开销 | taskmanager.memory.jvm-overhead.min

taskmanager.memory.jvm-overhead.max

taskmanager.memory.jvm-overhead.fraction | 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存
受限的等比内存部分
。 |

框架内存

通常情况下,不建议对框架堆内存框架堆外内存进行调整。 除非你非常肯定 Flink 的内部数据结构及操作需要更多的内存。 这可能与具体的部署环境及作业结构有关,例如非常高的并发度。 此外,Flink 的部分依赖(例如 Hadoop)在某些特定的情况下也可能会需要更多的直接内存或本地内存。

内存计算

假设只配置 Total Process Memory = 2gb
Total Process Memory = Total Flink Memory +JVM Metaspace +JVM Overhead
JVM Metaspace 通过taskmanager.memory.jvm-metaspace.size配置, 默认96mb
JVM Overhead 计算:

配置参数 默认值
taskmanager.memory.jvm-overhead.min 192mb
taskmanager.memory.jvm-overhead.max 1gb
taskmanager.memory.jvm-overhead.fraction 0.1

首先计算 JVM Overhead = Total Process Memory taskmanager.memory.jvm-overhead.fraction = 2gb0.1 = 204.8mb
如果计算出来的JVM Overhead < taskmanager.memory.jvm-overhead.min则为taskmanager.memory.jvm-overhead.min;
如果计算出来的JVM Overhead > taskmanager.memory.jvm-overhead.max则为taskmanager.memory.jvm-overhead.max;
如果计算出来的 taskmanager.memory.jvm-overhead.min <= JVM Overhead <= taskmanager.memory.jvm-overhead.max 则为JVM Overhead .
因为 192mb< 204.8mb <1gb, 所以 JVM Overhead=204.8mb
Total Flink Memory = Total Process Memory - JVM Metaspace- JVM Overhead = 2gb - 96mb - 204.8mb = 1747.2mb
Managed memory = Total Flink Memory taskmanager.memory.managed.fraction = 1747.2mb 0.4 = 698.88mb
Framework Off-heap Memory通过taskmanager.memory.framework.off-heap.size配置, 默认128mb
Task Off-heap Memory 默认为0byte
Network Memory 计算方式同JVM overhead

配置参数 默认值
taskmanager.memory.network.min 64mb
taskmanager.memory.network.max 1gb
taskmanager.memory.network.fraction 0.1

Total Flink Memory taskmanager.memory.network.fraction = 1747.2mb 0.1 = 174.72mb
64mb < 174.72mb < 1gb,所以 Network Memory=174.72mb
Direct Memory = Framework Off-heap Memory + Task Off-heap Memory + Network Memory = 128mb + 0 + 174.72mb = 302.72mb
Off Heap Memory = Managed memory + Direct Memory = 698.88mb + 302.72mb = 1001.6mb
JVM Heap = Total Flink Memory - Off Heap Memory = 1747.2mb - 1001.6mb = 745.6mb 与web-ui显示一致
Framework Heap Memory 通过taskmanager.memory.framework.heap.size配置, 默认128mb
Task Heap 剩下的内存 JVM Heap- Framework Heap Memory = 617.6mb

配置 JobManager 内存

JobManager 是 Flink 集群的控制单元。 它由三种不同的组件组成:ResourceManager、Dispatcher 和每个正在运行作业的 JobMaster。 本篇文档将介绍 JobManager 内存在整体上以及细粒度上的配置方法。

内存模型

Flink 内存配置 - 图3
如上图所示,下表中列出了 Flink JobManager 内存模型的所有组成部分,以及影响其大小的相关配置参数。

组成部分 配置参数 描述
JVM 堆内存 jobmanager.memory.heap.size JobManager 的 JVM 堆内存
堆外内存 jobmanager.memory.off-heap.size JobManager 的堆外内存(直接内存或本地内存)
JVM Metaspace jobmanager.memory.jvm-metaspace.size Flink JVM 进程的 Metaspace。
JVM 开销 jobmanager.memory.jvm-overhead.min

jobmanager.memory.jvm-overhead.max

jobmanager.memory.jvm-overhead.fraction | 用于其他 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。该内存部分为基于进程总内存
受限的等比内存部分
。 |

配置 JVM 堆内存

配置总内存中所述,另一种配置 JobManager 内存的方式是明确指定 JVM 堆内存的大小(jobmanager.memory.heap.size)。 通过这种方式,用户可以更好地掌控用于以下用途的 JVM 堆内存大小。

  • Flink 框架
  • 在作业提交时(例如一些特殊的批处理 Source)及 Checkpoint 完成的回调函数中执行的用户代码

Flink 需要多少 JVM 堆内存,很大程度上取决于运行的作业数量、作业的结构及上述用户代码的需求。
提示 如果已经明确设置了 JVM 堆内存,建议不要再设置进程总内存Flink 总内存,否则可能会造成内存配置冲突。
在启动 JobManager 进程时,Flink 启动脚本及客户端通过设置 JVM 参数 -Xms-Xmx 来管理 JVM 堆空间的大小。 请参考 JVM 参数

配置堆外内存

堆外内存包括 JVM 直接内存本地内存。 可以通过配置参数 jobmanager.memory.enable-jvm-direct-memory-limit 设置是否启用 JVM 直接内存限制。 如果该配置项设置为 true,Flink 会根据配置的堆外内存大小设置 JVM 参数 -XX:MaxDirectMemorySize。 请参考 JVM 参数
可以通过配置参数 jobmanager.memory.off-heap.size 设置堆外内存的大小。 如果遇到 JobManager 进程抛出 “OutOfMemoryError: Direct buffer memory” 的异常,可以尝试调大这项配置。 请参考常见问题
以下情况可能用到堆外内存:

  • Flink 框架依赖(例如 Akka 的网络通信)
  • 在作业提交时(例如一些特殊的批处理 Source)及 Checkpoint 完成的回调函数中执行的用户代码

提示 如果同时配置了 Flink 总内存JVM 堆内存,且没有配置堆外内存,那么堆外内存的大小将会是 Flink 总内存减去JVM 堆内存。 这种情况下,堆外内存的默认大小将不会生效。

调优指南

容器(Container)的内存配置

在容器化部署模式(Containerized Deployment)下(KubernetesYarnMesos),建议配置进程总内存taskmanager.memory.process.size 或者 jobmanager.memory.process.size)。 该配置参数用于指定分配给 Flink JVM 进程的总内存,也就是需要申请的容器大小。
提示 如果配置了 Flink 总内存,Flink 会自动加上 JVM 相关的内存部分,根据推算出的进程总内存大小申请容器。
注意: 如果 Flink 或者用户代码分配超过容器大小的非托管的堆外(本地)内存,部署环境可能会杀掉超用内存的容器,造成作业执行失败。
请参考容器内存超用中的相关描述。

State Backend 的内存配置

在部署 Flink 流处理应用时,可以根据 State Backend 的类型对集群的配置进行优化。

Heap State Backend

执行无状态作业或者使用 Heap State Backend(MemoryStateBackendFsStateBackend)时,建议将托管内存设置为 0。 这样能够最大化分配给 JVM 上用户代码的内存。

RocksDB State Backend

RocksDBStateBackend 使用本地内存。 默认情况下,RocksDB 会限制其内存用量不超过用户配置的托管内存。 因此,使用这种方式存储状态时,配置足够多的托管内存是十分重要的。 如果你关闭了 RocksDB 的内存控制,那么在容器化部署模式下如果 RocksDB 分配的内存超出了申请容器的大小(进程总内存),可能会造成 TaskExecutor 被部署环境杀掉。 请同时参考如何调整 RocksDB 内存以及 state.backend.rocksdb.memory.managed

批处理作业的内存配置

Flink 批处理算子使用托管内存来提高处理效率。 算子运行时,部分操作可以直接在原始数据上进行,而无需将数据反序列化成 Java 对象。 这意味着托管内存对应用的性能具有实质上的影响。 因此 Flink 会在不超过其配置限额的前提下,尽可能分配更多的托管内存。 Flink 明确知道可以使用的内存大小,因此可以有效避免 OutOfMemoryError 的发生。 当托管内存不足时,Flink 会优雅地将数据落盘。

常见问题

IllegalConfigurationException

如果遇到从 TaskExecutorProcessUtilsJobManagerProcessUtils 抛出的 IllegalConfigurationException 异常,这通常说明您的配置参数中存在无效值(例如内存大小为负数、占比大于 1 等)或者配置冲突。 请根据异常信息,确认出错的内存部分的相关文档及配置信息

OutOfMemoryError: Java heap space

该异常说明 JVM 的堆空间过小。 可以通过增大总内存、TaskManager 的任务堆内存、JobManager 的 JVM 堆内存等方法来增大 JVM 堆空间。
提示 也可以增大 TaskManager 的框架堆内存。 这是一个进阶配置,只有在确认是 Flink 框架自身需要更多内存时才应该去调整。

OutOfMemoryError: Direct buffer memory

该异常通常说明 JVM 的直接内存限制过小,或者存在直接内存泄漏(Direct Memory Leak)。 请确认用户代码及外部依赖中是否使用了 JVM 直接内存,以及如果使用了直接内存,是否配置了足够的内存空间。 可以通过调整堆外内存来增大直接内存限制。 有关堆外内存的配置方法,请参考 TaskManagerJobManager 以及 JVM 参数的相关文档。

OutOfMemoryError: Metaspace

该异常说明 JVM Metaspace 限制过小。 可以尝试调整 TaskManagerJobManager 的 JVM Metaspace。

IOException: Insufficient number of network buffers

该异常仅与 TaskManager 相关。
该异常通常说明网络内存过小。 可以通过调整以下配置参数增大网络内存