# Client-Side Application Submission

    /opt/flink-1.14.3/bin/flink run-application \
        --target kubernetes-application \
        -Dkubernetes.namespace=flink-cluster \
        -Dkubernetes.service-account=xxxx-test \
        -Dkubernetes.cluster-id=xxxx-creative-dim-cluster \
        -Dkubernetes.container.image=registry.cn-beijing.aliyuncs.com/xxxx/xxxx_flink:v1 \
        -Dkubernetes.container.image.pull-policy=always \
        -Dkubernetes.container.image.pull-secrets=xxxx \
        -Dkubernetes.config.file=target/classes/ali-k8s.txt \
        -Dkubernetes.pod-template-file=target/classes/pod-template-file.yaml \
        -Dkubernetes.rest-service.exposed.type=NodePort \
        -Dkubernetes.taskmanager.cpu=3 \
        -Djobmanager.memory.process.size=4g \
        -Dtaskmanager.memory.process.size=14g \
        -Dtaskmanager.memory.task.heap.size=10g \
        -Dtaskmanager.memory.managed.size=100m \
        -Dtaskmanager.memory.task.off-heap.size=2g \
        -Dtaskmanager.numberOfTaskSlots=10 \
        -Dcluster.fine-grained-resource-management.enabled=true \
        -Dmetrics.reporter.promgateway.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter \
        -Dmetrics.reporter.promgateway.host=192.168.28.112 \
        -Dmetrics.reporter.promgateway.port=9091 \
        -Dmetrics.reporter.promgateway.jobName=xxxx_creative_dim \
        -Dmetrics.reporter.promgateway.randomJobNameSuffix=true \
        -Dmetrics.reporter.promgateway.deleteOnShutdown=false \
        -Dmetrics.reporter.promgateway.interval='60 SECONDS' \
        -Dmetrics.reporter.promgateway.groupingKey="app_type=flink;k8s_env=ali_k8s;docker_env=xxxx_harbor;app_name=xxxx_creative_dim" \
        local:///opt/xxxx_realtime/usrlib/xxxx_realtime.jar
The launcher script flink-dist/src/main/flink-bin/bin/flink shows that the entry point for job submission is org.apache.flink.client.cli.CliFrontend.
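CliFrontend is initialized from two sources of configuration: the settings in the local configuration file and the options passed on the command line.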
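The interplay of the two configuration sources can be pictured with a small model in which `-Dkey=value` options from the command line override values loaded from the configuration file. This is a hedged sketch: `ConfigMergeDemo` and its `merge` method are hypothetical, the "file" values are illustrative, and real Flink parses options through its own command-line classes rather than by string matching.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of config precedence; NOT Flink's CliFrontend implementation.
class ConfigMergeDemo {
    public static void main(String[] args) {
        // Illustrative values standing in for entries read from the local config file.
        Map<String, String> fileConfig = new HashMap<>();
        fileConfig.put("taskmanager.numberOfTaskSlots", "1");
        fileConfig.put("jobmanager.memory.process.size", "1600m");

        List<String> cliArgs = List.of(
                "run-application",
                "--target", "kubernetes-application",
                "-Dtaskmanager.numberOfTaskSlots=10",
                "-Djobmanager.memory.process.size=4g");

        System.out.println(merge(fileConfig, cliArgs));
    }

    /** Copies the file settings, then lets every "-Dkey=value" token override them. */
    static Map<String, String> merge(Map<String, String> fileConfig, List<String> cliArgs) {
        Map<String, String> result = new HashMap<>(fileConfig);
        for (String arg : cliArgs) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property (action name, --target, jar path, ...)
            }
            String keyValue = arg.substring(2);
            int eq = keyValue.indexOf('=');
            if (eq > 0) {
                // Split on the first '=' only, so values may themselves contain '='.
                result.put(keyValue.substring(0, eq), keyValue.substring(eq + 1));
            }
        }
        return result;
    }
}
```

Copying the file configuration first and applying command-line values on top is what gives the command line the last word.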
Based on the first command-line argument, run-application, CliFrontend selects the action ACTION_RUN_APPLICATION. Based on --target, it loads KubernetesClusterClientFactory as the ClusterClientFactory, and KubernetesClusterClientFactory then creates a KubernetesClusterDescriptor.
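The two selection steps can be sketched as follows. In Flink, to our understanding, candidate ClusterClientFactory implementations are discovered through Java's `ServiceLoader`, and the one whose `isCompatibleWith(...)` accepts the configuration (the `--target` value lands in the `execution.target` option) is used. This sketch is a simplified, hypothetical model: discovery is replaced by a fixed list, and `ClientFactory`, `selectAction` and `selectFactory` are illustrative names, not Flink APIs.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of "pick an action, then pick a compatible client factory".
class ActionDispatchDemo {
    public static void main(String[] args) {
        String[] cli = {"run-application", "--target", "kubernetes-application"};
        System.out.println(selectAction(cli[0]));
        System.out.println(selectFactory(Map.of("execution.target", cli[2])).name());
    }

    /** Illustrative stand-in for a cluster client factory. */
    interface ClientFactory {
        boolean isCompatibleWith(Map<String, String> config);
        String name();
    }

    // Fixed list replacing ServiceLoader discovery in this sketch.
    static final List<ClientFactory> FACTORIES = List.of(
            factory("YarnClusterClientFactory", "yarn-session", "yarn-per-job", "yarn-application"),
            factory("KubernetesClusterClientFactory", "kubernetes-session", "kubernetes-application"));

    static ClientFactory factory(String factoryName, String... targets) {
        List<String> supported = List.of(targets);
        return new ClientFactory() {
            public boolean isCompatibleWith(Map<String, String> config) {
                return supported.contains(config.get("execution.target"));
            }

            public String name() {
                return factoryName;
            }
        };
    }

    /** Maps the first CLI argument to an action constant name. */
    static String selectAction(String firstArg) {
        switch (firstArg) {
            case "run":
                return "ACTION_RUN";
            case "run-application":
                return "ACTION_RUN_APPLICATION";
            default:
                throw new IllegalArgumentException("Unknown action: " + firstArg);
        }
    }

    /** Returns the first factory that declares itself compatible with the configuration. */
    static ClientFactory selectFactory(Map<String, String> config) {
        return FACTORIES.stream()
                .filter(f -> f.isCompatibleWith(config))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No factory for " + config));
    }
}
```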
KubernetesClusterDescriptor.deployApplicationCluster() sets the container entry point to KubernetesApplicationClusterEntrypoint and deploys the JobManager as a Kubernetes resource of type Deployment.
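This relates to the `local:///` jar path in the submit command above. As far as we understand Flink's Kubernetes application mode, the job jar must already be inside the container image, so only the `local` scheme is accepted before the Deployment is created. The sketch below is a hypothetical illustration of such a check using `java.net.URI`; `LocalJarCheckDemo` and `toImagePath` are not Flink code.

```java
import java.net.URI;

// Hypothetical check that a jar URI points inside the container image (local scheme).
class LocalJarCheckDemo {
    public static void main(String[] args) {
        String jar = "local:///opt/xxxx_realtime/usrlib/xxxx_realtime.jar";
        System.out.println(toImagePath(jar));
    }

    /** Returns the in-image path, rejecting any scheme other than "local". */
    static String toImagePath(String jarUri) {
        URI uri = URI.create(jarUri);
        if (!"local".equals(uri.getScheme())) {
            throw new IllegalArgumentException(
                    "Only the local scheme is supported in application mode: " + jarUri);
        }
        // For local:///a/b.jar the authority is empty and the path is /a/b.jar.
        return uri.getPath();
    }
}
```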
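# Cluster Initialization
The HA article briefly described, from the HA point of view, how the DispatcherRunner, once created and elected leader, creates the Dispatcher in a callback.
Here we walk through how the cluster is started from KubernetesApplicationClusterEntrypoint.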
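Initializing the cluster components mainly involves three classes. The Dispatcher is used as the example below; the class names for other components differ, but the structure is broadly the same:
- DispatcherRunner: acts as the LeaderContender; when it is granted leadership, its callback starts the DispatcherLeaderProcess.
- DispatcherLeaderProcess: wraps the Dispatcher and manages its entire lifecycle.
- Dispatcher: the RpcEndpoint that actually provides the dispatcher service.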
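The three-layer split above can be modeled in a few lines. This is a hedged, synchronous sketch: `SimpleLeaderContender`, `SimpleRunner`, `SimpleLeaderProcess` and `SimpleDispatcher` are hypothetical names, and Flink's real classes are asynchronous and handle far more (fencing tokens, job recovery, error handling).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical model of Runner -> LeaderProcess -> Endpoint layering.
class LeaderLayersDemo {
    static final List<String> events = new ArrayList<>();

    public static void main(String[] args) {
        SimpleRunner runner = new SimpleRunner();
        runner.grantLeadership(UUID.randomUUID()); // simulate the HA service electing this node
        runner.revokeLeadership();
        System.out.println(events);
    }

    interface SimpleLeaderContender {
        void grantLeadership(UUID sessionId);
        void revokeLeadership();
    }

    /** Innermost layer: the endpoint that actually serves requests. */
    static class SimpleDispatcher {
        private final UUID sessionId;
        SimpleDispatcher(UUID sessionId) { this.sessionId = sessionId; }
        void start() { events.add("dispatcher started"); }
        void stop() { events.add("dispatcher stopped"); }
    }

    /** Middle layer: owns one Dispatcher for the lifetime of a single leader session. */
    static class SimpleLeaderProcess {
        private final UUID sessionId;
        private SimpleDispatcher dispatcher;
        SimpleLeaderProcess(UUID sessionId) { this.sessionId = sessionId; }

        void start() {
            events.add("leader process started");
            dispatcher = new SimpleDispatcher(sessionId);
            dispatcher.start();
        }

        void close() {
            if (dispatcher != null) {
                dispatcher.stop();
            }
            events.add("leader process closed");
        }
    }

    /** Outer layer: takes part in election and starts a fresh leader process per grant. */
    static class SimpleRunner implements SimpleLeaderContender {
        private SimpleLeaderProcess current;

        public void grantLeadership(UUID sessionId) {
            revokeLeadership(); // close any previous process before starting a new one
            current = new SimpleLeaderProcess(sessionId);
            current.start();
        }

        public void revokeLeadership() {
            if (current != null) {
                current.close();
                current = null;
            }
        }
    }
}
```

Keeping the runner separate from the leader process means each leadership grant gets a brand-new process, so nothing from a previous leader session leaks into the next.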
The entry point of the whole Kubernetes Flink cluster is the main method of org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint, and the core logic ultimately lives in org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster.
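Different entrypoints end up in the same runCluster because ClusterEntrypoint follows the template-method pattern: the base class fixes the startup sequence, and createDispatcherResourceManagerComponentFactory is the hook a concrete entrypoint overrides. A minimal sketch of that shape, with hypothetical class names (`SketchClusterEntrypoint`, `SketchApplicationEntrypoint`, `ComponentFactory`):

```java
// Hypothetical template-method sketch; names are illustrative, not Flink classes.
class TemplateMethodDemo {
    public static void main(String[] args) {
        SketchClusterEntrypoint entrypoint = new SketchApplicationEntrypoint();
        entrypoint.runCluster();
        System.out.println(entrypoint.log);
    }
}

abstract class SketchClusterEntrypoint {
    final StringBuilder log = new StringBuilder();

    /** Fixed startup skeleton, similar in spirit to runCluster. */
    final void runCluster() {
        log.append("initializeServices;");
        ComponentFactory factory = createComponentFactory();
        log.append(factory.create()).append(';');
    }

    /** The single hook that each concrete entrypoint supplies. */
    protected abstract ComponentFactory createComponentFactory();

    interface ComponentFactory {
        String create();
    }
}

class SketchApplicationEntrypoint extends SketchClusterEntrypoint {
    @Override
    protected ComponentFactory createComponentFactory() {
        return () -> "application components";
    }
}
```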
```java
// org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        // Initialize services such as HA
        initializeServices(configuration, pluginManager);

        // Create the DispatcherResourceManagerComponentFactory
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);

        // Start the clusterComponent, which contains the Dispatcher,
        // the ResourceManager and the WebMonitorEndpoint
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
    }
}

// DefaultDispatcherResourceManagerComponentFactory holds a dispatcherRunnerFactory
// and a resourceManagerFactory.
// dispatcherRunnerFactory creates the dispatcherRunner inside
// dispatcherResourceManagerComponentFactory.create().
// dispatcherRunnerFactory holds an ApplicationDispatcherLeaderProcessFactoryFactory, which
// builds the DispatcherLeaderProcessFactory when the dispatcherRunner is created.
// DispatcherLeaderProcessFactory creates the DispatcherLeaderProcess in the leader-election
// callback; once created, the DispatcherLeaderProcess creates the Dispatcher in start().
protected DispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(final Configuration configuration) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            new DefaultDispatcherRunnerFactory(
                    ApplicationDispatcherLeaderProcessFactoryFactory.create(
                            // Application mode uses SessionDispatcherFactory.INSTANCE
                            configuration, SessionDispatcherFactory.INSTANCE, program)),
            resourceManagerFactory,
            JobRestEndpointFactory.INSTANCE);
}
```
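The chain of factories in createDispatcherResourceManagerComponentFactory exists because each layer knows only part of what the Dispatcher needs: the program is known when the entrypoint starts, runtime services once the runner is built, and the leader session only in the election callback. The sketch below models that staged capture with plain functional interfaces. It is a hypothetical illustration; `DispatcherFactory`, `LeaderProcessFactory` and `leaderProcessFactoryFactory` are stand-ins, not Flink's signatures.

```java
import java.util.function.Function;

// Hypothetical "factory of factories": each stage captures what it knows and defers the rest.
class FactoryChainDemo {
    public static void main(String[] args) {
        System.out.println(buildAndElect("app.jar", "node-1"));
    }

    /** Innermost: builds the "Dispatcher" once everything is known. */
    interface DispatcherFactory {
        String createDispatcher(String program, String leaderSession);
    }

    /** Middle: created with the runner, invoked on each leadership grant. */
    interface LeaderProcessFactory {
        String createAndStart(String leaderSession);
    }

    /** Outer: knows the program up front and later receives runtime services. */
    static Function<String, LeaderProcessFactory> leaderProcessFactoryFactory(
            String program, DispatcherFactory dispatcherFactory) {
        return rpcService -> leaderSession ->
                dispatcherFactory.createDispatcher(program, leaderSession) + " via " + rpcService;
    }

    static String buildAndElect(String program, String rpcService) {
        DispatcherFactory sessionDispatcherFactory =
                (prog, session) -> "Dispatcher[" + prog + ", session=" + session + "]";
        // Stage 1: the entrypoint wires the outer factory with the program.
        Function<String, LeaderProcessFactory> outer =
                leaderProcessFactoryFactory(program, sessionDispatcherFactory);
        // Stage 2: runtime services become available when the runner is created.
        LeaderProcessFactory processFactory = outer.apply(rpcService);
        // Stage 3: the leader-election callback finally creates the dispatcher.
        return processFactory.createAndStart("s1");
    }
}
```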
> TODO: The [factory pattern](https://refactoringguru.cn/design-patterns/factory-method) is used heavily here. The [Apache Flink Code Style and Quality Guide](https://flink.apache.org/contributing/code-style-and-quality-preamble.html) and Google's [Guide: Writing Testable Code](https://testing.googleblog.com/2008/11/guide-to-writing-testable-code.html) are both worth reading to get a feel for why.

<a name="ptHuz"></a>
# Submitting the JobGraph in Application Mode

The sections above analyzed how the Dispatcher is created. Once it exists, how is the job submitted to it in application mode?
The logic is implemented in ApplicationDispatcherBootstrap.

1. First, let's trace the call chain through which the DispatcherBootstrapFactory is declared.

```java
// org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory#createFactory
public DispatcherLeaderProcessFactory createFactory(
        JobPersistenceComponentFactory jobPersistenceComponentFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices,
        FatalErrorHandler fatalErrorHandler) {

    // Creating the DispatcherLeaderProcessFactory first creates an
    // ApplicationDispatcherGatewayServiceFactory
    final ApplicationDispatcherGatewayServiceFactory dispatcherServiceFactory =
            new ApplicationDispatcherGatewayServiceFactory(
                    configuration, dispatcherFactory, program, rpcService, partialDispatcherServices);

    // Application mode ultimately uses SessionDispatcherLeaderProcessFactory
    return new SessionDispatcherLeaderProcessFactory(
            dispatcherServiceFactory, jobPersistenceComponentFactory, ioExecutor, fatalErrorHandler);
}

// SessionDispatcherLeaderProcess delegates Dispatcher creation to
// ApplicationDispatcherGatewayServiceFactory
// org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcher
private void createDispatcher(
        Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {

    final DispatcherGatewayService dispatcherService =
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    jobGraphs,
                    recoveredDirtyJobResults,
                    jobGraphStore,
                    jobResultStore);

    completeDispatcherSetup(dispatcherService);
}

// When ApplicationDispatcherGatewayServiceFactory creates the Dispatcher, it also creates a
// DispatcherBootstrapFactory: here, a lambda that builds an ApplicationDispatcherBootstrap.
// org.apache.flink.client.deployment.application.ApplicationDispatcherGatewayServiceFactory#create
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobResults,
        JobGraphWriter jobGraphWriter,
        JobResultStore jobResultStore) {

    final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

    final Dispatcher dispatcher;
    try {
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        recoveredDirtyJobResults,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new ApplicationDispatcherBootstrap(
                                        application,
                                        recoveredJobIds,
                                        configuration,
                                        dispatcherGateway,
                                        scheduledExecutor,
                                        errorHandler),
                        PartialDispatcherServicesWithJobPersistenceComponents.from(
                                partialDispatcherServices, jobGraphWriter, jobResultStore));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    // Start the Dispatcher RPC endpoint
    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
```
2. Where is the ApplicationDispatcherBootstrap factory invoked?

Tracing the code shows that it is invoked in org.apache.flink.runtime.dispatcher.Dispatcher#onStart:

    @Override
    public void onStart() throws Exception {
        // ...
        this.dispatcherBootstrap =
                this.dispatcherBootstrapFactory.create(
                        getSelfGateway(DispatcherGateway.class),
                        this.getRpcService().getScheduledExecutor(),
                        this::onFatalError);
    }
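Why is the bootstrap created through a factory lambda instead of directly? Its arguments (a gateway to the Dispatcher itself, the RPC service's scheduled executor) only make sense once the endpoint is running. The sketch below models that lifecycle under stated assumptions: a single-thread executor stands in for the RPC endpoint's main thread, `start()` schedules `onStart()` instead of calling it inline, and `BootstrapFactory` and `SketchDispatcher` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model: start() schedules onStart() on the endpoint's own "main thread",
// where the deferred bootstrap factory is finally invoked.
class EndpointLifecycleDemo {
    public static void main(String[] args) throws Exception {
        SketchDispatcher d = new SketchDispatcher((gateway, executor) -> "bootstrap using " + gateway);
        d.start().get(); // wait until onStart has run on the main thread
        System.out.println(d.bootstrap);
        d.shutdown();
    }

    /** Stand-in for a bootstrap factory: creation is deferred until onStart. */
    interface BootstrapFactory {
        String create(String selfGateway, ExecutorService executor);
    }

    static class SketchDispatcher {
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
        private final BootstrapFactory bootstrapFactory;
        volatile String bootstrap;

        SketchDispatcher(BootstrapFactory bootstrapFactory) {
            this.bootstrapFactory = bootstrapFactory;
        }

        /** Schedules onStart on the main thread rather than running it inline. */
        Future<?> start() {
            return mainThread.submit(this::onStart);
        }

        private void onStart() {
            // Only now does the endpoint have a "self gateway" to hand to the bootstrap.
            bootstrap = bootstrapFactory.create("self-gateway", mainThread);
        }

        void shutdown() {
            mainThread.shutdown();
        }
    }
}
```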
As an RpcEndpoint, the Dispatcher is started by the `dispatcher.start();` call in the code analyzed in the previous step. For how onStart is then scheduled, see the [Flink RPC](https://www.yuque.com/u22594583/ydf48t/nfmqmy) article, which covers the RPC startup logic.

3. ApplicationDispatcherBootstrap runs the user code (translating the job into a JobGraph and submitting it).
   1. It reuses the Client module's ClientUtils.executeProgram() to actually execute the code in the jar.
      ClientUtils.executeProgram() calls the static setAsContext methods of ContextEnvironment and StreamContextEnvironment, which initialize the static class attribute contextEnvironmentFactory.
      So when a Flink job calls `StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();`, the StreamExecutionEnvironment it gets back is the one created by StreamExecutionEnvironmentFactory#createExecutionEnvironment.

```java
// org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationEntryPoint
private void runApplicationEntryPoint(
        final CompletableFuture<List<JobID>> jobIdsFuture,
        final Set<JobID> tolerateMissingResult,
        final DispatcherGateway dispatcherGateway,
        final ScheduledExecutor scheduledExecutor,
        final boolean enforceSingleJobExecution,
        final boolean submitFailedJobOnApplicationError) {
    // ···
    final PipelineExecutorServiceLoader executorServiceLoader =
            new EmbeddedExecutorServiceLoader(
                    applicationJobIds, dispatcherGateway, scheduledExecutor);

    ClientUtils.executeProgram(
            executorServiceLoader,
            configuration,
            application,
            enforceSingleJobExecution,
            true /* suppress sysout */);
    // ···
}

// ClientUtils.executeProgram
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // Run the user program with the user-code class loader as the context class loader
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));

        // Install the context environment factories consulted by getExecutionEnvironment()
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            // Invoke the main method of the user jar
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        // Restore the original context class loader
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
```
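The setAsContext / getExecutionEnvironment / unsetAsContext sequence is a "context factory" pattern: the framework installs a factory in a static slot, user code asks for "the" environment without knowing which one it gets, and the slot is cleared in a `finally` block. A minimal sketch with hypothetical names (`SketchEnv`, `userMain`, `runUserProgram`); the fallback to a "local" environment when nothing is installed is a simplification of this sketch.

```java
import java.util.function.Supplier;

// Hypothetical model of "install a context factory, then let user code pick it up".
class ContextEnvironmentDemo {
    public static void main(String[] args) {
        System.out.println(runUserProgram());                          // environment seen inside the program
        System.out.println(SketchEnv.getExecutionEnvironment().name);  // after cleanup
    }

    static class SketchEnv {
        // Static slot, analogous to contextEnvironmentFactory; null means nothing installed.
        private static Supplier<SketchEnv> contextFactory = null;
        final String name;

        SketchEnv(String name) { this.name = name; }

        /** What user code calls, analogous to getExecutionEnvironment(). */
        static SketchEnv getExecutionEnvironment() {
            return contextFactory != null ? contextFactory.get() : new SketchEnv("local");
        }

        static void setAsContext(Supplier<SketchEnv> factory) { contextFactory = factory; }

        static void unsetAsContext() { contextFactory = null; }
    }

    /** Stand-in for the jar's main method: it never names the embedded environment. */
    static String userMain() {
        return SketchEnv.getExecutionEnvironment().name;
    }

    /** Mirrors the shape of executeProgram: install context, run user code, always clean up. */
    static String runUserProgram() {
        SketchEnv.setAsContext(() -> new SketchEnv("embedded"));
        try {
            return userMain();
        } finally {
            SketchEnv.unsetAsContext();
        }
    }
}
```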
- The user code calls StreamExecutionEnvironment#execute(java.lang.String), which translates the transformations into a StreamGraph.

It then calls StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph), which runs the StreamGraph through the EmbeddedExecutor:

    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        // ...
        // Because of the setAsContext setup, getExecutor(...) returns the EmbeddedExecutor,
        // so execute(...) here is EmbeddedExecutor#execute
        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(streamGraph, configuration, userClassloader);
        // ...
    }
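What an embedded executor does with the graph can be sketched in a few lines: translate it, then hand the result straight to a gateway living in the same process and return a future of the job id, without going through a REST client. This is a hedged model; `SketchGateway`, `SketchExecutor` and `SketchEmbeddedExecutor` are hypothetical, and the "translation" is a string rewrite standing in for StreamGraph-to-JobGraph conversion.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of an embedded executor submitting directly to an in-process gateway.
class EmbeddedSubmitDemo {
    public static void main(String[] args) {
        SketchGateway gateway = new SketchGateway();
        SketchExecutor executor = new SketchEmbeddedExecutor(gateway);
        String jobId = executor.execute("StreamGraph[source->map->sink]").join();
        System.out.println(jobId + " -> " + gateway.submitted.get(jobId));
    }

    /** Stand-in for the Dispatcher's gateway: records the submitted job. */
    static class SketchGateway {
        final Map<String, String> submitted = new ConcurrentHashMap<>();

        CompletableFuture<String> submitJob(String jobGraph) {
            String jobId = UUID.randomUUID().toString();
            submitted.put(jobId, jobGraph);
            return CompletableFuture.completedFuture(jobId);
        }
    }

    interface SketchExecutor {
        CompletableFuture<String> execute(String streamGraph);
    }

    static class SketchEmbeddedExecutor implements SketchExecutor {
        private final SketchGateway gateway;

        SketchEmbeddedExecutor(SketchGateway gateway) {
            this.gateway = gateway;
        }

        @Override
        public CompletableFuture<String> execute(String streamGraph) {
            // Translation step (a string rewrite in this sketch).
            String jobGraph = streamGraph.replace("StreamGraph", "JobGraph");
            // Submit directly to the gateway in the same process.
            return gateway.submitJob(jobGraph);
        }
    }
}
```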
EmbeddedExecutor translates the streamGraph into a jobGraph and submits the job to the Dispatcher through dispatcherGateway.submitJob.
```java
public CompletableFuture
private CompletableFuture
