问题待思考
- Flink 作业是如何提交到集群的
- Flink 集群是如何在(各个)资源管理集群上启动起来的
- Flink 的计算资源是如何分配给作业的
- Flink 作业提交之后是如何启动的
大体框架
Flink 的 Jar 文件并不是 Flink 集群的可执行文件,需要经过转换之后提交给集群,其转换过程大概分两大步骤
- 在 Flink Client 中通过反射启动 Jar 中的 main 方法,生成
Flink StreamGraph
\Job Graph
将Job Graph
提交给 Flink 集群 - Flink 集群收到
Job Graph
之后将Job Graph
转换成Execution Graph
, 然后开始调度执行,启动成功之后开始消费数据
Flink 的核心执行流程,对用户 API 调用 通过4层抽象 进行转换
Stream Graph
->Job Graph
->Execution Graph
-> 物理执行拓扑 (Task DAG
)
提交流程
- Flink 作业在开发完毕之后,需要提交到 Flink 集群执行,
ClientFrontend
是入口,触发用户开发的 Flink 应用 Jar 文件中的 main 方法,然后交给pipelineExecutor.execue()
方法,最终会选择一个具体的PiplineExecutor
执行
作业执行可以选择 Session
和 Per-Job
两种集群模式
Session
模式的集群,一个集群中运行多个作业Per-Job
模式的集群,一个集群中只运行一个作业,作业执行完毕则集群销毁
不同模式适用场景
Dispatcher
和ResourceManager
的占用- 集群资源的共享性
- 执行任务特性
| 运行模式 | 适用场景 |
| —- | —- |
| Session 模式 | 共享 Dispatcher 和 ResourceManager
按需申请资源,作业共享集群资源
适合执行时间短,频繁执行的短任务 | | Per-Job 模式 | 独享 Dispatcher 和 ResourceManager
长周期执行的任务,集群异常影响范围小 |
根据 Flink Client
提交作业之后是否还可以退出 Client 进程,提交模式又可以分为
- Detached
Flink Client
创建完集群之后,可以退出命令行窗口,集群独立运行
- Attached
Flink Client
创建完集群之火,不能关闭命令行窗口,需要与集群之间维持练连接,好处是能够感知集群的退出,集群退出之后还会做一些资源清理的工作
pipelineExecutor
pipelineExecutor
执行器是 Flink Client 生成 JobGraph 之后,将作业提交给集群的重要环节
Session
模式 :AbstractSessionClusterExecutor
Per-Job
模式 : AbstractJobClusterExecutorMiniCluster
模式 : LocalExecutor
Session 模式
该模式下,作业共享集群资源,作业通过 Http 协议进行提交
Flink 提供的会话模式:
- Yarn 会话模式
- K8s 会话模式
Standalone
Standalone
模式比较特别,Flink 安装在物理机上,不能像在集群上一样,可以随时启动一个新集群,所有的作业共享Standalone
集群,本质上就是一种Session
模式,所以不支持Per-Job
模式
在
Session
模式- Yarn 作业提交使用
yarn-session.sh
脚本 - K8s 作业提交使用
kubernetes-session
脚本
- Yarn 作业提交使用
在启动脚本的时候就会检查是否存在已经启动好的
Flink Session
模式集群- 如果没有: 则启动一个
Flihk Session
模式集群 - 然后在
PipelineExecutor
中通过Dispatcher
提供的Rest
接口提交JobGraph
Dispatcher
为每个作业启动一个JobMaster
,进入作业执行阶段
- 如果没有: 则启动一个
Per-Job 模式
该模式下,一个作业一个集群,作业之间相互隔离
在 Flink 1.10 版本中,只有 Yarn 上实现了 Per-Job
模式
- 疑问: K8s 的
Per-Job
?
Per-Job
模式下,因为不需要共享集群,所以在 PipelineExecutor
中执行作业提交的时候,可以创建集群并将 JobGraph
以及所需的文件等一同提交给 Yarn 集群
- Yarn 集群在容器中启动 Flink Master 进程 (JobManager 进程)
- 进行一系列的初始化动作,初始化完毕之后从文件系统中获取
JobGraph
- 交给
Dispatcher
,之后的执行流程与Session
模式下的执行流程相同
Yarn Session 提交流程
1. 启动集群
使用 bin/yarn-session.sh
提交会话模式的作业
如果提交到已经存在的集群,则获取 Yarn 集群信息/应用ID/ 并准备提交作业
- 如果启动新的
Yarn Sesssion
则直接跳到 作业提交阶段 如果没有集群 则创建一个新的 Session 模式的集群
首先将 应用配置
flink-conf.yaml
☺logback.xml
☺log4j.properties
和 相关文件Flink Jar
☺ 配置类文件 ☺ 用户 Jar 文件 ☺JobGraph
对象等信息上传至 分布式存储 (HDFS) 的应用暂存目录通过
Yarn Client
向 Yarn 提交 Flink 创建集群的申请,Yarn 分配资源,在申请的Yarn Container
中初始化并启动Flink JobManager
进程,在Job Manager
进程中运行YarnSessionClusterEntrypoint
作为集群的入口初始化
Dispatcher
/ResourceManager
,启动相关的 RPC 服务等待Client
通过 Rest 接口提交作业
2. 作业提交
Yarn 集群准备好后,开始作业提交Flink Client
通过 Rest 向 Dispatcher
提交JobGraph
Dispatcher
是 Rest
接口
- 不负责实际的调度/执行方面的工作,
- 当收到
JobGraph
后,为作业创建一个JobMaster
,将工作交给JobMaster
- (负责 作业调度,管理作业 和 Task 的生命周期)
- 构建
ExecutionGraph
(第三层物理Graph)
这两个步骤完成后,作业进入调度执行阶段
3.作业调度执行
JobMaster
向 YarnResourceManager
申请资源,开始调度 ExecutionGraph
的执行,向 YarnResourceManager
申请资源
- 初次提交作业集群中尚没有
TaskManager
此时资源不足,需要先申请资源
YarnResourceManager
收到 JobMaster
的资源请求,如果当前有空闲 Slot
则将 Slot
分配给 JobMaster
, 否则 YarnReSourceManager
将向 Yarn Master
请求创建 TaskManager
YarnResourceManager
将资源请求加入等待请求队列,并通过心跳向 YARN ResourceManager
申请新的 Container
资源来启动 TaskManager
进程,Yarn 分配新的 Container
给 TaskManager
YarnResourceManager
启动,然后从 HDFS 加载 Jar 文件等所需的相关资源,在容器中启动 TaskManager
TaskManager
启动后 向 ResourceManager
注册,并把自己的 Slot
资源情况汇报给 ResourceManager
ResourceManager
从等待队列中取出 Slot
请求,向 TaskManager
确认资源可用情况,并告知 TaskManager
将 Slot
分配给了哪个 JobMaster
TaskManager
向 JobMaster
提供 Slot
,JobMaster
调度 Task
到 TaskManager
的 Slot
上执行
Yarn Per-Job 提交流程
1. 启动集群
使用 ./flink run -m yarn-cluster
提交 Per-Job
模式的作业
Yarn 启动 Flink 集群,该模式下 Flink 集群的入口是 yarnJobClusterEntryPoint
2. 作业提交
该步骤与 Session
模式不同, Client 并不会通过 Rest 向 Dispatcher
提交 JobGraph
,由 Dispatcher
从本地文件系统获取 JobGraph
, 剩下的一样
3.作业调度执行
略
K8s Session 提交流程
1. 启动集群
Flink 客户端首先连接 Kubernetes API Server
,提交 Flink 集群的资源描述文件
flink-configuration-configmap.yaml
/jobmanager-service.yaml
/jobmanager-deployment.yaml
/taskmanager-deployment.yaml
Kubernetes Master
会根据这些资源描述文件去创建对应的 Kubernetes
实体
以 JobMaster
部署为例
Kubernetes
集群中的某个节点收到请求后Kubelet
进程会从中央仓库下载 Flink 镜像,准备和挂载卷,然后执行启动命令- Pod 启动后
Flink Master
(JobMaster) 进程随之启动 - 初始化
Dispatcher
和KubernetesResourceManager
并通过 K8s服务对外暴露Flink Master
端口- K8s 服务类似于路由
两个步骤完成之后,Session
模式的集群就创建成功了,集群可以接收作业提交请求,但是此时还没有JobMaster
/TaskManager
, 当提交作业需要执行时,才会按需创建
2. 作业提交
Client 用户可以通过 Flink 命令行 (Flink Client),向这个会话模式的集群提交任务,此时 JobGraph
会在 Flink Client
端生成,然后和用户 Jar 包一起通过 RestClient
上传
作业提交成功后,Dispatcher
会为每个作业启动一个 JobMaster
,将 JobGraph
交给 JobMaster
调度执行
这两个步骤完成后,作业进入调度执行阶段
3.作业调度执行
K8s Session
模式集群下, ResourceManager
向 K8s Master
申请和释放 TaskManager
,除此之外 作业的调度与执行和Yarn 模式一样
JobMaster
向 KubernetesResourceManager
申请 Slot
KubernetesResourceManager
从 Kubernetes
集群分配 TaskManager
- 每个
TaskManager
都是具有唯一标识的 Pod KubernetesResourceManager
会为TaskManager
生成一份新的配置文件- 里面有
Flink Master
上的service name
作为地址 - 这样在
Flink Master failover
之后TaskManager
仍然可以重新连上
Kuberbetes
集群分配一个新的 Pod 后,在上面启动 TaskManager
TaskManager
启动后注册到 SlotManager
SlotManager
向 TaskManager
请求 Slot
TaskManager
提供 Slot
给 JobMaster
然后任务就被分配到 Slot
上运行
Graph
1. 流计算应用的 Graph 转换
- 对于流计算应用来说,首先要将
DataStream
API
的调用转换为Transformation
然后经过StreamGraph
-> JobGraph
-> ExecutionGraph
->Flink 调度执行
2. 批计算应用的 Graph 转换
- 对于批计算应用来说,首先将
DataSet
API
的调用转换为OptimizedPlan
然后转换为JobGraph
, 批处理和流处理在JobGraph
上完成了统一
3. Table & SQL API 的 Graph 转换
Blink Table Planner
中,批处理和流处理都依赖于流计算体系,所以无论是 流计算还是批处理 与 流计算应用的Graph
转换过程都是一样的- 在旧的
Flink Table Planner
中,流计算依赖于DataStream
API
,其转换过程就是流式计算应用的Graph
转换过程 - 批处理依赖于
DataSet
API
, 所以其转换过程就是批式计算应用的Graph
转换过程
- 在旧的
流图 (StreamGraph)
- 使用
DataStream
API
开发的应用程序,首先被转换为Transformation
- 然后被映射为
StreamGraph
StreamGraph
|StreamNode
|StreamEdge
StreamGraph 核心对象
StreamNode
StreamNode
是 StreamGraph
中的节点
- 从
Transformation
转换而来,可以简单理解为 一个StreamNode
表示一个算子 从逻辑上来说
StreamNode
在StreamGraph
中存在实体和虚拟的StreamNode
StreamNode
可以有多个输入 也可以有多个输出
实体的
StreamNode
会最终变成物理的算子- 虚拟的
StreamNode
会附着在StreamEdge
上
StreamEdge
StreamEdge
是 StreamGraph
中的边,用来连接两个 StreamNode
- 一个
StreamEdge
可以有多个出边 , 入边 StreamEdge
中包含了- 旁路输出
- 分区器
- 字段筛选输出 (与 SQL Select 中 选择字段的逻辑一样)
-
StreamGraph 生成过程
StreamGraph
在Flink Client
中生成,由Flink Client
在提交的时候触发 Flink 应用的 main 方法
用户编写的业务逻辑组装成Transformation
Pipeline
在最后调用StreamExecutionEnvironment.execute()
的时候 开始触发StreamGraph
的构建Code
StreamGraph
入口StreamGraphGenerator
生成StreamGraph
StreamGraphGenerator
转换Transformation
- 单输入
Transformation
的转换 (Node) - 虚拟
Transformation
的转换 (Edge)
作业图 (JobGraph)
Job Graph
可以由流计算的 StreamGraph
和 批处理的 OptimizedPlan
转换而来
流计算中: 在
StreamGraph
的基础上进行了一些优化 通过OperatorChain
机制 将算子合并起来- 在执行时调度在同一个 Task 线程上
- 避免数据的跨线程
- 跨网络的传递
Job Graph
中 实现了 流和批的统一表达,从JobGraph
的图里 可以看到,数据从上一个算子流到下一个算子的过程中,上游作为生产者提供了中间数据集(IntermediateDataset
)而下游作为消费者需要JobEdge
JobEdge
是一个通信管道,连接了上游生产的中间数据集和下游的JobVertex
节点
JobVertex
|JobEdge
|IntermediateDataSet
JobGraph 核心对象
JobVertex
经过算子融合优化后符合条件的多个 StreamNode
可能会融合在一起生成一个 JobVertex
即一个JobVertex
包含一个或多个算子,JobVertex
的输入是 JobEdge
输出是 IntermediateDataSet
JobEdge
JobEdge
是 JobGraph
中连接 JobVertex
和 IntermediateDataSet
的边,表示 JobGraph
中的一个数据流转通道
其上游数据源是 IntermediateDataSet
下游消费者是 JobVertex
即数据通过 JobEdge
由 IntermediateDataSet
传递给目标 JobVertex
IntermediateDataSet
中间数据集 IntermediateDataSet
是一种逻辑结构,用来表示 JobVertex
的输出
即该 JobVertex
中包含的算子会产生的数据集
不同的执行模式下,其对应的结果分区类型不同,决定了在执行时数据交换的模式IntermediateDataSet
的个数与该 JobVertex
对应的 StreamNode
的出边数量相同,可以是一个或多个
JobGraph 生成过程
JobGraph
的生成入口是在 StreamGraph
中,流计算的 JobGraph
和批处理的 JobGraph
的生成逻辑不同
- 对于流而言: 使用的是
StreamingJobGraphGenerator
-
Code
StreamGraph 中触发生成 JobGraph
- StreamingJobGraphGenerator 生成 JobGraph 预处理
- 构建 JobGraph 的 点和边
- JobGraph 构建与 OperatorChain 优化
Operator chain 融合
为了更好的分布式执行,Flink 会尽可能地将多个算子融合在一起,形成一个 OperatorChain
,一个 OperatorChain
在同一个 Task
线程内执行OperatorChain
内的算子之间,在同一个线程内通过方法调用的的方式传递数据 好处有:
- 减少线程之间的切换
- 减少消息的序列化/反序列化
- 无序借助内存缓冲区
- 无需通过网络在算子间传递数据
- 可以减少延迟的同时提高整体的吞吐量
Operator chain 条件
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
- 上下游节点都在同一个 slot group 中
- 下游节点的 chain 策略为
ALWAYS
(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) - 上游节点的 chain 策略为
ALWAYS
或HEAD
(只能与下游链接,不能与上游链接,Source默认是HEAD) StreamEdge
数据分区方式是 forward- 用户没有禁用 chain
执行图 (Execution)
ExecutioGraph
是调度 Flink 作业执行的核心数据结构,包含了
- 作业中所有并行执行的 Task 信息
- Task 之间的关联关系
- 数据流转关系
StreamGraph
JobGraph
在 Flink 客户端中生成,然后提交给 Flink 集群 JobGraph
到 ExecutionGraph
的转换在 JobMaster
中完成,转换过程中的重要变化有
1) 加入了并行度的概念,成为真正可调度的图结构
2) 与 JobGraph
的对应关系为:
JobVertex
对应的ExecutionJobVertex
和ExecutionVertex
IntermediateDataSet
对应的IntermediateResult
和IntermediateResultPartition
每个
Task
对应一个ExecutionGraph
的一个ExecutionVertex
ExecutionVertex
Task
用InputGate
|InputChannel
|ResultPartition
对应了IntermediateResult
和ExecutionEdge
ExecutionGraph 核心对象
ExecutionJobVertex
ExecutionJobVertex
与JobGraph
中的 JobVertex
一一对应
数量与 JobVertex
中所包含的 StreamNode
的并行度一致
ExecutionJobVertex
用来将一个 JobVertex
封装成 ExecutionJobVertex
- 并依次创建
ExecutionVertex
Execution
IntermediateResult
IntermediateResultPartirion
在 ExecutionJobVertex
的构造函数中,首先是依据对应的 JobVertex
的并发度,生成对应个数的 ExecutionVertex
其中一个 ExecutionVertex
代表一个 ExecutionJobVertex
的并发子 Task
然后是将原来 JobVertex
的中间结果 IntermediateDataSet
转换为 ExecutionGraph
中的 IntermediateResult
ExecutionVertex
ExecutionJobVertex
中会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex
构建 ExecutionVertex
的同时, 也会构建 ExecutionVertex
的输出 IntermediateResult
, 并且将 ExecutionVertex
的输出为 IntermediateResultPartition
ExecutionVertex
的构造函数中,首先会创建 IntermediateResultPartition
并通过 IntermediateResult.setPartition()
建立 IntermediateResult
和 IntermediateResultPartition
之间的关系,然后生成 Execution
并配置资源
Execution
Execution Vertex
相当于每个 Task
执行的模板,在真正执行的时候,会将 ExecutionVertex
中的信息包装为一个 Execution
执行一个 ExecutionVertex
JobManager
和 TaskManager
之间关于 Task
的部署 和 Task
执行状态的更新都是通过 ExecutionAttemptID
来标识 实例的, 在发生故障或者数据需要重新计算的情况下, ExecutionVertex
可能会有多个 ExecutionAttemptID
一个 Execution
通过 ExecutionAttemptID
来唯一标识
IntermediateResult
IntermediateResult
又叫做中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex
的输出和 JobGraph
中的 IntermediateDataSet
一一对应
同样一个 ExecutionJobVertex
可以有多个中间结果 取决于当前 JobVertex
有几个出边 (JobEdge
)
一个中间结果集包含了多个中间结果分区 IntermediateResultPartition
其个数等于 该 JobVertex
的并发度,或者叫做算子的并行度
IntermediateResultPartition
IntermediateResultPartition
又叫做中间结果分区,表示1个 ExecutionVertex
输出结果与 ExecutionEdge
相关联
ExecutionEdge
ExecutionEdge
表示 ExecutionVertex
的输入,连接到上游产生的 IntermediateResultPartition
一个 Execution
对应于唯一的 1个 IntermediateResultPartition
和 1个 ExecutionVertex
一个 ExecutionVertex
可以有多个 ExecutionEdge
ExecutionGraph 生成过程
初始化作业调度器的时候,根据 JobGraph
生成 ExecutionGraph
在 ScheduleBase
的构造方法中触发构建,最终调用 SchedulerBase.createExecutionGraph()
触发实际的构建操作,使用 ExecutionGraphBuilder
构建 ExecutionGraph
Code
- ExecutionGraph 构建入口
- 构建 ExecutionGraph 的核心逻辑
- 构建 ExecutionGraph 节点
构造 ExecutionEdge
点对点连接 < DistributionPattern.POINTWISE >
一对一连接 并发的 Task 数量与分区数相等
多对一连接 下游的 Task 数量小与上游的分区数
一对多连接 下游的 Task 数量多于上游的分区数
全连接 < DistributionPattern.ALL_TO_ALL >
- shuffle