• 利用 GPU 进行加速 (CPU/GPU . 内存/Optane . SSD/DISK)

  • 不同类型的计算任务使用不同的计算引擎

  • 不同用户使用不同的资源集

计算任务可以分为

  • IO 密集型
  • CPU 密集型
  • 内存密集型

image.png
资源抽象分为两个层次

  • 集群资源抽象
    • 使用资源管理器 ResourceManager 来解耦 Flink 和资源管理集群 (Yarn,K8s,Mesos 等)
  • Flink 自身资源抽象
    • Flink 自身的资源使用 Slot 精细地划分计算资源
    • 作业能够充分利用计算资源
    • 同时使用 Slot 共享进一步提高资源利用效率

资源抽象

Flink 涉及的资源分为两大类
集群资源

  • 集群资源管理的是硬件资源
    • CPU
    • 内存
    • GPU等
  • 由资源管理框架(Yarn,K8s,Mesos) 来管理, Flink 从资源管理框架申请和释放资源

Flink 自身资源

  • Flink 从资源管理框架申请资源容器(YarnContainer 或者 K8sPod) , 1个容器中运行1个 TaskManager 进程,容器的资源对于 Flink 来说粒度比较大,所以可能无法充分利用资源,所以单个容器会被多个 Flink 的任务共享
  • Flink 对申请到的资源进行切分,每一份叫做 Task Slot

📘 <Slot Manager / Pool> - 图3

在资源管理中涉及了

  • JobMaster ( Slot 资源的使用者,向 ResourceManager 申请资源 )
  • ResourceManager ( 负责分配资源和资源不足时申请资源,资源空闲时释放资源 )
  • TaskManager ( Slot 资源的持有者,在 Slot 清单中记录了 Slot 分配给了哪个作业的哪个 Task )

ResourceManager

资源管理器

资源管理器在 Flink 中叫做 ResourceManager
Flink 同时支持不同的资源集群类型
ResourceManager 位于 Flink 和资源管理集群 (Yarn / K8s) 之间
Flink 集群级资源管理的抽象,主要作用有:

  • 申请容器启动新的 TM , 或者为作业申请 Slot
  • 处理 JobManagerTaskManager 的异常退出
  • 缓存 TaskManager,等待一段时间之后再释放掉不用的容器,避免资源反复的重复释放
  • JobManagerTaskManager 的心跳感知,对 JobManagerTaskManager 的退出进行对应的处理

Flink 中有四种 ResourceManager 分别对应于不同的资源管理框架

  • YarnResourceManager

    • Yarn 资源管理器,对于在 Yarn 上启动的和运行的 Flink 任务,能够实现动态的 资源申请和释放
  • KubernetesResourceManager

  • StandaloneResourceManager
  • MesosResourceManager

Slot Manager <管理器>

Slot ManagerResourceManager 的组件

  • 维护当前有多少 TaskManager
  • 每个 TaskManager 有多少空闲的 Slot
  • Slot 等资源的使用情况
  • 当作业调度执行时,根据 Slot 分配策略为 Task 分配执行的位置

Slot Manager 面向不同的对象提供不同的功能

  • TaskManager 提供注册、取消注册、空闲退出等管理动作、注册则集群可用的 Slot 变多、取消注册、空闲退出则释放资源、还给资源管理器

  • Flink 作业

  • 接收 Slot 的请求和释放、资源汇报等
  • 当资源不足的时候 SlotManager 将资源请求暂存在资源队列中
    • SlotManager 通知 ResourceManager 去申请更多的资源
    • 启动了新的 TaskManager, TaskManager 注册到 SlotManager之后
    • SlotManager 就可以有更多可用的新资源,从等待队列中依次分配资源

Job Master

Slot Pool <资源池>

Slot PoolJobMaster 中记录当前作业从 TaskManager 获取到的 Slot 集合
JobMaster 的调度器首先会从 SlotPool 中 获取 Slot 来调度任务, SlotPool 在没有足够的 Slot 资源执行作业的时候,首先会尝试从 ResourceManager 中获取资源,如果 ResourceManager 当前不可用, ResourceManager 拒绝资源请求或者请求超时,资源申请失败,则作业启动失败

JobMaster 申请到资源之后,会在本地持有 Slot, 避免 ResourceManager 异常导致作业运行失败
对于批处理而言,持有资源 JobMaster 首先可以避免多次向 ResourceManager 申请资源,同时 ResourceManager 不可用也不会影响作业的继续执行,只有资源不足时才会导致作业执行失败
当作业已经执行完毕或者作业完全启动且资源有剩余时,JobMaster 会将剩余资源交还给 ResourceManager


Slot 共享组与 Slot 共享管理器

Slot 共享管理器 <SlotSharingManager>

  • 管理共享资源与分配

Slot 共享组 <SlotSharingGroup> 一对一的关系 (1个 共享组 对应一个 共享管理器)两者在作业调度执行的时候发挥作用,部署 Task 之前,选择 > Slot 确定 > Task 发布到哪个 > TaskManager 共享组

  • SlotSharingGroup <非强制性共享约束>

非强制性共享约束, Slot 共享根据组内的 JobVertex ID , 查找是否已有可以共享的 Slot,只要确保相同的 JobVertex ID 不出现在一个共享的 Slot 中即可
在符合资源要求的 Slot 中,找到没有相同的 JobVertex IDSlot ,根据 Slot 选择策略选择一个 Slot 即可,如果没有符合条件的,则申请新的 Slot

  • CoLocationGroup <本地约束共享组>

本地约束共享组,具有强制性的 Slot 共享限制
CoLocationGroup 用在迭代计算中

迭代计算中的 Task 必须共享同一个 TaskManagerSlot ,CoLocationGroup 可以看做是 SlotSharingGroup 的特例

JobGraphExecutionGraph 的转换过程中,为每一个 ExecutionVertex 赋予了按照并行度编写的编号,相同编号的 ExecutionVertex 会被放入本地共享约束组中,共享相同的 CoLocationConstraint 对象,在调度的时候根据编号就能找到本组其他 TaskSlot 信息

CoLocation 共享根据组内每个 ExecutionVertex 关联的 CoLocationConstraint 查找是否有相同的 CoLocationConstraint 约束已分配 Slot 可用,在调度作业执行的时候,首先要找到本约束中其他 Task 部署的 TaskManager ,如果没有则申请一个新的 Slot,如果有则共享该 TaskManager 上的 Slot


Slot selection strategy <选择策略>

Flink 在决定 Task 运行在哪个 TaskManager 上时,会根据策略进行选择,选择 Slot 的时候有不同的选择策略,
SlotSelectionStrategy 就是策略定义的接口

选择策略从总体上分为两大类
1) 位置优先的选择 LocationPreferenceSlotSelectionStrategy

  • 默认策略

    • DefalutLocationPreferenceSlotSelectionStrategy
    • 该策略不考虑资源的均衡分配,会从满足条件的可用 Slot 集合选择第1个 以此类推
  • 均衡策略

    • EvenlySpreadOutLocationPreferenceSlotSelectionStrategy
    • 该策略考虑资源的均衡分配,会从满足条件的可用 Slot 集合中选择剩余资源最多的 Slot,尽量让各个 TaskManager 均衡的承担计算压力

2) 已分配 Slot 优先的选择策略 PreviousAllocationSlotSelectionStrategy

  • 如果当前没有空闲的已分配的 Slot,则依然会使用位置优先的策略来分配和申请 Slot

SlotProvider <请求方式>

SlotProvider 接口定义了 Slot 请求行为,支持两种请求方式

  • 立即响应模式: Slot 请求会立即执行
  • 排队模式: 排队等待可用 Slot,当资源可用时分配资源

最终的实现在 SchedulerImpl 中,其中 Scheduler 接口增加了 SlotSelectionStrategy


Slot

processes.svg

Slot 共享

TaskManager 是一个JVM进程,并且可以在单独的线程中执行一个或多个子任务。为了控制一个 TaskManager 接受多少个任务,一个 TaskManager 有一个所谓的 Slot(至少一个)。

每个 Slot 代表 TaskManager 的资源的固定子集。例如,具有三个 SlotTaskManager 会将其托管内存的1/3专用于每个Slot

分配资源意味着子任务不会与其他作业的子任务竞争托管内存,而是具有一定数量的保留托管内存。请注意,此处没有发生CPU隔离。当前插槽仅将任务的托管内存分开。

通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个 TaskManager 具有一个插槽,意味着每个任务组都在单独的JVM中运行(例如,可以在单独的容器中启动)。具有多个插槽意味着更多子任务共享同一JVM。

同一JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。


Slot 共享的优点

默认情况下,Flink 作业共享同一个 SlotSharingGroup
同一个作业中来自不同 JobVertexTask 可以共享作业
使用 Slot 共享 可以在一个 Slot 中运行 Task 组成的任务
共享 Slot 的好处 :

1) : 资源分配简单
.Flink 集群需要的 Slot 数量和作业中的最高并行度一致,不需要计算一个程序总共包含多少个 Task

Flink集群需要的 Slot 数与作业中使用的最高并行度完全相同。不需要计算程序总共包含多少个任务(并行度不同)。

2): 资源利用率高
多个task运行在同一个JVM中, 而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。


Slot 资源申请

  • 单独 Slot 资源申请

该类型的 Slot 申请首先会从 JobManager 的当前 SlotPool 中尝试获取资源
如果资源不足

  • 则从 SlotPool 中申请新的 Slot ,然后 SlotPoolResourceManager 请求新的 Slot

  • 共享 Slot 资源申请

共享 Slot 在申请的时候,需要向 SlotSharingManager 请求资源
如果有 CoLocation限制,

  • 则申请 CoLocation MultiTaskSlot 否则申请一般的 MutilTaskSlot

TaskSlot

  • MultiTaskSlot
    • MultiTaskSlot 中包含了一组 TaskSlot
  • SingleTaskSlot
    • 运行单个 TaskSlot ,每个 SingleTaskSlot 对应于一个 LogicalSlot

借助 SingleTaskSlotMultiTaskSlot Flink 实现了 一般 Slot 共享和 CoLocationGroup


Coding - Chain