组件

新版Flink JobManager包括 Dispatcher、ResourceManager、JobManager三部分。

0x01 ResourceManager

ResourceManager基类下面有四种实现:

  1. YARN
  2. Mesos
  3. Standalone-multi-job (Standalone 模式)
  4. Self-contained-single-job(Docker/Kubernetes 模式)

ResourceManager核心组件:Slot Manager

对外RPC调用

requestSlotcancelSlotRequest 主要供 JobMaster 进行调用,而 sendSlotReportnotifySlotAvailable 则主要供 TaskExecutor 调用。ResourceManager 在接收到上述 RPC 调用后,会通过 SlotManager 完成具体的工作。
YarnResourceManager通过startNewWorker和stopWorker在TM不够时候申请和停止。

Slot Manager

管理和分配Slot
RM会缓存 TaskManager (container) 以便重复利用,并将一段时间未使用的空闲资源归还给集群。
在Yarn模式下,Slot不足SlotManager会通过ResourceActions#allocateResource(ResourceProfile)告知RM,RM可能申请新的TM。
四大功能:Slot注册、请求Slot、取消Slot请求、超时检测。

超时检测

SlotRequest超时检测和TM空闲检测。
TM空闲超时调用ResourceActions#releaseResource()释放。

0x02 Dispatcher

负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的 JobManager及相关服务(包括REST endpoint等),在per-job运行模式下,Dispatcher将直接从Container工作目录加载JobGraph文件;在yarn-session运行模式下,Dispatcher将在接收客户端提交的Job(通过BlockServer接收JobGraph文件)

0x03 JobManager

JobManager核心组件:Checkpoint Coordinator、Slot Pool、ExecutionGraph、SlotSharingManager、Scheduler
SlotSharingGroup是软限制、CoLocationGroup是硬限制。
默认情况下SlotSharingGroup全在defalut组中。尽可能让所有subtask在一个Slot里。
CoLocationGroup限制JobVertex同一编号(每个subtask有个编号)的必须在一个Slot中。
CoLocationGroup通过CoLocationConstraint限制Slot共享,用户可以通过API自定义。
JobManager 会随 JobGraph 一起创建,并在作业结束后销毁。JobManager 的构造器可以接受一个 Savepoint 或 Checkpoint 来初始化作业,这为作业的容器化提供了基础。
同样需要关注一下 AllocationID 和 SlotRequestID 的区别:AllocationID 是用来区分物理内存的分配,它总是和 AllocatedSlot 向关联的;而 SlotRequestID 是任务调度执行的时候请求 LogicalSlot,是和 LogicalSlot 关联的。
代码指定共享组:

  1. someStream.filter(...).slotSharingGroup("group1");

Slot管理

PhysicalSlot和LogicSlot

PhysicalSlot实现是AllocatedSlot。
LogicSlot实现是SingleLogicalSlot。
SingleLogicalSlot 实现 PhysicalSlot.Payload接口,所以SingleLogicalSlot可以作为Payload装配到PhysicalSlot上。
SingleLogicalSlot类也有Payload属性,可以装配Execution类(实际就是Subtask)。

AllocationID和SlotRequestID

AllocationID和PhysicalSlot绑定,SlotRequestID和LogicSlot绑定。

TaskSlot

MultiTaskSlot和SingleTaskSlot两种实现。
TaskSlot🌲:
构建树形节点SingTaskSlot是叶子节点,MultiTaskSlot包含SingleTaskSlot。根节点为MultiTaskSlot有SlotContext属性代表PhysicalSlot。🌲中所有subtask都会在一个Slot中运行。区分叶子节点的是AbstractID(JobVertexID或CoLocationGroupID)
MultiTaskSlot作为根节点可以作为Payload装载到PhysicalSlot上。

SlotPool

SlotPool里都是Physical Slot
JobManager 有一个 SlotPool 来暂存 TaskManager 提供给它并被接受的 slot。JobManager 的调度器从这个 SlotPool 获取资源,因此即使 ResourceManager 不可用 JobManager 仍然可以使用已经分配给它的 slot。
当 SlotPool 无法满足一个资源请求时,它会向 ResourceManager 请求新的 slot。这时如果 ResourceManager 不可用,或者 ResourceManager 拒绝了请求,或者请求超时,都会导致 slot 请求失败。
SlotPool 会将未使用的 slot 归还给 ResourceManager。一个 slot 被视为未使用的标准是当作业已经完全处于运行状态时它未被使用。

向RM申请Slot

  1. class SlotPoolImpl implements SlotPool {
  2. /** The book-keeping of all allocated slots. */
  3. //所有分配给当前 JobManager 的 slots
  4. private final AllocatedSlots allocatedSlots;
  5. /** The book-keeping of all available slots. */
  6. //所有可用的 slots(已经分配给该 JobManager,但还没有装载 payload)
  7. private final AvailableSlots availableSlots;
  8. /** All pending requests waiting for slots. */
  9. //所有处于等待状态的slot request(已经发送请求给 ResourceManager)
  10. private final DualKeyMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;
  11. /** The requests that are waiting for the resource manager to be connected. */
  12. //处于等待状态的 slot request (还没有发送请求给 ResourceManager,此时没有和 ResourceManager 建立连接)
  13. private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
  14. }

CheckpointCoordinator

在不开启 Checkpoint 的情况下,Flink 依然会初始化 CheckpointCoordinator,并且在 HDFS 上创建对应的 Checkpoint 目录,目的是支持Savepoint。

ScheduleMode

流是Eager模式,批是LAZY_FROM_SOURCES模式。

  1. /**
  2. * The ScheduleMode decides how tasks of an execution graph are started.
  3. */
  4. public enum ScheduleMode {
  5. /** Schedule tasks lazily from the sources. Downstream tasks are started once their input data are ready */
  6. LAZY_FROM_SOURCES(true),
  7. /**
  8. * Same as LAZY_FROM_SOURCES just with the difference that it uses batch slot requests which support the
  9. * execution of jobs with fewer slots than requested. However, the user needs to make sure that the job
  10. * does not contain any pipelined shuffles (every pipelined region can be executed with a single slot).
  11. */
  12. LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST(true),
  13. /** Schedules all tasks immediately. */
  14. EAGER(false);
  15. private final boolean allowLazyDeployment;
  16. ScheduleMode(boolean allowLazyDeployment) {
  17. this.allowLazyDeployment = allowLazyDeployment;
  18. }
  19. /**
  20. * Returns whether we are allowed to deploy consumers lazily.
  21. */
  22. public boolean allowLazyDeployment() {
  23. return allowLazyDeployment;
  24. }
  25. }

0x04 WebServer

image.png

BolbServer

线程,管理二进制大文件。
存三种类型文件:jar包、高负荷RPC消息、TM日志文件。
两类:PERMANENT_BLOB(和job生命周期一致,可恢复,会上传HDFS)、TRANSIENT_BLOB(生命周期用户自行管理,不可恢复,不上传HDFS)

BlobStore:一个interface。HDFS、S3、FTP等实现
${high-availability.storageDir}/${applicationid}/blob/job${jobid}
${high-availability.storageDir}/${application_id}/blob/job
${job_id}/${blob_key}
blob.storage.directory 默认 java.io.tmpdir(/tmp)Blob本地存储目录
BlobClient
优先使用本地文件;再从HDFS中获取;最后才尝试从BlobServer下载。

HA

ZK和HDFS存储数据以便主备切换,JobGraph要存起来。

Graph

StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
第⼀层:Program -> StreamGraph
Operator节点生成Transform,组成的图。
第⼆层:StreamGraph -> JobGraph
operator chain到一起,生成用于CheckPoint的JobVertexID
Task组成的图。
JobGraph在Client端生成提交到集群。
operator chain条件:

  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  3. 上下游节点都在同一个 slot group 中
  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
  6. 两个节点间数据分区方式是 forward
  7. 用户没有禁用 chain

第三层:JobGraph -> ExecutionGraph
subtask组成的图。
ExecutionGraph开始考虑并发。JobManager创建。调度层面最重要的数据结构。

第四层:ExecutionGraph -> 物理执⾏计划
物理执⾏计划是ExecutionGraph被JobManager调度后,装配到Slot后,并不是真正的图。
ResultSubpartition由下游并行度决定,InputChannel和ResultSubpartition一一相连
ResultPartition、InputGate
Off-heap Memory -> NetworkBufferPool -> LocalBufferPool -> ResultSubPartition
NetworkBufferPool由TaskManager共享

image.png

参考

https://www.whitewood.me/2018/06/17/FLIP6-资源调度模型重构/