With the release of Flink 1.5.0, Flink gained an important improvement: following FLIP-6, the cluster deployment and job processing model was reworked so that resources are managed and tasks are scheduled more effectively, and so that Flink integrates more cleanly with frameworks such as YARN, Mesos, and Kubernetes.
In this article we analyze the startup process of a Flink cluster. The analysis is based on the code of Flink 1.9-SNAPSHOT.
## HA and Leader Election
Flink's internal components, such as the ResourceManager and the JobManager, can be configured to run in HA mode. Cluster startup therefore involves a lot of leader election and leader-address retrieval, so we first introduce the HA-related concepts.
Obtaining the leader address is done through the two interfaces `LeaderRetrievalListener` and `LeaderRetrievalService`. A `LeaderRetrievalService` starts listening for the leader address and notifies the listener once the leader election has completed.
```java
public interface LeaderRetrievalService {
    void start(LeaderRetrievalListener listener) throws Exception;
    void stop() throws Exception;
}

public interface LeaderRetrievalListener {
    void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);
    void handleError(Exception exception);
}
```
The `GatewayRetriever` interface is used to obtain an `RpcGateway`. The abstract class `LeaderGatewayRetriever` extends both `LeaderRetriever` and `GatewayRetriever`, so it 1) is notified of the leader address once the election completes and 2) can provide the leader's `RpcGateway`.

`RpcGatewayRetriever` is the concrete implementation of `LeaderGatewayRetriever`: given the leader's address, it obtains the leader's `RpcGateway` via `RpcService.connect()`.
```java
class RpcGatewayRetriever<F extends Serializable, T extends FencedRpcGateway<F>>
        extends LeaderGatewayRetriever<T> {
    // implementation elided: resolves the leader address into the leader's RpcGateway
    // via RpcService.connect()
}
```
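To make the retrieval flow concrete, here is a minimal sketch of a listener built only on the two interfaces shown above. This is illustrative code, not part of the Flink sources:

```java
import java.util.UUID;
import javax.annotation.Nullable;

// Minimal sketch: a LeaderRetrievalListener that merely logs the elected leader.
// A real component (e.g. RpcGatewayRetriever) would resolve the address into an
// RpcGateway via RpcService.connect() at this point.
class LoggingLeaderListener implements LeaderRetrievalListener {

    @Override
    public void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID) {
        if (leaderAddress != null) {
            System.out.println("Leader elected at " + leaderAddress + " (session " + leaderSessionID + ")");
        }
    }

    @Override
    public void handleError(Exception exception) {
        exception.printStackTrace();
    }
}

// Usage: leaderRetrievalService.start(new LoggingLeaderListener());
```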
Leader election is carried out jointly by a `LeaderElectionService` (the election service) and a `LeaderContender` (the object taking part in the election). Every successful election produces a unique leaderSessionID, which can be used as the fencing token for `RpcGateway` communication. When a `LeaderContender` wins an election, it is notified through `LeaderContender#grantLeadership`.
```java
public interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;
    void stop() throws Exception;
    void confirmLeaderSessionID(UUID leaderSessionID);
    boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

public interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
    String getAddress();
    void handleError(Exception exception);
}
```
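As an illustration of this contract, a contender typically confirms the session id it has been granted before acting as leader. A minimal sketch (again illustrative, not Flink source code):

```java
import java.util.UUID;

// Minimal sketch of a LeaderContender: confirm the granted session id back to the
// election service, mirroring the confirm-after-grant pattern used by Flink's own
// contenders such as JobManagerRunner.
class SimpleContender implements LeaderContender {

    private final LeaderElectionService electionService;
    private final String address;

    SimpleContender(LeaderElectionService electionService, String address) {
        this.electionService = electionService;
        this.address = address;
    }

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        electionService.confirmLeaderSessionID(leaderSessionID);
        // ... start acting as leader, using leaderSessionID as the fencing token
    }

    @Override
    public void revokeLeadership() {
        // ... stop acting as leader
    }

    @Override
    public String getAddress() {
        return address;
    }

    @Override
    public void handleError(Exception exception) {
        throw new RuntimeException(exception);
    }
}
```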
`LeaderElectionService` has several implementations, such as `StandaloneLeaderElectionService`, which does not run an actual election, and `ZooKeeperLeaderElectionService`, which is built on ZooKeeper and the Curator framework; see the corresponding source code for the details.

The `HighAvailabilityServices` interface provides access to all HA-related services (a rough sketch of the interface follows the list), including:
- the election service and leader retrieval for the ResourceManager
- the election service and leader retrieval for the Dispatcher
- the registry for the status of running jobs
- services related to checkpoint recovery, the blob store, etc.
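An abbreviated sketch of the interface (method list from memory and trimmed; see `flink-runtime` for the authoritative version):

```java
// Abbreviated sketch of HighAvailabilityServices; only the methods relevant to this
// article are shown, and signatures may be slightly simplified.
public interface HighAvailabilityServices extends AutoCloseable {

    // leader retrieval / election for the ResourceManager
    LeaderRetrievalService getResourceManagerLeaderRetriever();
    LeaderElectionService getResourceManagerLeaderElectionService();

    // leader retrieval / election for the Dispatcher
    LeaderRetrievalService getDispatcherLeaderRetriever();
    LeaderElectionService getDispatcherLeaderElectionService();

    // registry tracking the scheduling status of jobs
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    // checkpoint recovery, blob store, ...
    CheckpointRecoveryFactory getCheckpointRecoveryFactory();
    BlobStore createBlobStore() throws IOException;
}
```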
## MiniCluster Startup Process
Let us start with the simplest case, the MiniCluster, to analyze Flink's startup process and the interaction between its internal components. A MiniCluster can be regarded as an embedded Flink runtime in which all components run in local threads within the same process. The MiniCluster is started from `LocalStreamEnvironment` (a condensed sketch follows the list below).

In `MiniCluster#start`, the startup roughly consists of three phases:
- create auxiliary services such as `RpcService`, `HighAvailabilityServices`, and `BlobServer`
- start the TaskManagers
- start the Dispatcher, ResourceManager, and related components
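As a reference point, this is roughly what `LocalStreamEnvironment#execute` does to drive these three phases (a condensed sketch; the real method also derives parallelism and configuration from the environment):

```java
// Condensed sketch of LocalStreamEnvironment#execute; builder options abbreviated.
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    final JobGraph jobGraph = streamGraph.getJobGraph();

    final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setNumTaskManagers(1)
        .setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism())
        .build();

    final MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();                              // the three phases above
        return miniCluster.executeJobBlocking(jobGraph);  // submit and wait for the result
    } finally {
        miniCluster.close();
    }
}
```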
### Creating `HighAvailabilityServices`
```java
class MiniCluster {
    public void start() {
        //……
        ioExecutor = Executors.newFixedThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("mini-cluster-io"));
        haServices = createHighAvailabilityServices(configuration, ioExecutor);
        //……
    }

    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        LOG.info("Starting high-availability services");
        return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
            configuration,
            executor);
    }
}
```
`HighAvailabilityServicesUtils` is a utility class for creating `HighAvailabilityServices`. When HA is not configured, it creates an `EmbeddedHaServices`. `EmbeddedHaServices` does not provide real high availability and is intended for the case where all components (ResourceManager, TaskManager, JobManager, ...) run in the same process. The election service that `EmbeddedHaServices` creates for each component is an `EmbeddedLeaderElectionService`: as soon as a `LeaderContender` joins the election, that contender is made leader.
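The selection logic looks roughly like this (a simplified sketch of `HighAvailabilityServicesUtils#createAvailableOrEmbeddedServices`; the ZooKeeper branch is abbreviated):

```java
// Simplified sketch; other HA options and error handling are omitted.
public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config,
        Executor executor) throws Exception {

    HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config);

    switch (highAvailabilityMode) {
        case NONE:
            // all components run in one process: in-memory election, the first (and only)
            // contender registered for a service is immediately granted leadership
            return new EmbeddedHaServices(executor);

        case ZOOKEEPER:
            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(config),
                executor,
                config,
                BlobUtils.createBlobStoreFromConfig(config));

        default:
            throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
    }
}
```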
### Starting the TaskManager
```java
class MiniCluster {
    public void start() {
        //……
        startTaskManagers();
        //……
    }

    private void startTaskManagers() throws Exception {
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        for (int i = 0; i < numTaskManagers; i++) {
            startTaskExecutor();
        }
    }

    @VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }
}
```
Once the `HighAvailabilityServices` have been created, the TaskManagers can be started one after another. `TaskManagerRunner#startTaskManager` creates a `TaskExecutor`; a `TaskExecutor` is an `RpcEndpoint` (it extends `RpcEndpoint` and implements `TaskExecutorGateway`). A `TaskExecutor` needs to communicate with components such as the `ResourceManager`, whose addresses it obtains through the `HighAvailabilityServices`.

In the callback invoked when the `TaskExecutor` starts, a series of services are started:
```java
class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
    public void onStart() throws Exception {
        try {
            // start the services the TaskExecutor depends on
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }

        // a registration timeout is handed over to the FatalErrorHandler
        startRegistrationTimeout();
    }

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

    /**
     * The listener for leader changes of the resource manager.
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {

        @Override
        public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
            // the ResourceManager leader address is known; establish a connection to it
            runAsync(
                () -> notifyOfNewResourceManagerLeader(
                    leaderAddress,
                    ResourceManagerId.fromUuidOrNull(leaderSessionID)));
        }

        @Override
        public void handleError(Exception exception) {
            onFatalError(exception);
        }
    }

    private final class JobLeaderListenerImpl implements JobLeaderListener {

        @Override
        public void jobManagerGainedLeadership(
                final JobID jobId,
                final JobMasterGateway jobManagerGateway,
                final JMTMRegistrationSuccess registrationMessage) {
            // establish a connection to the JobManager
            runAsync(
                () ->
                    establishJobManagerConnection(
                        jobId,
                        jobManagerGateway,
                        registrationMessage));
        }

        @Override
        public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);

            runAsync(() ->
                closeJobManagerConnection(
                    jobId,
                    new Exception("Job leader for job id " + jobId + " lost leadership.")));
        }

        @Override
        public void handleError(Throwable throwable) {
            onFatalError(throwable);
        }
    }
}
```
When the `ResourceManagerLeaderListener` callback fires, the `TaskExecutor` tries to establish a connection to the `ResourceManager`; the connection is encapsulated as a `TaskExecutorToResourceManagerConnection`. Once the `ResourceManager` leader is known, its `RpcGateway` can be obtained, and the `TaskExecutor` starts the registration process through the RPC call `ResourceManager#registerTaskExecutor`. After the registration has succeeded, the `TaskExecutor` reports its resources (mainly its slots) to the `ResourceManager`.
```java
class TaskExecutor {
    private void establishResourceManagerConnection(
            ResourceManagerGateway resourceManagerGateway,
            ResourceID resourceManagerResourceId,
            InstanceID taskExecutorRegistrationId,
            ClusterInformation clusterInformation) {

        // send the SlotReport to the ResourceManager
        final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
            getResourceID(),
            taskExecutorRegistrationId,
            taskSlotTable.createSlotReport(getResourceID()),
            taskManagerConfiguration.getTimeout());
        // ... (remainder elided)
    }
}
```
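The two RPCs involved in this handshake are declared on `ResourceManagerGateway`; an abbreviated sketch of the relevant declarations (signatures from memory and possibly simplified):

```java
// Abbreviated sketch of the ResourceManagerGateway methods used during TaskExecutor
// registration; the @RpcTimeout annotation and some parameters are omitted.
public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManagerId> {

    // step 1: the TaskExecutor registers itself at the ResourceManager leader
    CompletableFuture<RegistrationResponse> registerTaskExecutor(
        String taskExecutorAddress,
        ResourceID resourceId,
        int dataPort,
        HardwareDescription hardwareDescription,
        Time timeout);

    // step 2: after a successful registration the TaskExecutor reports its slots
    CompletableFuture<Acknowledge> sendSlotReport(
        ResourceID taskManagerResourceId,
        InstanceID taskManagerRegistrationId,
        SlotReport slotReport,
        Time timeout);
}
```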
### Starting the `DispatcherResourceManagerComponent`
```java
class MiniCluster {
    public void start() {
        //……
        dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
            configuration,
            dispatcherResourceManagreComponentRpcServiceFactory,
            haServices,
            blobServer,
            heartbeatServices,
            metricRegistry,
            metricQueryServiceRetriever,
            new ShutDownFatalErrorHandler()
        ));
        //……
    }

    protected Collection<? extends DispatcherResourceManagerComponent<?>> createDispatcherResourceManagerComponents(
            Configuration configuration,
            RpcServiceFactory rpcServiceFactory,
            HighAvailabilityServices haServices,
            BlobServer blobServer,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            MetricQueryServiceRetriever metricQueryServiceRetriever,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // Session dispatcher, standalone resource manager
        SessionDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory();
        return Collections.singleton(
            dispatcherResourceManagerComponentFactory.create(
                configuration,
                rpcServiceFactory.createRpcService(),
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                new MemoryArchivedExecutionGraphStore(),
                metricQueryServiceRetriever,
                fatalErrorHandler));
    }

    @Nonnull
    private SessionDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return new SessionDispatcherResourceManagerComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }
}
```
In MiniCluster mode, a `SessionDispatcherResourceManagerComponent` is created. `SessionDispatcherResourceManagerComponent` extends `DispatcherResourceManagerComponent` and is responsible for starting the Dispatcher, the ResourceManager, and the WebMonitorEndpoint, all of which run in the same process. In MiniCluster mode the concrete implementations are `StandaloneDispatcher` and `StandaloneResourceManager`.

When the factory creates the `DispatcherResourceManagerComponent`, it starts the Dispatcher, the ResourceManager, and the other components:
```java
public abstract class AbstractDispatcherResourceManagerComponentFactory<T extends Dispatcher, U extends RestfulGateway>
        implements DispatcherResourceManagerComponentFactory<T> {

    public DispatcherResourceManagerComponent<T> create(/* arguments elided */) throws Exception {

        resourceManager = resourceManagerFactory.createResourceManager(
            configuration,
            ResourceID.generate(),
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            fatalErrorHandler,
            new ClusterInformation(hostname, blobServer.getPort()),
            webMonitorEndpoint.getRestBaseUrl(),
            jobManagerMetricGroup);

        final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

        dispatcher = dispatcherFactory.createDispatcher(
            configuration,
            rpcService,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricRegistry.getMetricQueryServiceGatewayRpcAddress(),
            archivedExecutionGraphStore,
            fatalErrorHandler,
            historyServerArchivist);

        log.debug("Starting ResourceManager.");
        resourceManager.start();
        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

        log.debug("Starting Dispatcher.");
        dispatcher.start();
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return createDispatcherResourceManagerComponent(
            dispatcher,
            resourceManager,
            dispatcherLeaderRetrievalService,
            resourceManagerRetrievalService,
            webMonitorEndpoint,
            jobManagerMetricGroup);
    }
}
```
In the callback invoked when the `ResourceManager` starts, it obtains the election service from `HighAvailabilityServices` and takes part in the leader election. It also starts the `JobLeaderIdService`, which manages the leader ids of the jobs registered with the current ResourceManager.
```java
abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway, LeaderContender {
    // onStart() -> startResourceManagerServices(): body elided in the original excerpt,
    // see the sketch below
}
```
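A hedged reconstruction of that startup path (simplified from memory; the actual method also starts the slot manager and other services):

```java
// Simplified sketch of the ResourceManager startup described above; not a verbatim copy.
private void startResourceManagerServices() throws Exception {
    // the election service was obtained from
    // HighAvailabilityServices#getResourceManagerLeaderElectionService();
    // the ResourceManager itself is the LeaderContender
    leaderElectionService.start(this);

    // manages the leader ids of jobs registered with this ResourceManager
    jobLeaderIdService.start(new JobLeaderIdActionsImpl());

    // ... further services (slot manager, metrics, ...) are started here as well
}
```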
In the callback invoked when the `Dispatcher` starts, the Dispatcher likewise takes part in the election through the `LeaderElectionService`:
```java
public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId>
        implements DispatcherGateway, LeaderContender, SubmittedJobGraphStore.SubmittedJobGraphListener {

    @Override
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
        } catch (Exception e) {
            final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            submittedJobGraphStore.start(this);
            leaderElectionService.start(this);

            registerDispatcherMetrics(jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }
}
```
### Submitting a JobGraph
A `JobGraph` is submitted through `MiniCluster#executeJobBlocking`, which waits for the job to finish. Both submitting the `JobGraph` and requesting the job result are implemented as RPC calls (a sketch of the underlying gateway calls follows the excerpt):
```java
class MiniCluster {

    // Both methods are forwarded to the Dispatcher through its RpcGateway;
    // the method bodies are elided in the original excerpt.
    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        // ...
    }

    public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
        // ...
    }
}
```
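On the Dispatcher side these requests arrive as `DispatcherGateway` RPCs, roughly as follows (signatures abbreviated from memory):

```java
// Abbreviated sketch of the DispatcherGateway RPCs behind MiniCluster#submitJob and
// MiniCluster#requestJobResult; the @RpcTimeout annotation is omitted.
public interface DispatcherGateway extends FencedRpcGateway<DispatcherId> {

    // invoked by MiniCluster#submitJob once the Dispatcher leader gateway is known
    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout);

    // invoked by MiniCluster#requestJobResult to obtain the final result of the job
    CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout);
}
```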
After receiving the submitted `JobGraph`, the `Dispatcher` stores it in the `SubmittedJobGraphStore` (for failure recovery) and starts a JobManager for it:
```java
class Dispatcher {

    // Submission path: persist the JobGraph in the SubmittedJobGraphStore, then create
    // and start a JobManagerRunner for it (body elided in the original excerpt).
    private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
        // ...
    }
}
```
The started `JobManagerRunner` competes for leadership; once it is elected leader, it starts a `JobMaster`.
```java
public class JobManagerRunner implements LeaderContender, OnCompletionActions, AutoCloseableAsync {

    public void start() throws Exception {
        try {
            // compete for leadership
            leaderElectionService.start(this);
        } catch (Exception e) {
            log.error("Could not start the JobManager because the leader election service did not start.", e);
            throw new Exception("Could not start the leader election service.", e);
        }
    }

    // called once this runner has been elected leader
    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.info("JobManagerRunner already shutdown.");
                return;
            }

            leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }

    private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
        final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

        return jobSchedulingStatusFuture.thenCompose(
            jobSchedulingStatus -> {
                if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                    return jobAlreadyDone();
                } else {
                    return startJobMaster(leaderSessionId);
                }
            });
    }

    private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // start the JobMaster with the leaderSessionId as its JobMasterId (fencing token)
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }
}
```
After the `JobMaster` starts, it establishes a connection to the `ResourceManager`, encapsulated as a `ResourceManagerConnection`. Once the connection is established, the `JobMaster` can communicate with the `ResourceManager` via RPC:
```java
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
    // body elided in the original excerpt: on start, the JobMaster retrieves the
    // ResourceManager leader via HighAvailabilityServices, creates a
    // ResourceManagerConnection and registers itself at the ResourceManager
}
```
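The excerpt above is truncated in the original; as a rough illustration, once the ResourceManager leader is known the `ResourceManagerConnection` registers the JobMaster through an RPC along these lines (signature from memory, abbreviated):

```java
// Additional method on ResourceManagerGateway (complementing the sketch shown earlier):
// used by the JobMaster's ResourceManagerConnection to register the JobMaster.
CompletableFuture<RegistrationResponse> registerJobManager(
    JobMasterId jobMasterId,
    ResourceID jobMasterResourceId,
    String jobMasterAddress,
    JobID jobId,
    Time timeout);
```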
From this point on, the job enters the scheduling and execution phase.
## Standalone Cluster Startup Process
In standalone mode, the TaskManager and the ResourceManager (and the other master components) run in separate processes. A standalone cluster can be started in two ways, standalonesession mode and standalonejob mode; they differ in the Dispatcher implementation that is used.
### Starting the JobManager
Note that the **JobManager** we talk about here is the single process that contains the `Dispatcher`, the `ResourceManager`, and the other master components, not the `JobManagerRunner` that the `Dispatcher` launches to execute a `JobGraph`. In the FLIP-6 design, the scheduling and execution of each `JobGraph` is actually handled by a dedicated `JobMaster`.

The entry class of a JobManager started in standalonesession mode is `StandaloneSessionClusterEntrypoint`, which extends `SessionClusterEntrypoint`; correspondingly, the entry class for standalonejob mode is `StandaloneJobClusterEntryPoint`, which extends `JobClusterEntrypoint`. Both derive from the common parent class `ClusterEntrypoint` and differ in the `DispatcherResourceManagerComponent` they create.

Let us first look at the startup itself, which is in fact very similar to how the `DispatcherResourceManagerComponent` is started in MiniCluster mode:
```java
abstract class ClusterEntrypoint {
    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            // initialize services such as RpcService and HighAvailabilityServices
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            // create the DispatcherResourceManagerComponentFactory; implemented by the concrete subclass
            final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

            // create the DispatcherResourceManagerComponent, which starts the ResourceManager and Dispatcher
            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);

            // once DispatcherResourceManagerComponent#getShutDownFuture completes, shut down all services
            clusterComponent.getShutDownFuture().whenComplete(
                (ApplicationStatus applicationStatus, Throwable throwable) -> {
                    if (throwable != null) {
                        shutDownAsync(
                            ApplicationStatus.UNKNOWN,
                            ExceptionUtils.stringifyException(throwable),
                            false);
                    } else {
                        // This is the general shutdown path. If a separate more specific shutdown was
                        // already triggered, this will do nothing
                        shutDownAsync(
                            applicationStatus,
                            null,
                            true);
                    }
                });
        }
    }
}
```
The `HighAvailabilityServices` created here differ slightly from MiniCluster mode: since the components no longer live in the same process, the services have to be derived from the configuration. 1) If ZooKeeper-based HA is configured, a `ZooKeeperHaServices` is created and the leader addresses are obtained through ZooKeeper. 2) If HA is not configured, a `StandaloneHaServices` is created and the RPC addresses of the components are read from the configuration file (a sketch follows at the end of this subsection).

In `StandaloneSessionClusterEntrypoint`, the factory that creates the `DispatcherResourceManagerComponent` is `SessionDispatcherResourceManagerComponentFactory`, which creates a `SessionDispatcherResourceManagerComponent`: a `StandaloneDispatcher` is created by the `SessionDispatcherFactory` and a `StandaloneResourceManager` by the `StandaloneResourceManagerFactory`.

In `StandaloneJobClusterEntrypoint`, the factory is `JobDispatcherResourceManagerComponentFactory`, which creates a `JobDispatcherResourceManagerComponent`: a `StandaloneResourceManager` is created by the `StandaloneResourceManagerFactory` and a `MiniDispatcher` by the `JobDispatcherFactory`. A `MiniDispatcher` is bound to exactly one `JobGraph`; once that `JobGraph` has finished, the `MiniDispatcher` shuts down and the JobManager process terminates with it.

The internal startup of the `Dispatcher` and `ResourceManager` services is the same as in the MiniCluster case and is not repeated here.
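For the non-HA case, the construction of `StandaloneHaServices` from the configured addresses looks roughly like this (a simplified excerpt-style sketch of `HighAvailabilityServicesUtils#createHighAvailabilityServices`; variable names, helper calls, and URL construction are approximate):

```java
// Simplified sketch of the NONE branch; the ZooKeeper branch creates the
// ZooKeeperHaServices shown earlier.
case NONE:
    final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
        hostnamePort.f0, hostnamePort.f1, ResourceManager.RESOURCE_MANAGER_NAME, addressResolution, configuration);
    final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
        hostnamePort.f0, hostnamePort.f1, Dispatcher.DISPATCHER_NAME, addressResolution, configuration);
    final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
        hostnamePort.f0, hostnamePort.f1, JobMaster.JOB_MANAGER_NAME, addressResolution, configuration);

    // without HA, the "leader" addresses are simply the statically configured ones
    return new StandaloneHaServices(
        resourceManagerRpcUrl,
        dispatcherRpcUrl,
        jobManagerRpcUrl,
        webMonitorAddress);
```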
### Starting the TaskManager
The entry point of the TaskManager is `TaskManagerRunner`. Its startup is largely the same as in MiniCluster mode, except that 1) it runs in its own process, and 2) the `HighAvailabilityServices` have to be created from the configuration file. The `TaskManagerRunner` creates a `TaskExecutor`, which obtains the `ResourceManager` address through the `HighAvailabilityServices` and establishes a connection to it.
## Yarn Cluster Startup Process
The entry point for a Yarn cluster is `FlinkYarnSessionCli`: a `YarnClusterDescriptor` is created from the command-line arguments, and `YarnClusterDescriptor#deploySessionCluster` is then called to trigger the deployment of the cluster.

The actual startup logic lives in `AbstractYarnClusterDescriptor#deployInternal`; essentially it submits an application to the Yarn cluster via the `YarnClient` and starts the ApplicationMaster:
```java
abstract class AbstractYarnClusterDescriptor {

    protected ClusterClient<ApplicationId> deployInternal(/* arguments elided */) throws Exception {
        //.....
        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);

        //.....
        return createYarnClusterClient(
            this,
            validClusterSpecification.getNumberTaskManagers(),
            validClusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }
}
```
Depending on whether the cluster is started as a session cluster or a job cluster, the ApplicationMaster entry classes submitted to Yarn are `YarnSessionClusterEntrypoint` and `YarnJobClusterEntrypoint` respectively; the difference is again the Dispatcher, `StandaloneDispatcher` versus `MiniDispatcher`. The concrete `ResourceManager` implementation is `YarnResourceManager`.

Unlike the standalone cluster described above, a Flink cluster running on Yarn obtains its `TaskManager`s dynamically: the `YarnResourceManager` requests resources from Yarn's ResourceManager on behalf of the JobMaster. When the JobMaster requests resources and not enough are currently available, the `YarnResourceManager` asks the Yarn ResourceManager for new containers and starts `TaskManager`s in them:
```java
class YarnResourceManager {

    // request a new container
    private void requestYarnContainer() {
        resourceManagerClient.addContainerRequest(getContainerRequest());

        // make sure we transmit the request fast and receive fast news of granted allocations
        resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

        numPendingContainerRequests++;

        log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
            resource,
            numPendingContainerRequests);
    }

    // callback invoked when containers have been allocated
    @Override
    public void onContainersAllocated(List<Container> containers) {
        runAsync(() -> {
            final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
            final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

            for (Container container : containers) {
                log.info(
                    "Received new container: {} - Remaining pending container requests: {}",
                    container.getId(),
                    numPendingContainerRequests);

                if (numPendingContainerRequests > 0) {
                    removeContainerRequest(pendingRequestsIterator.next());

                    final String containerIdStr = container.getId().toString();
                    final ResourceID resourceId = new ResourceID(containerIdStr);

                    workerNodeMap.put(resourceId, new YarnWorkerNode(container));

                    try {
                        // Context information used to start a TaskExecutor Java process
                        ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
                            container.getResource(),
                            containerIdStr,
                            container.getNodeId().getHost());

                        // start the TaskManager
                        nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
                    } catch (Throwable t) {
                        log.error("Could not start TaskManager in container {}.", container.getId(), t);

                        // release the failed container
                        workerNodeMap.remove(resourceId);
                        resourceManagerClient.releaseAssignedContainer(container.getId());
                        // and ask for a new one
                        requestYarnContainerIfRequired();
                    }
                } else {
                    // return the excessive containers
                    log.info("Returning excess container {}.", container.getId());
                    resourceManagerClient.releaseAssignedContainer(container.getId());
                }
            }

            // if we are waiting for no further containers, we can go to the
            // regular heartbeat interval
            if (numPendingContainerRequests <= 0) {
                resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
            }
        });
    }

    // create the launch context used to start the TaskManager
    private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
            throws Exception {
        // init the ContainerLaunchContext
        final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

        final ContaineredTaskManagerParameters taskManagerParameters =
                ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);

        log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
                "JVM direct memory limit {} MB",
            containerId,
            taskManagerParameters.taskManagerTotalMemoryMB(),
            taskManagerParameters.taskManagerHeapSizeMB(),
            taskManagerParameters.taskManagerDirectMemoryLimitMB());

        Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

        log.debug("TaskManager configuration: {}", taskManagerConfig);

        ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
            flinkConfig,
            yarnConfig,
            env,
            taskManagerParameters,
            taskManagerConfig,
            currDir,
            YarnTaskExecutorRunner.class, // entry class of the TaskManager process
            log);

        // set a special environment variable to uniquely identify this container
        taskExecutorLaunchContext.getEnvironment()
                .put(ENV_FLINK_CONTAINER_ID, containerId);

        taskExecutorLaunchContext.getEnvironment()
                .put(ENV_FLINK_NODE_ID, host);

        return taskExecutorLaunchContext;
    }
}
```
## Summary

This article briefly walked through the startup process of a Flink cluster and the communication among its components: the ResourceManager, TaskExecutor, Dispatcher, and JobMaster.
-EOF-