# Client-side Application Submission
```shell
/opt/flink-1.14.3/bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.namespace=flink-cluster \
-Dkubernetes.service-account=xxxx-test \
-Dkubernetes.cluster-id=xxxx-creative-dim-cluster \
-Dkubernetes.container.image=registry.cn-beijing.aliyuncs.com/xxxx/xxxx_flink:v1 \
-Dkubernetes.container.image.pull-policy=always \
-Dkubernetes.container.image.pull-secrets=xxxx \
-Dkubernetes.config.file=target/classes/ali-k8s.txt \
-Dkubernetes.pod-template-file=target/classes/pod-template-file.yaml \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.taskmanager.cpu=3 \
-Djobmanager.memory.process.size=4g \
-Dtaskmanager.memory.process.size=14g \
-Dtaskmanager.memory.task.heap.size=10g \
-Dtaskmanager.memory.managed.size=100m \
-Dtaskmanager.memory.task.off-heap.size=2g \
-Dtaskmanager.numberOfTaskSlots=10 \
-Dcluster.fine-grained-resource-management.enabled=true \
-Dmetrics.reporter.promgateway.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter \
-Dmetrics.reporter.promgateway.host=192.168.28.112 \
-Dmetrics.reporter.promgateway.port=9091 \
-Dmetrics.reporter.promgateway.jobName=xxxx_creative_dim \
-Dmetrics.reporter.promgateway.randomJobNameSuffix=true \
-Dmetrics.reporter.promgateway.deleteOnShutdown=false \
-Dmetrics.reporter.promgateway.interval='60 SECONDS' \
-Dmetrics.reporter.promgateway.groupingKey="app_type=flink;k8s_env=ali_k8s;docker_env=xxxx_harbor;app_name=xxxx_creative_dim" \
local:///opt/xxxx_realtime/usrlib/xxxx_realtime.jar
```
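Each `-D` option in the command above is a dynamic property that ends up in the Flink configuration, on top of what the local configuration file provides. A minimal sketch of that overlay, assuming a plain `Map<String, String>` stands in for Flink's `Configuration`; `DynamicPropertiesSketch` and `overlay` are hypothetical names, not Flink's parser. The sketch splits on the first `=` only, which is what lets a value such as the `groupingKey` above keep its own `=` signs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: overlay "-Dkey=value" arguments onto a base configuration
// (standing in for the local config file). Not Flink's actual CLI parser.
final class DynamicPropertiesSketch {

    static Map<String, String> overlay(Map<String, String> base, List<String> args) {
        Map<String, String> conf = new LinkedHashMap<>(base);
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // other options (--target, the jar URI) are handled elsewhere
            }
            String kv = arg.substring(2);
            int eq = kv.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed dynamic property: " + arg);
            }
            // Split on the first '=' only, so values may contain '=' themselves;
            // a command-line value replaces the base value for the same key.
            conf.put(kv.substring(0, eq), kv.substring(eq + 1));
        }
        return conf;
    }
}
```

When the real command runs, the shell has already removed the quotes around `'60 SECONDS'` and the `groupingKey` value before the JVM sees the arguments.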
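The `flink-dist/src/main/flink-bin/bin/flink` script shows that the entry point for submitting a Flink job is org.apache.flink.client.cli.CliFrontend.
CliFrontend is initialized with two inputs: the configuration from the local configuration file and the configuration derived from the command-line arguments.
The first command-line argument, `run-application`, selects the action ACTION_RUN_APPLICATION. The `--target` option then selects KubernetesClusterClientFactory as the ClusterClientFactory, and KubernetesClusterClientFactory creates a KubernetesClusterDescriptor.
KubernetesClusterDescriptor.deployApplicationCluster() sets KubernetesApplicationClusterEntrypoint as the container entry point and deploys the JobManager as a Kubernetes resource of type Deployment.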
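The two selection steps above (action from the first argument, client factory from `--target`) can be sketched as follows. All names are hypothetical stand-ins; Flink discovers its ClusterClientFactory implementations through Java's ServiceLoader, whereas here they are passed in as a plain list, and the descriptor is reduced to a string.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a ClusterClientFactory: "am I responsible for this target?"
interface ClientFactorySketch {
    boolean isCompatibleWith(String target);

    String createClusterDescriptor();
}

final class KubernetesClientFactorySketch implements ClientFactorySketch {
    @Override
    public boolean isCompatibleWith(String target) {
        return "kubernetes-session".equals(target) || "kubernetes-application".equals(target);
    }

    @Override
    public String createClusterDescriptor() {
        return "KubernetesClusterDescriptor";
    }
}

final class OtherClientFactorySketch implements ClientFactorySketch {
    @Override
    public boolean isCompatibleWith(String target) {
        return "remote".equals(target);
    }

    @Override
    public String createClusterDescriptor() {
        return "OtherClusterDescriptor";
    }
}

final class CliFrontendSketch {
    static final String ACTION_RUN_APPLICATION = "run-application";

    static String run(String[] args, List<ClientFactorySketch> factories) {
        String action = args[0]; // the first argument picks the action
        switch (action) {
            case ACTION_RUN_APPLICATION:
                return selectFactory(targetOf(args), factories).createClusterDescriptor();
            default:
                throw new IllegalArgumentException("Action not covered by this sketch: " + action);
        }
    }

    static String targetOf(String[] args) {
        for (int i = 1; i + 1 < args.length; i++) {
            if ("--target".equals(args[i])) {
                return args[i + 1];
            }
        }
        throw new IllegalArgumentException("missing --target");
    }

    // Exactly one factory must claim the target; zero or several is an error.
    static ClientFactorySketch selectFactory(String target, List<ClientFactorySketch> factories) {
        List<ClientFactorySketch> compatible = factories.stream()
                .filter(f -> f.isCompatibleWith(target))
                .collect(Collectors.toList());
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    compatible.size() + " compatible factories for target " + target);
        }
        return compatible.get(0);
    }
}
```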
# Cluster Initialization
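The HA article briefly covered, from the HA perspective, how the DispatcherRunner takes part in leader election and, once elected, creates the Dispatcher through a callback.
Here we walk through how the cluster is started from KubernetesApplicationClusterEntrypoint.
Initializing a cluster component mainly involves three classes. Taking the Dispatcher as an example (class names differ for other components, but the structure is broadly the same):
- DispatcherRunner: acts as the LeaderContender and, via callback, starts a DispatcherLeaderProcess
- DispatcherLeaderProcess: wraps the Dispatcher and manages its whole lifecycle
- Dispatcher: the RpcEndpoint that actually provides the dispatch service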
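The three roles can be pictured with a minimal sketch. The class names are hypothetical stand-ins for DispatcherRunner, DispatcherLeaderProcess and Dispatcher, and leader election is reduced to direct calls of the grant/revoke callbacks:

```java
import java.util.UUID;

// Role of the Dispatcher: the endpoint that actually serves requests.
final class EndpointSketch {
    final UUID leaderSessionId;
    private boolean running;

    EndpointSketch(UUID leaderSessionId) {
        this.leaderSessionId = leaderSessionId;
    }

    void start() { running = true; }

    void close() { running = false; }

    boolean isRunning() { return running; }
}

// Role of DispatcherLeaderProcess: owns the endpoint for one leader session.
final class LeaderProcessSketch {
    private final UUID sessionId;
    private EndpointSketch endpoint;

    LeaderProcessSketch(UUID sessionId) {
        this.sessionId = sessionId;
    }

    void start() {
        // Roughly where Flink recovers persisted jobs before creating the Dispatcher.
        endpoint = new EndpointSketch(sessionId);
        endpoint.start();
    }

    void close() {
        if (endpoint != null) {
            endpoint.close();
        }
    }

    EndpointSketch endpoint() { return endpoint; }
}

// Role of DispatcherRunner: the LeaderContender reacting to election callbacks.
final class RunnerSketch {
    private LeaderProcessSketch process;

    // Each grant starts a fresh leader process for the new session id.
    void grantLeadership(UUID sessionId) {
        revokeLeadership();
        process = new LeaderProcessSketch(sessionId);
        process.start();
    }

    // Losing leadership tears down the process and, with it, the endpoint.
    void revokeLeadership() {
        if (process != null) {
            process.close();
            process = null;
        }
    }

    LeaderProcessSketch process() { return process; }
}
```

Keeping the endpoint behind a per-session process means a leadership change never has to reset a long-lived Dispatcher; the old process is closed and a new one is built.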
The entry point of the whole K8s Flink cluster is org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint#main; the core logic ultimately runs in org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster.
```java
//org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        //Initialize services, e.g. HA services
        initializeServices(configuration, pluginManager);
        //Create the DispatcherResourceManagerComponentFactory
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
        //Start the clusterComponent, which contains the Dispatcher, ResourceManager and WebMonitorEndpoint
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
        // ... (remaining logic omitted)
    }
}
```
```java
//DefaultDispatcherResourceManagerComponentFactory holds a dispatcherRunnerFactory and a resourceManagerFactory.
//dispatcherRunnerFactory creates the dispatcherRunner inside dispatcherResourceManagerComponentFactory.create.
//dispatcherRunnerFactory wraps ApplicationDispatcherLeaderProcessFactoryFactory, which builds the
//DispatcherLeaderProcessFactory when the dispatcherRunner is created.
//DispatcherLeaderProcessFactory creates the DispatcherLeaderProcess in the leader election callback;
//the DispatcherLeaderProcess then creates the Dispatcher during start().
protected DispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            new DefaultDispatcherRunnerFactory(
                    ApplicationDispatcherLeaderProcessFactoryFactory.create(
                            //Application mode uses SessionDispatcherFactory.INSTANCE
                            configuration, SessionDispatcherFactory.INSTANCE, program)),
            resourceManagerFactory,
            JobRestEndpointFactory.INSTANCE);
}
```
> TODO: This part makes heavy use of the [factory pattern](https://refactoringguru.cn/design-patterns/factory-method); the [Apache Flink Code Style and Quality Guide](https://flink.apache.org/contributing/code-style-and-quality-preamble.html) and Google's [Guide: Writing Testable Code](https://testing.googleblog.com/2008/11/guide-to-writing-testable-code.html) are worth reading to get a feel for why.
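To see the factory-of-factories shape in isolation, here is a minimal sketch; every name is hypothetical, and plain strings stand in for the program, the RPC service and the created leader process. Values known when the entry point is built (the program) are captured early, values that exist only later (the RPC service, after the services are initialized) are supplied when the inner factory is created, and a test can swap in a fake without touching the runner.

```java
// Inner factory: invoked on every leadership grant.
interface LeaderProcessFactorySketch {
    String createProcess(String leaderSessionId);
}

// Outer factory: invoked once, when late dependencies become available.
interface LeaderProcessFactoryFactorySketch {
    LeaderProcessFactorySketch createFactory(String rpcService);
}

final class ApplicationProcessFactoryFactorySketch implements LeaderProcessFactoryFactorySketch {
    private final String program; // known when the entry point is built

    ApplicationProcessFactoryFactorySketch(String program) {
        this.program = program;
    }

    @Override
    public LeaderProcessFactorySketch createFactory(String rpcService) {
        // The returned lambda closes over both the early and the late dependency.
        return sessionId -> "process(" + sessionId + ", " + program + ", " + rpcService + ")";
    }
}

final class RunnerFactorySketch {
    private final LeaderProcessFactoryFactorySketch factoryFactory;

    RunnerFactorySketch(LeaderProcessFactoryFactorySketch factoryFactory) {
        this.factoryFactory = factoryFactory;
    }

    // Called once services exist; the result is reused for every leader session.
    LeaderProcessFactorySketch createProcessFactory(String rpcService) {
        return factoryFactory.createFactory(rpcService);
    }
}
```

Because the runner only depends on the interface, a test can pass a lambda such as `rpc -> session -> "fake-" + session` instead of the application-specific implementation.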
<a name="ptHuz"></a>
# JobGraph Submission in Application Mode
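The previous part covered how the Dispatcher is created. Once it exists, how does a job get submitted to it in Application mode?<br />That logic is implemented in ApplicationDispatcherBootstrap.
1. First, trace the call chain through which the DispatcherBootstrapFactory is declared and passed along.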
```java
//org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory#createFactory
public DispatcherLeaderProcessFactory createFactory(
JobPersistenceComponentFactory jobPersistenceComponentFactory,
Executor ioExecutor,
RpcService rpcService,
PartialDispatcherServices partialDispatcherServices,
FatalErrorHandler fatalErrorHandler) {
//Before creating the DispatcherLeaderProcessFactory, first create an ApplicationDispatcherGatewayServiceFactory.
final ApplicationDispatcherGatewayServiceFactory dispatcherServiceFactory =
new ApplicationDispatcherGatewayServiceFactory(
configuration,
dispatcherFactory,
program,
rpcService,
partialDispatcherServices);
//Application mode ultimately uses SessionDispatcherLeaderProcessFactory
return new SessionDispatcherLeaderProcessFactory(
dispatcherServiceFactory,
jobPersistenceComponentFactory,
ioExecutor,
fatalErrorHandler);
}
//SessionDispatcherLeaderProcess delegates Dispatcher creation to ApplicationDispatcherGatewayServiceFactory
//org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcher
private void createDispatcher(
Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
final DispatcherGatewayService dispatcherService =
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
jobGraphs,
recoveredDirtyJobResults,
jobGraphStore,
jobResultStore);
completeDispatcherSetup(dispatcherService);
}
//When ApplicationDispatcherGatewayServiceFactory creates the Dispatcher, it also passes a DispatcherBootstrapFactory; here it is a lambda that builds an ApplicationDispatcherBootstrap.
//org.apache.flink.client.deployment.application.ApplicationDispatcherGatewayServiceFactory#create
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
Collection<JobResult> recoveredDirtyJobResults,
JobGraphWriter jobGraphWriter,
JobResultStore jobResultStore) {
final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
final Dispatcher dispatcher;
try {
dispatcher =
dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
recoveredDirtyJobResults,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new ApplicationDispatcherBootstrap(
application,
recoveredJobIds,
configuration,
dispatcherGateway,
scheduledExecutor,
errorHandler),
PartialDispatcherServicesWithJobPersistenceComponents.from(
partialDispatcherServices, jobGraphWriter, jobResultStore));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
//Start the Dispatcher RPC endpoint
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
```
2. Where is the ApplicationDispatcherBootstrap factory actually invoked?
Tracing the code shows that it is invoked in org.apache.flink.runtime.dispatcher.Dispatcher#onStart.
```java
@Override
public void onStart() throws Exception {
    // ... (other startup logic omitted)
    this.dispatcherBootstrap =
            this.dispatcherBootstrapFactory.create(
                    getSelfGateway(DispatcherGateway.class),
                    this.getRpcService().getScheduledExecutor(),
                    this::onFatalError);
}
```
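As an RpcEndpoint, the Dispatcher is started by the `dispatcher.start();` call in the code analyzed in the previous step. For how onStart ends up being called, see the [Flink RPC](https://www.yuque.com/u22594583/ydf48t/nfmqmy) article, which describes the RPC startup logic.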
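One plausible reading of this design: the bootstrap should only start submitting jobs once the Dispatcher has finished starting up, so the Dispatcher receives a factory and invokes it at the end of onStart(), handing over its own gateway. A minimal sketch under that reading, with hypothetical names (`DispatcherSketch`, `BootstrapFactorySketch`, `DispatcherGatewayLite`) and `start()` calling `onStart()` directly instead of going through the RPC machinery:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the gateway the bootstrap talks to.
interface DispatcherGatewayLite {
    void submitJob(String jobName);
}

// Hypothetical stand-in for DispatcherBootstrapFactory; the returned Runnable
// represents the work the bootstrap starts.
interface BootstrapFactorySketch {
    Runnable create(DispatcherGatewayLite selfGateway, Consumer<Throwable> onFatalError);
}

final class DispatcherSketch implements DispatcherGatewayLite {
    final List<String> submittedJobs = new ArrayList<>();
    private final BootstrapFactorySketch bootstrapFactory;

    DispatcherSketch(BootstrapFactorySketch bootstrapFactory) {
        // Only the factory is stored; nothing runs at construction time.
        this.bootstrapFactory = bootstrapFactory;
    }

    @Override
    public void submitJob(String jobName) {
        submittedJobs.add(jobName);
    }

    // Stand-in for RpcEndpoint#start, which eventually leads to onStart().
    void start() {
        onStart();
    }

    private void onStart() {
        // ... other startup work would come first ...
        Runnable bootstrap = bootstrapFactory.create(this, this::onFatalError);
        bootstrap.run();
    }

    private void onFatalError(Throwable t) {
        throw new IllegalStateException("fatal error", t);
    }
}
```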
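3. ApplicationDispatcherBootstrap runs the user code (translating the job into a JobGraph and submitting it).
   1. It reuses the client module's ClientUtils.executeProgram() to actually execute the code in the jar.<br />ClientUtils.executeProgram() calls the static setAsContext methods of ContextEnvironment and StreamContextEnvironment, which install the static _contextEnvironmentFactory_ held by ExecutionEnvironment and StreamExecutionEnvironment respectively.<br />So when a Flink job calls `StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();`,<br />the StreamExecutionEnvironment it gets back is the one created by that factory's StreamExecutionEnvironmentFactory#createExecutionEnvironment.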
```java
//org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationEntryPoint
private void runApplicationEntryPoint(
final CompletableFuture<List<JobID>> jobIdsFuture,
final Set<JobID> tolerateMissingResult,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution,
final boolean submitFailedJobOnApplicationError) {
// ...
final PipelineExecutorServiceLoader executorServiceLoader =
new EmbeddedExecutorServiceLoader(
applicationJobIds, dispatcherGateway, scheduledExecutor);
ClientUtils.executeProgram(
executorServiceLoader,
configuration,
application,
enforceSingleJobExecution,
true /* suppress sysout */);
// ...
}
//ClientUtils.executeProgram
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info(
"Starting program (detached: {})",
!configuration.getBoolean(DeploymentOptions.ATTACHED));
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
```
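The setAsContext/unsetAsContext pair amounts to installing a static factory before calling the user's main() and removing it afterwards. A minimal sketch of that idea, with hypothetical names (`EnvSketch`, `LauncherSketch`) and a `Supplier` in place of Flink's StreamExecutionEnvironmentFactory:

```java
import java.util.function.Supplier;

// Hypothetical sketch of the setAsContext / getExecutionEnvironment interplay.
final class EnvSketch {
    // Plays the role of the static contextEnvironmentFactory field.
    private static Supplier<EnvSketch> contextFactory;

    final String executor;

    EnvSketch(String executor) {
        this.executor = executor;
    }

    // What user code calls; it never knows which factory is installed.
    static EnvSketch getExecutionEnvironment() {
        return contextFactory != null ? contextFactory.get() : new EnvSketch("local");
    }

    static void setAsContext(Supplier<EnvSketch> factory) {
        contextFactory = factory;
    }

    static void unsetAsContext() {
        contextFactory = null;
    }
}

final class LauncherSketch {
    // Mirrors the try/finally shape of ClientUtils.executeProgram above.
    static <T> T runUserProgram(Supplier<T> userMain) {
        EnvSketch.setAsContext(() -> new EnvSketch("embedded"));
        try {
            return userMain.get();
        } finally {
            EnvSketch.unsetAsContext();
        }
    }
}
```

Outside the launcher, `getExecutionEnvironment()` falls back to a local environment, which is why the same user code can run unchanged in an IDE and inside the cluster.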
- The user code calls StreamExecutionEnvironment#execute(java.lang.String), which converts the transformations into a StreamGraph.
It then calls StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph), which runs the StreamGraph through the EmbeddedExecutor.
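```java
//org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#executeAsync
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // ...
    //Because of setAsContext, getExecutor returns the EmbeddedExecutor, so this calls EmbeddedExecutor#execute
    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);
    // ...
}
```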
EmbeddedExecutor converts the StreamGraph into a JobGraph and submits the job to the Dispatcher through dispatcherGateway.submitJob.
```java
public CompletableFuture
private CompletableFuture
```
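Reduced to its essentials, the EmbeddedExecutor step translates the pipeline and hands the result to the Dispatcher in-process. A minimal sketch with hypothetical stand-in types (`StreamGraphSketch`, `JobGraphSketch`, `DispatcherGatewaySketch`); the translation maps each operator to one vertex, whereas Flink's real StreamGraph-to-JobGraph translation does much more, such as operator chaining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins, not Flink's StreamGraph/JobGraph/DispatcherGateway.
final class StreamGraphSketch {
    final String jobName;
    final List<String> operators;

    StreamGraphSketch(String jobName, List<String> operators) {
        this.jobName = jobName;
        this.operators = List.copyOf(operators);
    }
}

final class JobGraphSketch {
    final String jobName;
    final List<String> vertices;

    JobGraphSketch(String jobName, List<String> vertices) {
        this.jobName = jobName;
        this.vertices = List.copyOf(vertices);
    }
}

interface DispatcherGatewaySketch {
    // Completes once the Dispatcher has accepted the job.
    CompletableFuture<Void> submitJob(JobGraphSketch jobGraph);
}

final class EmbeddedExecutorSketch {
    private final DispatcherGatewaySketch gateway;

    EmbeddedExecutorSketch(DispatcherGatewaySketch gateway) {
        this.gateway = gateway;
    }

    // Translate the pipeline, submit it in-process, and complete with the job name
    // once the Dispatcher has acknowledged the submission.
    CompletableFuture<String> execute(StreamGraphSketch streamGraph) {
        JobGraphSketch jobGraph = translate(streamGraph);
        return gateway.submitJob(jobGraph).thenApply(ignored -> jobGraph.jobName);
    }

    // Simplification: one vertex per operator (no chaining).
    private static JobGraphSketch translate(StreamGraphSketch streamGraph) {
        return new JobGraphSketch(streamGraph.jobName, new ArrayList<>(streamGraph.operators));
    }
}
```

No network hop is involved: in Application mode the user code already runs inside the JobManager process, so the executor can talk to the Dispatcher gateway directly.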