问题待思考

  • Flink 作业是如何提交到集群的
  • Flink 集群是如何在(各个)资源管理集群上启动起来的
  • Flink 的计算资源是如何分配给作业的
  • Flink 作业提交之后是如何启动的

大体框架
Flink 的 Jar 文件并不是 Flink 集群的可执行文件,需要经过转换之后提交给集群,其转换过程大概分两大步骤

  1. 在 Flink Client 中通过反射启动 Jar 中的 main 方法,生成 Flink StreamGraph \ Job GraphJob Graph 提交给 Flink 集群
  2. Flink 集群收到 Job Graph 之后将 Job Graph 转换成 Execution Graph , 然后开始调度执行,启动成功之后开始消费数据

Flink 的核心执行流程,对用户 API 调用 通过4层抽象 进行转换

  • Stream Graph -> Job Graph -> Execution Graph -> 物理执行拓扑 (Task DAG)

提交流程

  • Flink 作业在开发完毕之后,需要提交到 Flink 集群执行,ClientFrontend 是入口,触发用户开发的 Flink 应用 Jar 文件中的 main 方法,然后交给 pipelineExecutor.execue()方法,最终会选择一个具体的 PiplineExecutor 执行

作业执行可以选择 SessionPer-Job 两种集群模式

  • Session 模式的集群,一个集群中运行多个作业
  • Per-Job 模式的集群,一个集群中只运行一个作业,作业执行完毕则集群销毁

不同模式适用场景

  • DispatcherResourceManager 的占用
  • 集群资源的共享性
  • 执行任务特性 | 运行模式 | 适用场景 | | —- | —- | | Session 模式 | 共享 Dispatcher 和 ResourceManager
    按需申请资源,作业共享集群资源
    适合执行时间短,频繁执行的短任务 | | Per-Job 模式 | 独享 Dispatcher 和 ResourceManager

长周期执行的任务,集群异常影响范围小 |

根据 Flink Client 提交作业之后是否还可以退出 Client 进程,提交模式又可以分为

  • Detached
    • Flink Client 创建完集群之后,可以退出命令行窗口,集群独立运行
  • Attached
    • Flink Client 创建完集群之火,不能关闭命令行窗口,需要与集群之间维持练连接,好处是能够感知集群的退出,集群退出之后还会做一些资源清理的工作

pipelineExecutor

pipelineExecutor 执行器是 Flink Client 生成 JobGraph 之后,将作业提交给集群的重要环节

  • Session 模式 : AbstractSessionClusterExecutor
  • Per-Job 模式 : AbstractJobClusterExecutor
  • MiniCluster 模式 : LocalExecutor

Session 模式

该模式下,作业共享集群资源,作业通过 Http 协议进行提交

Flink 提供的会话模式:

  • Yarn 会话模式
  • K8s 会话模式
  • Standalone
    • Standalone 模式比较特别,Flink 安装在物理机上,不能像在集群上一样,可以随时启动一个新集群,所有的作业共享 Standalone 集群,本质上就是一种 Session 模式,所以不支持 Per-Job 模式
  • Session 模式

    • Yarn 作业提交使用 yarn-session.sh 脚本
    • K8s 作业提交使用 kubernetes-session 脚本
  • 在启动脚本的时候就会检查是否存在已经启动好的 Flink Session 模式集群

    • 如果没有: 则启动一个 Flihk Session 模式集群
    • 然后在 PipelineExecutor 中通过 Dispatcher 提供的 Rest 接口提交 JobGraph
    • Dispatcher 为每个作业启动一个 JobMaster,进入作业执行阶段

Per-Job 模式

该模式下,一个作业一个集群,作业之间相互隔离
在 Flink 1.10 版本中,只有 Yarn 上实现了 Per-Job 模式

  • 疑问: K8s 的 Per-Job ?

Per-Job 模式下,因为不需要共享集群,所以在 PipelineExecutor 中执行作业提交的时候,可以创建集群并将 JobGraph 以及所需的文件等一同提交给 Yarn 集群

  • Yarn 集群在容器中启动 Flink Master 进程 (JobManager 进程)
  • 进行一系列的初始化动作,初始化完毕之后从文件系统中获取 JobGraph
  • 交给 Dispatcher,之后的执行流程与 Session 模式下的执行流程相同

Yarn Session 提交流程

image.png

1. 启动集群

使用 bin/yarn-session.sh 提交会话模式的作业

如果提交到已经存在的集群,则获取 Yarn 集群信息/应用ID/ 并准备提交作业

  • 如果启动新的 Yarn Sesssion 则直接跳到 作业提交阶段
  • 如果没有集群 则创建一个新的 Session 模式的集群

    • 首先将 应用配置 flink-conf.yamllogback.xmllog4j.properties 和 相关文件 Flink Jar ☺ 配置类文件 ☺ 用户 Jar 文件 ☺ JobGraph 对象等信息上传至 分布式存储 (HDFS) 的应用暂存目录

    • 通过 Yarn Client 向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的Yarn Container 中初始化并启动 Flink JobManager 进程,在 Job Manager 进程中运行 YarnSessionClusterEntrypoint 作为集群的入口

    • 初始化 Dispatcher / ResourceManager,启动相关的 RPC 服务等待 Client 通过 Rest 接口提交作业

2. 作业提交

Yarn 集群准备好后,开始作业提交
Flink Client 通过 Rest 向 Dispatcher 提交JobGraph

DispatcherRest 接口

  • 不负责实际的调度/执行方面的工作,
  • 当收到 JobGraph 后,为作业创建一个 JobMaster,将工作交给 JobMaster
    • (负责 作业调度,管理作业 和 Task 的生命周期)
  • 构建 ExecutionGraph(第三层物理Graph)

这两个步骤完成后,作业进入调度执行阶段

3.作业调度执行

JobMasterYarnResourceManager申请资源,开始调度 ExecutionGraph 的执行,向 YarnResourceManager申请资源

  • 初次提交作业集群中尚没有 TaskManager 此时资源不足,需要先申请资源

YarnResourceManager 收到 JobMaster 的资源请求,如果当前有空闲 Slot 则将 Slot 分配给 JobMaster , 否则 YarnReSourceManager 将向 Yarn Master 请求创建 TaskManager

YarnResourceManager将资源请求加入等待请求队列,并通过心跳向 YARN ResourceManager 申请新的 Container 资源来启动 TaskManager进程,Yarn 分配新的 ContainerTaskManager

YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需的相关资源,在容器中启动 TaskManager

TaskManager 启动后 向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager

ResourceManager 从等待队列中取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManagerSlot 分配给了哪个 JobMaster

TaskManagerJobMaster 提供 Slot,JobMaster 调度 TaskTaskManagerSlot 上执行


Yarn Per-Job 提交流程

image.png

1. 启动集群

使用 ./flink run -m yarn-cluster 提交 Per-Job 模式的作业
Yarn 启动 Flink 集群,该模式下 Flink 集群的入口是 yarnJobClusterEntryPoint

2. 作业提交

该步骤与 Session 模式不同, Client 并不会通过 Rest 向 Dispatcher 提交 JobGraph,由 Dispatcher 从本地文件系统获取 JobGraph , 剩下的一样

3.作业调度执行


K8s Session 提交流程

image.png
image.png

1. 启动集群

Flink 客户端首先连接 Kubernetes API Server,提交 Flink 集群的资源描述文件

  • flink-configuration-configmap.yaml / jobmanager-service.yaml / jobmanager-deployment.yaml / taskmanager-deployment.yaml

Kubernetes Master 会根据这些资源描述文件去创建对应的 Kubernetes 实体
JobMaster 部署为例

  • Kubernetes 集群中的某个节点收到请求后
  • Kubelet 进程会从中央仓库下载 Flink 镜像,准备和挂载卷,然后执行启动命令
  • Pod 启动后 Flink Master(JobMaster) 进程随之启动
  • 初始化 DispatcherKubernetesResourceManager 并通过 K8s服务对外暴露Flink Master 端口
    • K8s 服务类似于路由

两个步骤完成之后,Session 模式的集群就创建成功了,集群可以接收作业提交请求,但是此时还没有JobMaster/TaskManager, 当提交作业需要执行时,才会按需创建

2. 作业提交

Client 用户可以通过 Flink 命令行 (Flink Client),向这个会话模式的集群提交任务,此时 JobGraph 会在 Flink Client 端生成,然后和用户 Jar 包一起通过 RestClient 上传

作业提交成功后,Dispatcher 会为每个作业启动一个 JobMaster,将 JobGraph 交给 JobMaster 调度执行

这两个步骤完成后,作业进入调度执行阶段

3.作业调度执行

K8s Session 模式集群下, ResourceManagerK8s Master 申请和释放 TaskManager,除此之外 作业的调度与执行和Yarn 模式一样

JobMasterKubernetesResourceManager 申请 Slot

KubernetesResourceManagerKubernetes 集群分配 TaskManager

  • 每个TaskManager 都是具有唯一标识的 Pod
  • KubernetesResourceManager 会为 TaskManager 生成一份新的配置文件
  • 里面有 Flink Master 上的 service name 作为地址
  • 这样在 Flink Master failover 之后TaskManager 仍然可以重新连上

Kuberbetes 集群分配一个新的 Pod 后,在上面启动 TaskManager

TaskManager 启动后注册到 SlotManager

SlotManagerTaskManager 请求 Slot

TaskManager 提供 SlotJobMaster 然后任务就被分配到 Slot 上运行


Graph

1. 流计算应用的 Graph 转换

  • 对于流计算应用来说,首先要将 DataStream API 的调用转换为 Transformation 然后经过StreamGraph

-> JobGraph -> ExecutionGraph ->Flink 调度执行

2. 批计算应用的 Graph 转换

  • 对于批计算应用来说,首先将 DataSet API 的调用转换为 OptimizedPlan 然后转换为 JobGraph, 批处理和流处理在 JobGraph 上完成了统一

3. Table & SQL API 的 Graph 转换

  • Blink Table Planner中,批处理和流处理都依赖于流计算体系,所以无论是 流计算还是批处理 与 流计算应用的 Graph 转换过程都是一样的
    • 在旧的 Flink Table Planner 中,流计算依赖于 DataStream API,其转换过程就是流式计算应用的 Graph 转换过程
    • 批处理依赖于 DataSet API, 所以其转换过程就是批式计算应用的 Graph 转换过程

流图 (StreamGraph)

  • 使用 DataStream API开发的应用程序,首先被转换为 Transformation
  • 然后被映射为 StreamGraph

image.png

  • StreamGraph | StreamNode | StreamEdge

StreamGraph 核心对象

StreamNode

StreamNodeStreamGraph 中的节点

  • Transformation 转换而来,可以简单理解为 一个 StreamNode 表示一个算子
  • 从逻辑上来说

    • StreamNodeStreamGraph 中存在实体和虚拟的 StreamNode
    • StreamNode 可以有多个输入 也可以有多个输出
  • 实体的 StreamNode 会最终变成物理的算子

  • 虚拟的 StreamNode 会附着在 StreamEdge

StreamEdge

StreamEdgeStreamGraph 中的边,用来连接两个 StreamNode

  • 一个 StreamEdge 可以有多个出边 , 入边
  • StreamEdge 中包含了
    • 旁路输出
    • 分区器
    • 字段筛选输出 (与 SQL Select 中 选择字段的逻辑一样)
  • 等信息

    StreamGraph 生成过程

    StreamGraphFlink Client 中生成,由 Flink Client 在提交的时候触发 Flink 应用的 main 方法
    用户编写的业务逻辑组装成 Transformation Pipeline 在最后调用 StreamExecutionEnvironment.execute() 的时候 开始触发 StreamGraph 的构建

    Code

  • StreamGraph 入口

  • StreamGraphGenerator 生成 StreamGraph
  • StreamGraphGenerator 转换 Transformation
  • 单输入 Transformation 的转换 (Node)
  • 虚拟 Transformation 的转换 (Edge)

作业图 (JobGraph)

Job Graph 可以由流计算的 StreamGraph 和 批处理的 OptimizedPlan转换而来

  • 流计算中: 在 StreamGraph 的基础上进行了一些优化 通过 OperatorChain 机制 将算子合并起来

    • 在执行时调度在同一个 Task 线程上
    • 避免数据的跨线程
    • 跨网络的传递
  • Job Graph 中 实现了 流和批的统一表达,从 JobGraph的图里 可以看到,数据从上一个算子流到下一个算子的过程中,上游作为生产者提供了中间数据集(IntermediateDataset)而下游作为消费者需要 JobEdge

  • JobEdge 是一个通信管道,连接了上游生产的中间数据集和下游的 JobVertex 节点

image.png

  • JobVertex | JobEdge | IntermediateDataSet

JobGraph 核心对象

JobVertex

经过算子融合优化后符合条件的多个 StreamNode 可能会融合在一起生成一个 JobVertex
即一个JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge 输出是 IntermediateDataSet

JobEdge

JobEdgeJobGraph 中连接 JobVertexIntermediateDataSet 的边,表示 JobGraph 中的一个数据流转通道
其上游数据源是 IntermediateDataSet 下游消费者是 JobVertex
即数据通过 JobEdgeIntermediateDataSet 传递给目标 JobVertex

IntermediateDataSet

中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出
即该 JobVertex 中包含的算子会产生的数据集
不同的执行模式下,其对应的结果分区类型不同,决定了在执行时数据交换的模式
IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或多个


JobGraph 生成过程

JobGraph 的生成入口是在 StreamGraph 中,流计算的 JobGraph 和批处理的 JobGraph 的生成逻辑不同

  • 对于流而言: 使用的是 StreamingJobGraphGenerator
  • 对于批而言: 使用的是 JobGraphGenerator

    Code

  • StreamGraph 中触发生成 JobGraph

  • StreamingJobGraphGenerator 生成 JobGraph 预处理
  • 构建 JobGraph 的 点和边
  • JobGraph 构建与 OperatorChain 优化

Operator chain 融合

为了更好的分布式执行,Flink 会尽可能地将多个算子融合在一起,形成一个 OperatorChain,一个 OperatorChain 在同一个 Task 线程内执行
OperatorChain 内的算子之间,在同一个线程内通过方法调用的的方式传递数据 好处有:

  • 减少线程之间的切换
  • 减少消息的序列化/反序列化
  • 无序借助内存缓冲区
  • 无需通过网络在算子间传递数据
  • 可以减少延迟的同时提高整体的吞吐量

Operator chain 条件

  • 上下游的并行度一致
  • 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  • 上下游节点都在同一个 slot group 中
  • 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  • 上游节点的 chain 策略为 ALWAYSHEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
  • StreamEdge 数据分区方式是 forward
  • 用户没有禁用 chain

执行图 (Execution)

ExecutioGraph 是调度 Flink 作业执行的核心数据结构,包含了

  • 作业中所有并行执行的 Task 信息
  • Task 之间的关联关系
  • 数据流转关系

StreamGraph JobGraph 在 Flink 客户端中生成,然后提交给 Flink 集群 JobGraphExecutionGraph 的转换在 JobMaster 中完成,转换过程中的重要变化有

1) 加入了并行度的概念,成为真正可调度的图结构
2) 与 JobGraph 的对应关系为:

  • JobVertex 对应的 ExecutionJobVertexExecutionVertex
  • IntermediateDataSet 对应的 IntermediateResultIntermediateResultPartition

  • 每个 Task 对应一个 ExecutionGraph 的一个 ExecutionVertex

    • ExecutionVertex
    • TaskInputGate | InputChannel | ResultPartition 对应了 IntermediateResultExecutionEdge

image.png

ExecutionGraph 核心对象

ExecutionJobVertex

ExecutionJobVertexJobGraph 中的 JobVertex 一一对应
数量与 JobVertex 中所包含的 StreamNode 的并行度一致

ExecutionJobVertex 用来将一个 JobVertex 封装成 ExecutionJobVertex

  • 并依次创建
    • ExecutionVertex
    • Execution
    • IntermediateResult
    • IntermediateResultPartirion

ExecutionJobVertex 的构造函数中,首先是依据对应的 JobVertex 的并发度,生成对应个数的 ExecutionVertex
其中一个 ExecutionVertex 代表一个 ExecutionJobVertex 的并发子 Task

然后是将原来 JobVertex 的中间结果 IntermediateDataSet 转换为 ExecutionGraph 中的 IntermediateResult


ExecutionVertex

ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex

构建 ExecutionVertex 的同时, 也会构建 ExecutionVertex 的输出 IntermediateResult, 并且将 ExecutionVertex 的输出为 IntermediateResultPartition

ExecutionVertex 的构造函数中,首先会创建 IntermediateResultPartition 并通过 IntermediateResult.setPartition() 建立 IntermediateResultIntermediateResultPartition 之间的关系,然后生成 Execution 并配置资源

Execution

Execution Vertex 相当于每个 Task 执行的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution 执行一个 ExecutionVertex

JobManagerTaskManager 之间关于 Task 的部署 和 Task 执行状态的更新都是通过 ExecutionAttemptID 来标识 实例的, 在发生故障或者数据需要重新计算的情况下, ExecutionVertex 可能会有多个 ExecutionAttemptID
一个 Execution 通过 ExecutionAttemptID 来唯一标识

IntermediateResult

IntermediateResult 又叫做中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出和 JobGraph 中的 IntermediateDataSet 一一对应
同样一个 ExecutionJobVertex 可以有多个中间结果 取决于当前 JobVertex 有几个出边 (JobEdge)

一个中间结果集包含了多个中间结果分区 IntermediateResultPartition 其个数等于 该 JobVertex 的并发度,或者叫做算子的并行度

IntermediateResultPartition

IntermediateResultPartition 又叫做中间结果分区,表示1个 ExecutionVertex 输出结果与 ExecutionEdge 相关联

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition
一个 Execution 对应于唯一的 1个 IntermediateResultPartition 和 1个 ExecutionVertex
一个 ExecutionVertex 可以有多个 ExecutionEdge

image.png


ExecutionGraph 生成过程

初始化作业调度器的时候,根据 JobGraph 生成 ExecutionGraph
ScheduleBase 的构造方法中触发构建,最终调用 SchedulerBase.createExecutionGraph() 触发实际的构建操作,使用 ExecutionGraphBuilder 构建 ExecutionGraph

Code

  • ExecutionGraph 构建入口
  • 构建 ExecutionGraph 的核心逻辑
  • 构建 ExecutionGraph 节点
  • 构造 ExecutionEdge

    • 点对点连接 < DistributionPattern.POINTWISE >

      • 一对一连接 并发的 Task 数量与分区数相等

      • 多对一连接 下游的 Task 数量小与上游的分区数

      • 一对多连接 下游的 Task 数量多于上游的分区数

    • 全连接 < DistributionPattern.ALL_TO_ALL >

      • shuffle