
    When Flink 1.5.0 was released, it came with an important improvement: the cluster deployment and job processing model was reworked according to FLIP-6, in order to manage resources and schedule tasks more effectively and to integrate more cleanly with frameworks such as YARN, Mesos, and Kubernetes.
    In this article we analyze the startup process of a Flink cluster. The analysis is based on the Flink 1.9-SNAPSHOT codebase.

## HA and Leader Election

Flink's internal components, such as the ResourceManager and the JobManager, can be configured to run in HA mode. Starting a Flink cluster therefore involves a lot of leader election and leader-address retrieval, so we first introduce the HA-related concepts.
Obtaining the leader's address is done through the `LeaderRetrievalListener` and `LeaderRetrievalService` interfaces. A `LeaderRetrievalService` starts a listener for the leader address, and the listener is notified once leader election completes.

```java
public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;
    void stop() throws Exception;
}

public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    void handleError(Exception exception);
}
```

The `GatewayRetriever` interface is used to obtain an `RpcGateway`. The abstract class `LeaderGatewayRetriever` extends both `LeaderRetriever` and `GatewayRetriever`, so it 1) is notified of the leader address once election completes and 2) can obtain the leader's `RpcGateway`. `RpcGatewayRetriever` is the concrete implementation of `LeaderGatewayRetriever`: given the leader's address, it obtains the leader's `RpcGateway` via `RpcService.connect()`.

```java
public class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>> extends LeaderGatewayRetriever<T> {
    @Override
    protected CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
        return FutureUtils.retryWithDelay(
            () -> leaderFuture.thenCompose(
                (Tuple2<String, UUID> addressLeaderTuple) -> rpcService.connect(
                    addressLeaderTuple.f0,
                    fencingTokenMapper.apply(addressLeaderTuple.f1),
                    gatewayType)),
            retries,
            retryDelay,
            rpcService.getScheduledExecutor());
    }
}
```

Leader election is carried out jointly by a `LeaderElectionService` (the election service) and a `LeaderContender` (the object taking part in the election). Every successful election produces a unique leaderSessionID, which can be used as the fencing token for communication through the RpcGateway. When a `LeaderContender` wins the election, it is notified via `LeaderContender#grantLeadership`.

```java
public interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;
    void stop() throws Exception;
    void confirmLeaderSessionID(UUID leaderSessionID);
    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
    String getAddress();
    void handleError(Exception exception);
}
```

`LeaderElectionService` has multiple implementations, such as `StandaloneLeaderElectionService`, which involves no actual election process, and `ZooKeeperLeaderElectionService`, which is built on ZooKeeper via the Curator framework; refer to the corresponding sources for the implementation details.
The `HighAvailabilityServices` interface provides access to all HA-related services (an abridged sketch of the interface follows the list), including:

- the leader election service and leader retrieval for the ResourceManager
- the leader election service and leader retrieval for the Dispatcher
- the registry for the status of jobs
- services related to checkpoint recovery, the blob store, and so on
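
For reference, the following is an abridged sketch of the `HighAvailabilityServices` interface. The method names follow the 1.9-era code, only a subset is shown, and exact signatures may differ slightly between versions:

```java
public interface HighAvailabilityServices extends AutoCloseable {

    // leader retrieval / election for the ResourceManager
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderElectionService getResourceManagerLeaderElectionService();

    // leader retrieval / election for the Dispatcher
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderElectionService getDispatcherLeaderElectionService();

    // leader retrieval / election for the JobManager of a specific job
    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

    // registry that tracks the scheduling status of jobs
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    // checkpoint recovery, job graph persistence, and blob storage
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
    BlobStore createBlobStore() throws IOException;
}
```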
## The MiniCluster Startup Process
Let's start with the simplest setup, the MiniCluster, and walk through Flink's startup process and the interaction between the internal components. A MiniCluster can be seen as an embedded Flink runtime in which all components run in separate local threads of the same process. The entry point that starts a MiniCluster is `LocalStreamEnvironment` (a minimal job that exercises this path is sketched after the list below).
In `MiniCluster#start`, the startup roughly falls into three phases:

- create auxiliary services, such as the `RpcService`, `HighAvailabilityServices`, and `BlobServer`
- start the TaskManagers
- start the Dispatcher and the ResourceManager
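
As a quick way to exercise this code path, the following minimal job (a sketch for illustration, not part of the Flink sources) can be run from the IDE; `LocalStreamEnvironment#execute` internally creates and starts a MiniCluster:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MiniClusterDemo {
    public static void main(String[] args) throws Exception {
        // createLocalEnvironment() returns a LocalStreamEnvironment when running outside a cluster
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);

        env.fromElements(1, 2, 3).print();

        // LocalStreamEnvironment#execute builds the JobGraph, starts a MiniCluster,
        // submits the job to it, and blocks until the job finishes
        env.execute("mini-cluster-demo");
    }
}
```

Setting a breakpoint in `MiniCluster#start` while running such a job is a convenient way to step through the three phases described above.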
### Creating the `HighAvailabilityServices`

```java
class MiniCluster {
    public void start() {
        // ...
        ioExecutor = Executors.newFixedThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("mini-cluster-io"));
        haServices = createHighAvailabilityServices(configuration, ioExecutor);
        // ...
    }

    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        LOG.info("Starting high-availability services");
        return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
            configuration,
            executor);
    }
}
```

`HighAvailabilityServicesUtils` is the utility class that creates the `HighAvailabilityServices`. When no HA is configured, it creates an `EmbeddedHaServices`. `EmbeddedHaServices` does not provide real high availability and is intended for the case where the ResourceManager, TaskManager, JobManager, and all other components run in the same process. The election service that `EmbeddedHaServices` creates for each component is `EmbeddedLeaderElectionService`: as soon as a `LeaderContender` joins the election, that contender is chosen as the leader.
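
To illustrate this behavior, here is a minimal, hypothetical sketch (not taken from the Flink sources) of a component wiring itself into the election with `EmbeddedHaServices`. The RPC address returned by `getAddress()` is made up, and the package locations correspond to the 1.9 codebase:

```java
import java.util.UUID;
import java.util.concurrent.Executors;

import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

public class EmbeddedLeaderElectionDemo {
    public static void main(String[] args) throws Exception {
        // EmbeddedHaServices keeps all election state in memory of the current process
        HighAvailabilityServices haServices = new EmbeddedHaServices(Executors.newSingleThreadExecutor());

        final LeaderElectionService electionService = haServices.getResourceManagerLeaderElectionService();
        electionService.start(new LeaderContender() {
            @Override
            public void grantLeadership(UUID leaderSessionID) {
                // with EmbeddedLeaderElectionService this fires (asynchronously) almost immediately,
                // since there is no real election: the first contender becomes the leader
                electionService.confirmLeaderSessionID(leaderSessionID);
            }

            @Override
            public void revokeLeadership() { }

            @Override
            public String getAddress() {
                // hypothetical RPC address, for illustration only
                return "akka://flink/user/resourcemanager";
            }

            @Override
            public void handleError(Exception exception) {
                exception.printStackTrace();
            }
        });
    }
}
```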
### Starting the TaskManager

```java
class MiniCluster {
    public void start() {
        // ...
        startTaskManagers();
        // ...
    }

    private void startTaskManagers() throws Exception {
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();

        for (int i = 0; i < numTaskManagers; i++) {
            startTaskExecutor();
        }
    }

    @VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();

            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }
}
```

Once the `HighAvailabilityServices` have been created, the TaskManagers can be started one by one. `TaskManagerRunner#startTaskManager` creates a `TaskExecutor`, which implements the `RpcEndpoint` interface. The `TaskExecutor` needs to communicate with components such as the `ResourceManager`, and it obtains the corresponding service addresses through the `HighAvailabilityServices`.
In the callback that runs when the `TaskExecutor` starts, a series of services are started:

```java
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    public void onStart() throws Exception {
        try {
            // start the TaskExecutor services
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }

        // registration timeouts are handed to the FatalErrorHandler
        startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

    /**
     * The listener for leader changes of the resource manager.
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            // got the ResourceManager address, establish a connection to the ResourceManager
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }

    private final class JobLeaderListenerImpl implements JobLeaderListener {

        @Override
        public void jobManagerGainedLeadership(
            final JobID jobId,
            final JobMasterGateway jobManagerGateway,
            final JMTMRegistrationSuccess registrationMessage) {
            // establish a connection to the JobManager
            runAsync(
                () ->
                    establishJobManagerConnection(
                        jobId,
                        jobManagerGateway,
                        registrationMessage));
        }

        @Override
        public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

            runAsync(() ->
                closeJobManagerConnection(
                    jobId,
                    new Exception("Job leader for job id " + jobId + " lost leadership.")));
        }

        @Override
        public void handleError(Throwable throwable) {
            onFatalError(throwable);
        }
    }
}
```

When the callback of `ResourceManagerLeaderListener` fires, the `TaskExecutor` tries to establish a connection to the `ResourceManager`; the connection is encapsulated as a `TaskExecutorToResourceManagerConnection`. Once the `ResourceManager` leader has been determined, the `ResourceManager`'s RpcGateway can be obtained, and the registration is then initiated through the RPC call `ResourceManager#registerTaskExecutor`. After a successful registration, the `TaskExecutor` reports its resources (mainly slots) to the `ResourceManager`.

```java
class TaskExecutor {
    private void establishResourceManagerConnection(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID resourceManagerResourceId,
            InstanceID taskExecutorRegistrationId,
            ClusterInformation clusterInformation) {
        // send the slot report
        final CompletableFuture slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
            getResourceID(),
            taskExecutorRegistrationId,
            taskSlotTable.createSlotReport(getResourceID()),
            taskManagerConfiguration.getTimeout());
        // ...

        // the connection is established
        establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
            resourceManagerGateway,
            resourceManagerResourceId,
            taskExecutorRegistrationId);

        stopRegistrationTimeout();
    }
}
```

### Starting the `DispatcherResourceManagerComponent`

```java
class MiniCluster {
    public void start() {
        // ...
        dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()
        ));
        // ...
    }

    protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
            Configuration configuration,
            RpcServiceFactory rpcServiceFactory,
            HighAvailabilityServices haServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // session dispatcher, standalone resource manager
        SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
        return Collections.singleton(
            dispatcherResourceManagerComponentFactory.create(
                configuration,
                rpcServiceFactory.createRpcService(),
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                new MemoryArchivedExecutionGraphStore(),
                metricQueryServiceRetriever,
                fatalErrorHandler));
    }

    @Nonnull
    private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }
}
```

In MiniCluster mode, a `SessionDispatcherResourceManagerComponent` object is created. `SessionDispatcherResourceManagerComponent` extends `DispatcherResourceManagerComponent` and is used to start the Dispatcher, the ResourceManager, and the WebMonitorEndpoint; in this mode all of these components run in the same process. The concrete implementations started in MiniCluster mode are `StandaloneDispatcher` and `StandaloneResourceManager`.
When the factory class creates the `DispatcherResourceManagerComponent`, it starts the Dispatcher, the ResourceManager, and the other components:

```java
public abstract class AbstractDispatcherResourceManagerComponentFactory implements DispatcherResourceManagerComponentFactory {
    @Override
    public DispatcherResourceManagerComponent create(
            Configuration configuration,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            ArchivedExecutionGraphStore archivedExecutionGraphStore,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // ...
        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            blobServer,
            executor,
            metricFetcher,
            highAvailabilityServices.getWebMonitorLeaderElectionService(),
            fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();

        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);
    }
}
```

In the callback that runs when the `ResourceManager` starts, it obtains the election service from the `HighAvailabilityServices` and thereby takes part in the election. It also starts the `JobLeaderIdService`, which manages the leader ids of the jobs registered with the current ResourceManager.

```java
abstract class ResourceManager extends FencedRpcEndpoint implements ResourceManagerGateway, LeaderContender {
    @Override
    public void onStart() throws Exception {
        try {
            startResourceManagerServices();
        } catch (Exception e) {
            final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startResourceManagerServices() throws Exception {
        try {
            leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

            initialize();

            // take part in the leader election
            leaderElectionService.start(this);
            jobLeaderIdService.start(new JobLeaderIdActionsImpl());

            registerSlotAndTaskExecutorMetrics();
        } catch (Exception e) {
            handleStartResourceManagerServicesException(e);
        }
    }
}
```

In the callback that runs when the `Dispatcher` starts, the Dispatcher likewise takes part in the election through the `LeaderElectionService`.

```java
public abstract class Dispatcher extends FencedRpcEndpoint implements DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {
    // gateway retriever for the ResourceManager, used to communicate with the ResourceManager
    private final GatewayRetriever resourceManagerGatewayRetriever;

    @Override
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Exception e) {
            final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            submittedJobGraphStore.start(this);
            leaderElectionService.start(this);

            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }
}
```

### Submitting a JobGraph

A `JobGraph` is submitted through `MiniCluster#executeJobBlocking`, which waits until the job finishes. The logic for submitting the `JobGraph` and for requesting the job result, shown below, is implemented entirely through RPC calls:

```java
class MiniCluster {
    public CompletableFuture submitJob(JobGraph jobGraph) {
        // obtain the DispatcherGateway through the Dispatcher's gateway retriever
        final CompletableFuture dispatcherGatewayFuture = getDispatcherGatewayFuture();

        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);
        final CompletableFuture blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
        final CompletableFuture jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

        // submit the JobGraph to the Dispatcher via an RPC call
        final CompletableFuture acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());
        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

    public CompletableFuture requestJobResult(JobID jobId) {
        return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.requestJobResult(jobId, RpcUtils.INF_TIMEOUT));
    }
}
```

After the `Dispatcher` receives the submitted `JobGraph`, it stores it in the `SubmittedJobGraphStore` (for failure recovery) and starts a JobManager for the submitted `JobGraph`:

```java
class Dispatcher {
    private CompletableFuture createJobManagerRunner(JobGraph jobGraph) {
        final RpcService rpcService = getRpcService();

        // create the JobManagerRunner
        final CompletableFuture jobManagerRunnerFuture = CompletableFuture.supplyAsync(
            CheckedSupplier.unchecked(() ->
                jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler)),
            rpcService.getExecutor());

        return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
    }

    private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
        final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
        jobManagerRunner.getResultFuture().whenCompleteAsync(
            (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                // check if we are still the active JobManagerRunner by checking the identity
                //noinspection ObjectEquality
                if (jobManagerRunner == jobManagerRunnerFutures.get(jobId).getNow(null)) {
                    if (archivedExecutionGraph != null) {
                        jobReachedGloballyTerminalState(archivedExecutionGraph);
                    } else {
                        final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);

                        if (strippedThrowable instanceof JobNotFinishedException) {
                            jobNotFinished(jobId);
                        } else {
                            jobMasterFailed(jobId, strippedThrowable);
                        }
                    }
                } else {
                    log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                }
            },
            getMainThreadExecutor());

        // start the JobManager
        jobManagerRunner.start();

        return jobManagerRunner;
    }
}
```

The started `JobManagerRunner` competes for leadership; once it is elected leader, it starts a `JobMaster`.

```java
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
    public void start() throws Exception {
        try {
            // compete for leadership
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    // elected as leader
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    return startJobMaster(leaderSessionId);
                }
            });
    }

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // start the JobMaster with the given JobMasterId
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }
}
```

Once the `JobMaster` has started, it establishes a connection to the `ResourceManager`; the connection is encapsulated as a `ResourceManagerConnection`. After the connection is established, the `JobMaster` can communicate with the `ResourceManager` via RPC calls:

```java
public class JobMaster extends FencedRpcEndpoint implements JobMasterGateway, JobMasterService {
    private void startJobMasterServices() throws Exception {
        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
        scheduler.start(getMainThreadExecutor());

        //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
}
```

After this point, the job enters the scheduling and execution phase.

## The Startup Process in Standalone Cluster Mode

In standalone mode, components such as the TaskManagers and the ResourceManager run in separate processes. A standalone cluster can be started in two ways, standalonesession mode and standalonejob mode, and the difference between them lies in the `Dispatcher` implementation that is used.

### Starting the JobManager

Note that the **JobManager** we refer to here is the single process that contains the `Dispatcher`, the `ResourceManager`, and related components, not the `JobManagerRunner` that the `Dispatcher` launches to execute a `JobGraph`. In the FLIP-6 implementation, the scheduling and execution of each `JobGraph` is actually handled by a dedicated `JobMaster`.
The entry class of a JobManager started in standalonesession mode is `StandaloneSessionClusterEntrypoint`, which extends `SessionClusterEntrypoint`; correspondingly, the entry class of a JobManager started in standalonejob mode is `StandaloneJobClusterEntryPoint`, which extends `JobClusterEntrypoint`. Both derive from the common parent class `ClusterEntrypoint` and differ in the `DispatcherResourceManagerComponent` they produce.
Let's first look at the startup process, which is in fact similar to how the `DispatcherResourceManagerComponent` is started in MiniCluster mode:

```java
abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            // initialize services such as the RpcService and HighAvailabilityServices
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // create the DispatcherResourceManagerComponentFactory (provided by the concrete subclass)
            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // create the DispatcherResourceManagerComponent, which starts the ResourceManager and Dispatcher
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            // once DispatcherResourceManagerComponent#getShutDownFuture completes, shut down the services
            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
}
```

The `HighAvailabilityServices` created here differs slightly from MiniCluster mode. Since the components no longer run in the same process, the setup has to be loaded from the configuration: 1) if ZooKeeper-based HA is configured, a `ZooKeeperHaServices` is created and leader addresses are obtained via ZooKeeper; 2) if no HA is configured, a `StandaloneHaServices` is created and the RPC addresses of the individual components are read from the configuration files.
In `StandaloneSessionClusterEntrypoint`, the factory that produces the `DispatcherResourceManagerComponent` is `SessionDispatcherResourceManagerComponentFactory`, which creates a `SessionDispatcherResourceManagerComponent`: `SessionDispatcherFactory` creates a `StandaloneDispatcher` and `StandaloneResourceManagerFactory` creates a `StandaloneResourceManager`.
In `StandaloneJobClusterEntryPoint`, the factory is `JobDispatcherResourceManagerComponentFactory`, which creates a `JobDispatcherResourceManagerComponent`: `StandaloneResourceManagerFactory` creates a `StandaloneResourceManager` and `JobDispatcherFactory` creates a `MiniDispatcher`. A `MiniDispatcher` is bound to a single `JobGraph`; once that `JobGraph` finishes, the `MiniDispatcher` is shut down and the JobManager process terminates with it.
The internal startup of the `Dispatcher` and `ResourceManager` services is the same as in MiniCluster mode and is not repeated here.
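
Conceptually, the configuration-driven choice looks roughly like the following condensed sketch. It paraphrases `HighAvailabilityServicesUtils#createHighAvailabilityServices`; the helper method `createFromConfig` and its parameters are invented for illustration, and the constructor arguments are abbreviated and may not match the exact signatures of a particular Flink version:

```java
import java.util.concurrent.Executor;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.util.ZooKeeperUtils;

public class HaServicesSelectionSketch {

    // hypothetical helper: the address parameters stand in for values that the real
    // utility class derives from the configuration (jobmanager.rpc.address, ports, ...)
    static HighAvailabilityServices createFromConfig(
            Configuration configuration,
            Executor executor,
            String resourceManagerRpcUrl,
            String dispatcherRpcUrl,
            String jobManagerRpcUrl,
            String webMonitorAddress,
            BlobStoreService blobStoreService) throws Exception {

        switch (HighAvailabilityMode.fromConfig(configuration)) {
            case NONE:
                // no HA: every component's address comes from the static configuration
                return new StandaloneHaServices(
                    resourceManagerRpcUrl, dispatcherRpcUrl, jobManagerRpcUrl, webMonitorAddress);

            case ZOOKEEPER:
                // ZooKeeper-backed HA: leader election and retrieval go through the Curator framework
                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(configuration),
                    executor,
                    configuration,
                    blobStoreService);

            default:
                throw new IllegalConfigurationException("Unsupported high-availability mode");
        }
    }
}
```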
### Starting the TaskManager

The entry point for starting a TaskManager is `TaskManagerRunner`. Its startup process is essentially the same as in MiniCluster mode, with two differences: 1) it runs in a separate process, and 2) the `HighAvailabilityServices` must be created from the configuration files. `TaskManagerRunner` creates the `TaskExecutor`, and the `TaskExecutor` obtains the `ResourceManager`'s address through the `HighAvailabilityServices` and establishes a connection to it.
## The Startup Process of a Yarn Cluster

The entry point for starting a Yarn cluster is `FlinkYarnSessionCli`: it first creates a `YarnClusterDescriptor` from the command-line arguments and then calls `YarnClusterDescriptor#deploySessionCluster` to trigger the deployment. The actual startup logic lives in `AbstractYarnClusterDescriptor#deployInternal`, which essentially submits the application to the Yarn cluster through the `YarnClient` and starts the ApplicationMaster:

```java
abstract class AbstractYarnClusterDescriptor {
    protected ClusterClient deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {
        // ...

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);

        // ...
        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }
}
```

Depending on whether the cluster is started as a session cluster or a job cluster, the ApplicationMaster entry classes submitted to Yarn are `YarnSessionClusterEntrypoint` and `YarnJobClusterEntrypoint`, respectively; the difference is that the Dispatcher is a `StandaloneDispatcher` in the former case and a `MiniDispatcher` in the latter. In both cases the concrete `ResourceManager` implementation is `YarnResourceManager`.
Unlike the Standalone cluster described above, in a Flink cluster started in Yarn mode the TaskManagers are requested dynamically: the `YarnResourceManager` asks Yarn's ResourceManager for resources based on the JobMaster's requests. When the JobMaster requests resources from the ResourceManager and not enough are currently available, the `YarnResourceManager` requests new containers from the Yarn ResourceManager and starts TaskManagers in them:

```java
class YarnResourceManager {
    // request a container
    private void requestYarnContainer() {
        resourceManagerClient.addContainerRequest(getContainerRequest());

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

        numPendingContainerRequests++;

        log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
            resource,
            numPendingContainerRequests);
    }

    // callback invoked when containers have been allocated
    @Override
    public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

            for (Container container : containers) {
                log.info(
                    "Received new container: {} - Remaining pending container requests: {}",
                    container.getId(),
                    numPendingContainerRequests);

                if (numPendingContainerRequests > 0) {
                    removeContainerRequest(pendingRequestsIterator.next());

                    final String containerIdStr = container.getId().toString();
                    final ResourceID resourceId = new ResourceID(containerIdStr);

                    workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                    try {
                        // Context information used to start a TaskExecutor Java process
                        ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                            container.getResource(),
                            containerIdStr,
                            container.getNodeId().getHost());

                        // start the TaskManager
                        nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                    } catch (Throwable t) {
                        log.error("Could not start TaskManager in container {}.", container.getId(), t);

                        // release the failed container
                        workerNodeMap.remove(resourceId);
                        resourceManagerClient.releaseAssignedContainer(container.getId());
                        // and ask for a new one
                        requestYarnContainerIfRequired();
                    }
                } else {
                    // return the excessive containers
                    log.info("Returning excess container {}.", container.getId());
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                }
            }

            // if we are waiting for no further containers, we can go to the
            // regular heartbeat interval
            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }

    // create the launch context used to start the TaskManager
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
            throws Exception {
        // init the ContainerLaunchContext
        final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

        final ContaineredTaskManagerParameters taskManagerParameters =
            ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

        log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
            "JVM direct memory limit {} MB",
            containerId,
            taskManagerParameters.taskManagerTotalMemoryMB(),
            taskManagerParameters.taskManagerHeapSizeMB(),
            taskManagerParameters.taskManagerDirectMemoryLimitMB());

        Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

        log.debug("TaskManager configuration: {}", taskManagerConfig);

        ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
            flinkConfig,
            yarnConfig,
            env,
            taskManagerParameters,
            taskManagerConfig,
            currDir,
            YarnTaskExecutorRunner.class, // the entry class
            log);

        // set a special environment variable to uniquely identify this container
        taskExecutorLaunchContext.getEnvironment()
            .put(ENV_FLINK_CONTAINER_ID, containerId);
        taskExecutorLaunchContext.getEnvironment()
            .put(ENV_FLINK_NODE_ID, host);
        return taskExecutorLaunchContext;
    }
}
```

## Summary

This article gave a brief analysis of the startup process of a Flink cluster and of how the different components, such as the ResourceManager, TaskExecutor, Dispatcher, and JobMaster, communicate with each other.


    -EOF-