With the release of Flink 1.5.0, Flink gained an important improvement: the cluster deployment and job processing model was reworked according to FLIP-6, in order to manage resources and schedule tasks more effectively and to integrate more gracefully with frameworks such as YARN, Mesos, and Kubernetes.
In this article we analyze the startup process of a Flink cluster. The analysis is based on the Flink 1.9-SNAPSHOT code.
## HA and Leader Election
Flink's internal components, such as the ResourceManager and the JobManager, can all be configured to run in HA mode. Cluster startup involves a lot of leader election and leader address retrieval, so we first introduce the HA-related concepts.
Leader addresses are obtained through the `LeaderRetrievalListener` and `LeaderRetrievalService` interfaces. A `LeaderRetrievalService` starts listening for the leader address and notifies the listener once a leader has been elected.
```java
public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;

    void stop() throws Exception;
}

public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

    void handleError(Exception exception);
}
```
The `GatewayRetriever` interface is used to obtain an `RpcGateway`. The abstract class `LeaderGatewayRetriever` extends both `LeaderRetriever` and `GatewayRetriever`, so it 1) is notified of the leader address once an election has completed and 2) can obtain the leader's `RpcGateway`.

`RpcGatewayRetriever` is the concrete implementation of `LeaderGatewayRetriever`: given the leader's address, it obtains the corresponding leader's `RpcGateway` via `RpcService.connect()`.

```java
class RpcGatewayRetriever {
    // ......
}
```
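To make this concrete, the following is a deliberately simplified sketch of the same idea, not the actual Flink classes: a leader-aware gateway retriever is essentially a `LeaderRetrievalListener` that, whenever the leader changes, resolves the new leader address into a gateway through the RPC service (a hypothetical `SimpleRpcService` stands in for Flink's `RpcService` here).

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the RPC service; the real types are RpcService, RpcGateway, etc.
interface SimpleRpcService {
    <G> CompletableFuture<G> connect(String address, Class<G> gatewayClass);
}

class SimpleGatewayRetriever<G> implements LeaderRetrievalListener {

    private final SimpleRpcService rpcService;
    private final Class<G> gatewayClass;
    private volatile CompletableFuture<G> gatewayFuture = new CompletableFuture<>();

    SimpleGatewayRetriever(SimpleRpcService rpcService, Class<G> gatewayClass) {
        this.rpcService = rpcService;
        this.gatewayClass = gatewayClass;
    }

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        // a new leader was elected: (re-)connect and expose the new gateway
        gatewayFuture = rpcService.connect(leaderAddress, gatewayClass);
    }

    @Override
    public void handleError(Exception exception) {
        gatewayFuture.completeExceptionally(exception);
    }

    /** Completed once the current leader's gateway is available. */
    CompletableFuture<G> getGateway() {
        return gatewayFuture;
    }
}
```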
Leader election is carried out jointly by a `LeaderElectionService` (the election service) and a `LeaderContender` (the participant in the election). Every successful election produces a unique leaderSessionID, which can be used as the fencing token for `RpcGateway` communication. When a `LeaderContender` wins an election, it is notified through `LeaderContender#grantLeadership`.
```java
public interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;

    void stop() throws Exception;

    void confirmLeaderSessionID(UUID leaderSessionID);

    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);

    void revokeLeadership();

    String getAddress();

    void handleError(Exception exception);
}
```
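To illustrate the contract, here is a minimal, purely hypothetical contender (Flink's real contenders are the ResourceManager, Dispatcher, JobManagerRunner, and so on): the election service calls `grantLeadership`, and the contender confirms the session id back once it is ready to act as leader, after which the retrieval services can publish its address.

```java
import java.util.UUID;

/** A minimal contender used only for illustration; not a Flink class. */
class LoggingContender implements LeaderContender {

    private final LeaderElectionService electionService;
    private final String address;

    LoggingContender(LeaderElectionService electionService, String address) {
        this.electionService = electionService;
        this.address = address;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        // Start leader-only services here, then confirm the session id so that
        // the election service can publish this contender's address to the retrievers.
        System.out.println(address + " became leader with session " + leaderSessionID);
        electionService.confirmLeaderSessionID(leaderSessionID);
    }

    @Override
    public void revokeLeadership() {
        System.out.println(address + " lost leadership");
    }

    @Override
    public String getAddress() {
        return address;
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }
}
```

Starting it is then simply `electionService.start(new LoggingContender(electionService, address))`.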
`LeaderElectionService` has multiple implementations, such as `StandaloneLeaderElectionService`, which does not need an actual election process, and `ZooKeeperLeaderElectionService`, which is built on top of ZooKeeper and the Curator framework; see the corresponding source code for details.

The `HighAvailabilityServices` interface provides access to all HA-related services, including:

- the election service and leader retrieval for the ResourceManager
- the election service and leader retrieval for the Dispatcher
- the registry for job status
- services related to checkpoint recovery, the blob store, and so on

## The MiniCluster Startup Process

We start with the simplest case, the MiniCluster, to analyze Flink's startup process and the interaction between its components. A MiniCluster can be regarded as an embedded Flink runtime in which all components run in separate local threads of the same process. The entry point for starting a MiniCluster is in `LocalStreamEnvironment`.

`MiniCluster#start` proceeds roughly in three phases:

- create auxiliary services such as the `RpcService`, `HighAvailabilityServices`, and `BlobServer`
- start the TaskManagers
- start the Dispatcher, ResourceManager, and related components

### Creating the `HighAvailabilityServices`
```java
class MiniCluster {
    public void start() {
        // ......
        ioExecutor = Executors.newFixedThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("mini-cluster-io"));
        haServices = createHighAvailabilityServices(configuration, ioExecutor);
        // ......
    }

    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        LOG.info("Starting high-availability services");
        return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
            configuration,
            executor);
    }
}
```
`HighAvailabilityServicesUtils` is a utility class for creating `HighAvailabilityServices`. When HA is not configured, it creates an `EmbeddedHaServices`. `EmbeddedHaServices` does not provide real high availability and is meant for the case where all components (ResourceManager, TaskManager, JobManager, etc.) run in the same process. The election service that `EmbeddedHaServices` creates for each component is an `EmbeddedLeaderElectionService`: as soon as a `LeaderContender` joins the election, that contender is chosen as leader.

### Starting the TaskManagers
```java
class MiniCluster {
    public void start() {
        // ......
        startTaskManagers();
        // ......
    }

    private void startTaskManagers() throws Exception {
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        for (int i = 0; i < numTaskManagers; i++) {
            startTaskExecutor();
        }
    }

    @VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }
}
```
Once the `HighAvailabilityServices` have been created, the TaskManagers can be started one by one. `TaskManagerRunner#startTaskManager` creates a `TaskExecutor`, which implements the `RpcEndpoint` interface. The `TaskExecutor` needs to communicate with the `ResourceManager` and other components, and it obtains the corresponding service addresses through `HighAvailabilityServices`.

In the callback invoked when the `TaskExecutor` starts, a series of services are started:
```java
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    public void onStart() throws Exception {
        try {
            // start the services
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }

        // registration timeouts are handed over to the FatalErrorHandler
        startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

    /**
     * The listener for leader changes of the resource manager.
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            // obtain the ResourceManager's address and establish a connection to it
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }

    private final class JobLeaderListenerImpl implements JobLeaderListener {

        @Override
        public void jobManagerGainedLeadership(
                final JobID jobId,
                final JobMasterGateway jobManagerGateway,
                final JMTMRegistrationSuccess registrationMessage) {
            // establish a connection to the JobManager
            runAsync(
                () -> establishJobManagerConnection(
                    jobId,
                    jobManagerGateway,
                    registrationMessage));
        }

        @Override
        public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

            runAsync(() ->
                closeJobManagerConnection(
                    jobId,
                    new Exception("Job leader for job id " + jobId + " lost leadership.")));
        }

        @Override
        public void handleError(Throwable throwable) {
            onFatalError(throwable);
        }
    }
}
```
When the `ResourceManagerLeaderListener` callback fires, the `TaskExecutor` tries to establish a connection to the `ResourceManager`; the connection is encapsulated in a `TaskExecutorToResourceManagerConnection`. Once the `ResourceManager` leader is known, its RpcGateway can be obtained, and the registration is started via the RPC call `ResourceManager#registerTaskExecutor`. After registration succeeds, the `TaskExecutor` reports its resources (mainly its slots) to the `ResourceManager`.
```java
class TaskExecutor {
    private void establishResourceManagerConnection(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID resourceManagerResourceId,
            InstanceID taskExecutorRegistrationId,
            ClusterInformation clusterInformation) {
        // send the SlotReport
        final CompletableFuture
        // ......
    }
}
```
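The snippet above is truncated, so here is a deliberately simplified sketch of the same handshake with made-up stand-in types (`SimpleResourceManagerGateway`, `SlotReportStub`, and so on are not Flink classes); the point is only the ordering: register first, then report the slots once registration has been acknowledged.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins for the real Flink gateways and messages.
interface SimpleResourceManagerGateway {
    CompletableFuture<RegistrationAck> registerTaskExecutor(String taskExecutorAddress);
    CompletableFuture<Void> sendSlotReport(RegistrationAck registration, SlotReportStub slotReport);
}

final class RegistrationAck { }
final class SlotReportStub { }

class RegistrationFlow {
    /** Register with the ResourceManager and report the slots once registration succeeds. */
    static CompletableFuture<Void> registerAndReport(
            SimpleResourceManagerGateway resourceManager,
            String taskExecutorAddress,
            SlotReportStub slotReport) {
        return resourceManager
            .registerTaskExecutor(taskExecutorAddress)
            .thenCompose(ack -> resourceManager.sendSlotReport(ack, slotReport));
    }
}
```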
|| --- |<a name="d046d63f"></a>### [](https://blog.jrwang.me/2019/flink-source-code-bootstarp/#%E5%90%AF%E5%8A%A8-dispatcherresourcemanagercomponent)启动 `DispatcherResourceManagerComponent`|
```java
class MiniCluster {
    public void start() {
        // ......
        dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()
        ));
        // ......
    }

    protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
            Configuration configuration,
            RpcServiceFactory rpcServiceFactory,
            HighAvailabilityServices haServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // Session dispatcher, standalone resource manager
        SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
        return Collections.singleton(
            dispatcherResourceManagerComponentFactory.create(
                configuration,
                rpcServiceFactory.createRpcService(),
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                new MemoryArchivedExecutionGraphStore(),
                metricQueryServiceRetriever,
                fatalErrorHandler));
    }

    @Nonnull
    private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }
}
```
In MiniCluster mode, a `SessionDispatcherResourceManagerComponent` object is created. `SessionDispatcherResourceManagerComponent` extends `DispatcherResourceManagerComponent` and is used to start the Dispatcher, the ResourceManager, and the WebMonitorEndpoint; all of these components run in the same process. In MiniCluster mode a `StandaloneDispatcher` and a `StandaloneResourceManager` are started.

When the factory creates the `DispatcherResourceManagerComponent`, it starts the Dispatcher, the ResourceManager, and the related components:
```java
public abstract class AbstractDispatcherResourceManagerComponentFactory
    // ...... (inside the create(...) method)

        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);
    }
}
```
In the callback invoked when the `ResourceManager` starts, it obtains the election service through `HighAvailabilityServices` and thereby takes part in leader election. It also starts the `JobLeaderIdService`, which manages the leader ids of the jobs registered with the current ResourceManager:
```java
abstract class ResourceManager {
    // ......
}
```
In the callback invoked when the `Dispatcher` starts, the current Dispatcher likewise takes part in leader election through the `LeaderElectionService`:
```java
public abstract class Dispatcher extends FencedRpcEndpoint {
    // ......

    @Override
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Exception e) {
            final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            submittedJobGraphStore.start(this);
            leaderElectionService.start(this);

            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }
}
```
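As noted earlier, the leader session id doubles as the fencing token that every RPC against a `FencedRpcEndpoint` such as the `Dispatcher` must carry. The following toy example, which is not Flink's implementation, only illustrates the idea: a call carrying a stale token is rejected.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Toy fenced endpoint used only to illustrate fencing-token checks. */
class FencedEndpointSketch {

    private volatile UUID currentFencingToken;

    /** Called when this endpoint becomes leader with a new session id. */
    void setFencingToken(UUID leaderSessionId) {
        this.currentFencingToken = leaderSessionId;
    }

    /** Runs the handler only if the caller's token matches the current one. */
    <T> CompletableFuture<T> callFenced(UUID callerToken, java.util.function.Supplier<T> handler) {
        if (currentFencingToken == null || !currentFencingToken.equals(callerToken)) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("Fencing token mismatch: " + callerToken));
            return failed;
        }
        return CompletableFuture.completedFuture(handler.get());
    }
}
```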
|| --- |<a name="4adb4f64"></a>### [](https://blog.jrwang.me/2019/flink-source-code-bootstarp/#%E6%8F%90%E4%BA%A4-jobgraph)提交 JobGraph通过 `MiniCluster#executeJobBlocking` 提交 `JobGraph` 并等待运行完成,提交`JobGraph`和请求运行结果的逻辑如下,都是通过 RPC 调用来实现:|
```java
class MiniCluster {
    public CompletableFuture
    // ......
}
```
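The MiniCluster snippet above is truncated, but the essential flow boils down to two asynchronous RPCs against the Dispatcher's gateway: submit the job, then request the result. A simplified, self-contained sketch of that flow (all type names are stand-ins, not Flink's) might look like this:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the Dispatcher gateway and the job-related types.
interface SimpleDispatcherGateway {
    CompletableFuture<Ack> submitJob(JobGraphStandIn jobGraph);
    CompletableFuture<JobResultStandIn> requestJobResult(String jobId);
}

final class Ack { }
final class JobResultStandIn { }
final class JobGraphStandIn {
    final String jobId;
    JobGraphStandIn(String jobId) { this.jobId = jobId; }
}

class SubmissionFlow {
    /** Submit the graph, then (once acknowledged) wait for the job result. */
    static CompletableFuture<JobResultStandIn> executeJob(
            SimpleDispatcherGateway dispatcher, JobGraphStandIn jobGraph) {
        return dispatcher
            .submitJob(jobGraph)
            .thenCompose(ack -> dispatcher.requestJobResult(jobGraph.jobId));
    }
}
```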
After receiving a `JobGraph` submission, the `Dispatcher` stores the submitted `JobGraph` in the `SubmittedJobGraphStore` (used for failure recovery) and starts a JobManager for the submitted `JobGraph`:
```java
class Dispatcher {
    private CompletableFuture
    // ......
}
```
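This Dispatcher snippet is truncated as well. In essence, the Dispatcher first persists the graph and only then creates and starts a dedicated runner for it, so that the job can be recovered after a Dispatcher failure. A rough sketch of that ordering, with stand-in types instead of Flink's `SubmittedJobGraphStore` and `JobManagerRunner`:

```java
// Stand-ins for SubmittedJobGraphStore and JobManagerRunner; illustrative only.
interface GraphStore {
    void putJobGraph(Object jobGraph) throws Exception;
}
interface JobRunner {
    void start() throws Exception;
}

class DispatcherSketch {
    private final GraphStore graphStore;

    DispatcherSketch(GraphStore graphStore) {
        this.graphStore = graphStore;
    }

    /** Persist the graph first (for failure recovery), then start a dedicated runner. */
    void persistAndRunJob(Object jobGraph, JobRunner runner) throws Exception {
        graphStore.putJobGraph(jobGraph);
        runner.start();
    }
}
```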
The started `JobManagerRunner` competes for leadership; once it is elected leader, it starts a `JobMaster`.
```java
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {
    public void start() throws Exception {
        try {
            // compete for leadership
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    // called once this runner has been elected leader
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    return startJobMaster(leaderSessionId);
                }
            });
    }

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // start the JobMaster with the given JobMasterId
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(
                new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }
}
```
Once started, the `JobMaster` establishes a connection to the `ResourceManager`; the connection is encapsulated in a `ResourceManagerConnection`. As soon as the connection is established, the `JobMaster` can communicate with the `ResourceManager` via RPC:
```java
public class JobMaster extends FencedRpcEndpoint {
    // ......
}
```
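The JobMaster snippet was lost as well. Conceptually, the `JobManagerRunner` starts the JobMaster with the leader session id as its fencing token, and the JobMaster then watches the ResourceManager leader and connects to it once the leader is known. A compressed, illustrative sketch (names are made up, not Flink's):

```java
import java.util.UUID;

// Illustrative stand-in for the leader retrieval of the ResourceManager.
interface ResourceManagerLeaderWatcher {
    /** Invokes the callback whenever a new ResourceManager leader address is known. */
    void watch(java.util.function.Consumer<String> onNewLeaderAddress);
}

class JobMasterSketch {

    private UUID fencingToken;

    /** Started by the JobManagerRunner with the leader session id as fencing token. */
    void start(UUID jobMasterId, ResourceManagerLeaderWatcher rmLeaderWatcher) {
        this.fencingToken = jobMasterId;
        // once the ResourceManager leader is known, establish the connection
        rmLeaderWatcher.watch(this::connectToResourceManager);
    }

    private void connectToResourceManager(String resourceManagerAddress) {
        // resolve the ResourceManager gateway for this address, register the JobMaster,
        // and afterwards request slots for the job over RPC
        System.out.println("Connecting to ResourceManager at " + resourceManagerAddress
            + " with fencing token " + fencingToken);
    }
}
```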
After this, the job enters the scheduling and execution phase.

## The Standalone Cluster Startup Process

In Standalone mode, the TaskManagers, the ResourceManager, and the other components all run in separate processes. A Standalone cluster can be started in two ways, standalonesession mode and standalonejob mode, which differ in the Dispatcher implementation that is used.

### Starting the JobManager

Note that the **JobManager** we are talking about here is the single process that contains the `Dispatcher`, the `ResourceManager`, and other components; it is not the `JobManagerRunner` that the `Dispatcher` starts in order to execute a `JobGraph`. In the FLIP-6 implementation, the scheduling and execution of each `JobGraph` is actually handled by a dedicated `JobMaster`.

The entry class of a JobManager started in standalonesession mode is `StandaloneSessionClusterEntrypoint`, which extends `SessionClusterEntrypoint`; correspondingly, the entry class for standalonejob mode is `StandaloneJobClusterEntryPoint`, which extends `JobClusterEntrypoint`. Both derive from the common parent class `ClusterEntrypoint`, and they differ in the `DispatcherResourceManagerComponent` they create.

Let's first look at the startup procedure, which is in fact similar to how MiniCluster mode starts the `DispatcherResourceManagerComponent`:
```java
abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            // initialize the RpcService, HighAvailabilityServices and other services
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // create the DispatcherResourceManagerComponentFactory; provided by the concrete subclass
            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // create the DispatcherResourceManagerComponent, starting the ResourceManager and the Dispatcher
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            // once DispatcherResourceManagerComponent#getShutDownFuture completes, shut the services down
            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
}
```
The `HighAvailabilityServices` created here differ slightly from MiniCluster mode: since the components no longer live in the same process, the services have to be derived from the configuration. 1) If ZooKeeper-based HA is configured, a `ZooKeeperHaServices` is created and leader addresses are obtained through ZooKeeper. 2) If HA is not configured, a `StandaloneHaServices` is created and the RPC addresses of the components are read from the configuration file.

In `StandaloneSessionClusterEntrypoint`, the factory that produces the `DispatcherResourceManagerComponent` is `SessionDispatcherResourceManagerComponentFactory`, which creates a `SessionDispatcherResourceManagerComponent`: a `StandaloneDispatcher` is created by the `SessionDispatcherFactory` and a `StandaloneResourceManager` by the `StandaloneResourceManagerFactory`.

In `StandaloneJobClusterEntryPoint`, the factory is `JobDispatcherResourceManagerComponentFactory`, which creates a `JobDispatcherResourceManagerComponent`: a `StandaloneResourceManager` is created by the `StandaloneResourceManagerFactory` and a `MiniDispatcher` by the `JobDispatcherFactory`. A `MiniDispatcher` is bound to a single `JobGraph`; once that `JobGraph` finishes, the `MiniDispatcher` is shut down, which in turn stops the JobManager process.

The internal startup of the `Dispatcher` and `ResourceManager` services is the same as in MiniCluster mode and is not repeated here.

### Starting the TaskManager

The entry point for starting a TaskManager is `TaskManagerRunner`. Its startup procedure is largely the same as in MiniCluster mode, except that 1) it runs in a separate process and 2) the `HighAvailabilityServices` have to be created from the configuration file. `TaskManagerRunner` creates a `TaskExecutor`, which obtains the `ResourceManager`'s address through `HighAvailabilityServices` and establishes a connection to the `ResourceManager`.

## The Yarn Cluster Startup Process

The entry point of a Yarn cluster is `FlinkYarnSessionCli`: it first creates a `YarnClusterDescriptor` from the command-line arguments and then calls `YarnClusterDescriptor#deploySessionCluster` to trigger the deployment of the cluster.

The actual startup logic lives in `AbstractYarnClusterDescriptor#deployInternal`, which essentially submits an application to the Yarn cluster through the `YarnClient` and starts the ApplicationMaster:
```java
abstract class AbstractYarnClusterDescriptor {
    protected ClusterClient
    // ...... (body of deployInternal)

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);

        // .....

        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }
}
```
Depending on whether the cluster is started in sessioncluster or jobcluster mode, the ApplicationMaster entry class submitted to Yarn is `YarnSessionClusterEntrypoint` or `YarnJobClusterEntrypoint` respectively; the difference, again, is whether the Dispatcher is a `StandaloneDispatcher` or a `MiniDispatcher`. The concrete `ResourceManager` implementation is `YarnResourceManager`.

Unlike the Standalone cluster described above, in a Flink cluster running in Yarn mode the TaskManagers are requested dynamically from Yarn's ResourceManager by the `YarnResourceManager`, based on the JobMaster's requests. When the JobMaster requests resources from the ResourceManager and not enough resources are available, the `YarnResourceManager` requests new containers from the Yarn ResourceManager and starts TaskManagers in them:
```java
class YarnResourceManager {
    // request a container
    private void requestYarnContainer() {
        resourceManagerClient.addContainerRequest(getContainerRequest());

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

        numPendingContainerRequests++;

        log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
            resource,
            numPendingContainerRequests);
    }

    // callback invoked when containers have been allocated
    @Override
    public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

            for (Container container : containers) {
                log.info("Received new container: {} - Remaining pending container requests: {}",
                    container.getId(),
                    numPendingContainerRequests);

                if (numPendingContainerRequests > 0) {
                    removeContainerRequest(pendingRequestsIterator.next());

                    final String containerIdStr = container.getId().toString();
                    final ResourceID resourceId = new ResourceID(containerIdStr);

                    workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                    try {
                        // Context information used to start a TaskExecutor Java process
                        ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                            container.getResource(),
                            containerIdStr,
                            container.getNodeId().getHost());

                        // start the TaskManager
                        nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                    } catch (Throwable t) {
                        log.error("Could not start TaskManager in container {}.", container.getId(), t);

                        // release the failed container
                        workerNodeMap.remove(resourceId);
                        resourceManagerClient.releaseAssignedContainer(container.getId());
                        // and ask for a new one
                        requestYarnContainerIfRequired();
                    }
                } else {
                    // return the excessive containers
                    log.info("Returning excess container {}.", container.getId());
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                }
            }

            // if we are waiting for no further containers, we can go to the
            // regular heartbeat interval
            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }

    // create the launch context for the TaskManager
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
            throws Exception {
        // init the ContainerLaunchContext
        final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

        final ContaineredTaskManagerParameters taskManagerParameters =
            ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

        log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
                "JVM direct memory limit {} MB",
            containerId,
            taskManagerParameters.taskManagerTotalMemoryMB(),
            taskManagerParameters.taskManagerHeapSizeMB(),
            taskManagerParameters.taskManagerDirectMemoryLimitMB());

        Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

        log.debug("TaskManager configuration: {}", taskManagerConfig);

        ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
            flinkConfig,
            yarnConfig,
            env,
            taskManagerParameters,
            taskManagerConfig,
            currDir,
            YarnTaskExecutorRunner.class, // entry class
            log);

        // set a special environment variable to uniquely identify this container
        taskExecutorLaunchContext.getEnvironment().put(ENV_FLINK_CONTAINER_ID, containerId);
        taskExecutorLaunchContext.getEnvironment().put(ENV_FLINK_NODE_ID, host);

        return taskExecutorLaunchContext;
    }
}
```
## Summary
This article gave a brief analysis of the Flink cluster startup process and of the communication among the different components, such as the ResourceManager, TaskExecutor, Dispatcher, and JobMaster.
-EOF-
