This analysis is based on Flink's Kubernetes application mode.

# Client-side application submission

```shell
/opt/flink-1.14.3/bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.namespace=flink-cluster \
    -Dkubernetes.service-account=xxxx-test \
    -Dkubernetes.cluster-id=xxxx-creative-dim-cluster \
    -Dkubernetes.container.image=registry.cn-beijing.aliyuncs.com/xxxx/xxxx_flink:v1 \
    -Dkubernetes.container.image.pull-policy=always \
    -Dkubernetes.container.image.pull-secrets=xxxx \
    -Dkubernetes.config.file=target/classes/ali-k8s.txt \
    -Dkubernetes.pod-template-file=target/classes/pod-template-file.yaml \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.taskmanager.cpu=3 \
    -Djobmanager.memory.process.size=4g \
    -Dtaskmanager.memory.process.size=14g \
    -Dtaskmanager.memory.task.heap.size=10g \
    -Dtaskmanager.memory.managed.size=100m \
    -Dtaskmanager.memory.task.off-heap.size=2g \
    -Dtaskmanager.numberOfTaskSlots=10 \
    -Dcluster.fine-grained-resource-management.enabled=true \
    -Dmetrics.reporter.promgateway.class=org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter \
    -Dmetrics.reporter.promgateway.host=192.168.28.112 \
    -Dmetrics.reporter.promgateway.port=9091 \
    -Dmetrics.reporter.promgateway.jobName=xxxx_creative_dim \
    -Dmetrics.reporter.promgateway.randomJobNameSuffix=true \
    -Dmetrics.reporter.promgateway.deleteOnShutdown=false \
    -Dmetrics.reporter.promgateway.interval='60 SECONDS' \
    -Dmetrics.reporter.promgateway.groupingKey="app_type=flink;k8s_env=ali_k8s;docker_env=xxxx_harbor;app_name=xxxx_creative_dim" \
    local:///opt/xxxx_realtime/usrlib/xxxx_realtime.jar
```
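
The command above passes every setting as a `-Dkey=value` dynamic property, and these override the configuration file. The sketch below shows that overlay in minimal form. It is not Flink's actual parser. It assumes a simplified `key: value` file format, and the class `DynamicPropertiesSketch` and its methods are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Flink's parser): overlay "-Dkey=value" arguments on
// entries read from a simplified "key: value" configuration file.
final class DynamicPropertiesSketch {

    /** Parses "key: value" lines; blank lines and lines starting with '#' are skipped. */
    static Map<String, String> parseConfigFile(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int sep = trimmed.indexOf(':');
            if (sep > 0) {
                conf.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 1).trim());
            }
        }
        return conf;
    }

    /** Copies the base configuration and lets every "-Dkey=value" argument override it. */
    static Map<String, String> overlayDynamicProperties(Map<String, String> base, List<String> args) {
        Map<String, String> merged = new LinkedHashMap<>(base);
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // e.g. "--target", its value, or the jar URI
            }
            String keyValue = arg.substring(2);
            int eq = keyValue.indexOf('='); // split on the first '=' only
            if (eq > 0) {
                merged.put(keyValue.substring(0, eq), keyValue.substring(eq + 1));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = parseConfigFile(List.of(
                "# defaults shipped with the distribution",
                "taskmanager.numberOfTaskSlots: 1",
                "jobmanager.memory.process.size: 1600m"));
        Map<String, String> conf = overlayDynamicProperties(base, List.of(
                "--target", "kubernetes-application",
                "-Dtaskmanager.numberOfTaskSlots=10",
                "-Djobmanager.memory.process.size=4g"));
        System.out.println(conf);
    }
}
```

Running `main` prints `{taskmanager.numberOfTaskSlots=10, jobmanager.memory.process.size=4g}`. The code splits each argument on its first `=` only. That keeps values which themselves contain `=` intact, such as the `groupingKey` above.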

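From flink-dist/src/main/flink-bin/bin/flink we can see that the entry point for submitting a Flink job is org.apache.flink.client.cli.CliFrontend.
CliFrontend is initialized from two sources: the local configuration file and the command-line arguments.
The first command-line argument, run-application, selects the action ACTION_RUN_APPLICATION. The --target option causes KubernetesClusterClientFactory to be loaded as the ClusterClientFactory, and KubernetesClusterClientFactory then creates a KubernetesClusterDescriptor.
KubernetesClusterDescriptor.deployApplicationCluster() sets KubernetesApplicationClusterEntrypoint as the container entry point and deploys a Kubernetes Deployment.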
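
The sketch below illustrates the action and factory selection described above. It is a simplified illustration, not CliFrontend's code:

- `CliDispatchSketch`, `ClientFactory` and the two factory classes are hypothetical.
- Compatibility is checked against the raw `--target` string rather than a `Configuration`.
- A fixed list stands in for the `java.util.ServiceLoader` discovery Flink uses to find `ClusterClientFactory` implementations.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of CliFrontend-style dispatch: the first argument selects an action,
// and the --target value selects the one compatible client factory. Names are illustrative.
final class CliDispatchSketch {

    interface ClientFactory {
        boolean isCompatibleWith(String target);

        String createClusterDescriptor();
    }

    static final class KubernetesFactory implements ClientFactory {
        public boolean isCompatibleWith(String target) {
            return target.equals("kubernetes-session") || target.equals("kubernetes-application");
        }

        public String createClusterDescriptor() {
            return "KubernetesClusterDescriptor";
        }
    }

    static final class YarnFactory implements ClientFactory {
        public boolean isCompatibleWith(String target) {
            return target.startsWith("yarn-");
        }

        public String createClusterDescriptor() {
            return "YarnClusterDescriptor";
        }
    }

    /** Returns the value that follows "--target". */
    static String targetOf(String[] args) {
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("--target")) {
                return args[i + 1];
            }
        }
        throw new IllegalArgumentException("missing --target");
    }

    /** Exactly one factory must accept the target, otherwise the choice is ambiguous. */
    static ClientFactory findFactory(List<ClientFactory> factories, String target) {
        List<ClientFactory> compatible = factories.stream()
                .filter(f -> f.isCompatibleWith(target))
                .collect(Collectors.toList());
        if (compatible.size() != 1) {
            throw new IllegalStateException(compatible.size() + " factories match target " + target);
        }
        return compatible.get(0);
    }

    /** Maps the action name to a handler; only "run-application" is modelled here. */
    static String run(String[] args, List<ClientFactory> factories) {
        Map<String, Function<String[], String>> actions = Map.of(
                "run-application",
                a -> findFactory(factories, targetOf(a)).createClusterDescriptor()
                        + ".deployApplicationCluster()");
        Function<String[], String> action = actions.get(args[0]);
        if (action == null) {
            throw new IllegalArgumentException("Unknown action: " + args[0]);
        }
        return action.apply(args);
    }
}
```

The sketch requires exactly one compatible factory. That reflects the idea that a target such as `kubernetes-application` should map to a single deployment backend, so an ambiguous or unknown target fails fast.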

# Cluster initialization

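The HA article briefly covered, from the HA perspective, how the DispatcherRunner creates the Dispatcher in a callback once leader election completes.
Here we walk through how the cluster is started via KubernetesApplicationClusterEntrypoint.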

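Initializing the cluster components mainly involves three classes. The list below uses the Dispatcher as the example. The other components use somewhat different class names but follow the same structure:

- DispatcherRunner: acts as the LeaderContender and, in its leadership callback, starts a DispatcherLeaderProcess
- DispatcherLeaderProcess: wraps the Dispatcher and manages its entire lifecycle
- Dispatcher: the RpcEndpoint that actually provides the dispatch service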
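
The heavily simplified model below makes the division of labour between the three classes concrete. All class names are hypothetical stand-ins:

- `Runner` stands in for DispatcherRunner.
- `LeaderProcess` stands in for DispatcherLeaderProcess.
- `Endpoint` stands in for Dispatcher.

Real Flink code is asynchronous and also handles HA recovery, which this sketch omits. The `DispatcherId.fromUuid(getLeaderSessionId())` call analysed later reuses the leader session id as the fencing token, and the sketch does the same.

```java
import java.util.UUID;

// Hypothetical, heavily simplified model of the three layers (not Flink code):
// Runner (leader contender) -> LeaderProcess (one leadership session) -> Endpoint (serves requests).
final class LeaderLayeringSketch {

    /** Stand-in for the Dispatcher: the component that actually serves requests. */
    static final class Endpoint {
        final UUID fencingToken;
        boolean running;

        Endpoint(UUID fencingToken) {
            this.fencingToken = fencingToken;
        }

        void start() {
            running = true;
        }

        void stop() {
            running = false;
        }
    }

    /** Stand-in for DispatcherLeaderProcess: owns the endpoint for one leader session. */
    static final class LeaderProcess {
        final UUID sessionId;
        Endpoint endpoint;

        LeaderProcess(UUID sessionId) {
            this.sessionId = sessionId;
        }

        void start() {
            endpoint = new Endpoint(sessionId); // the session id doubles as the fencing token
            endpoint.start();
        }

        void close() {
            if (endpoint != null) {
                endpoint.stop();
            }
        }
    }

    /** Stand-in for DispatcherRunner: reacts to leader-election callbacks. */
    static final class Runner {
        LeaderProcess current;

        void grantLeadership(UUID sessionId) {
            if (current != null) {
                current.close(); // a new session replaces the old process
            }
            current = new LeaderProcess(sessionId);
            current.start();
        }

        void revokeLeadership() {
            if (current != null) {
                current.close();
                current = null;
            }
        }
    }
}
```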

The entry point of the whole Kubernetes Flink cluster is org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint#main. The core logic ultimately lives in org.apache.flink.runtime.entrypoint.ClusterEntrypoint#runCluster.

```java
private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        // Initialize services such as HA
        initializeServices(configuration, pluginManager);

        // Create the DispatcherResourceManagerComponentFactory
        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);

        // Start the clusterComponent, which contains the Dispatcher, ResourceManager and WebMonitorEndpoint
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        resourceId.unwrap(),
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
    }
}
```

```java
// DefaultDispatcherResourceManagerComponentFactory holds a dispatcherRunnerFactory and a resourceManagerFactory.
// dispatcherRunnerFactory creates the dispatcherRunner inside dispatcherResourceManagerComponentFactory.create.
// dispatcherRunnerFactory holds an ApplicationDispatcherLeaderProcessFactoryFactory, which builds the
// DispatcherLeaderProcessFactory when the dispatcherRunner is created.
// DispatcherLeaderProcessFactory creates a DispatcherLeaderProcess in the leader-election callback;
// once created, the DispatcherLeaderProcess creates the Dispatcher during start().
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(
        final Configuration configuration) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            new DefaultDispatcherRunnerFactory(
                    ApplicationDispatcherLeaderProcessFactoryFactory.create(
                            // Application mode uses SessionDispatcherFactory.INSTANCE
                            configuration, SessionDispatcherFactory.INSTANCE, program)),
            resourceManagerFactory,
            JobRestEndpointFactory.INSTANCE);
}
```

> TODO: This code makes heavy use of the [factory pattern](https://refactoringguru.cn/design-patterns/factory-method). The [Apache Flink Code Style and Quality Guide](https://flink.apache.org/contributing/code-style-and-quality-preamble.html) and Google's [Guide: Writing Testable Code](https://testing.googleblog.com/2008/11/guide-to-writing-testable-code.html) are worth reading to get a feel for why.
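
One way to read the factory chain above is by when each value becomes available:

- Values known when the entrypoint starts (the configuration and the packaged program) are captured early.
- Values that only exist later (the RPC service and persistence components passed to `createFactory`, and after that the leader session id) are supplied when the inner factories are called.

The sketch below follows that interpretation. Its interfaces are hypothetical, and plain strings stand in for the real services.

```java
// Hypothetical sketch of the "factory of factories" shape (names and String stand-ins are illustrative).
final class FactoryFactorySketch {

    /** Inner factory: invoked once per leadership session. */
    interface LeaderProcessFactory {
        String create(String leaderSessionId);
    }

    /** Outer factory: invoked once the runtime services (here just an RPC service name) exist. */
    interface LeaderProcessFactoryFactory {
        LeaderProcessFactory createFactory(String rpcService);
    }

    /** Built early by the entrypoint, when only the configuration and the user program are known. */
    static LeaderProcessFactoryFactory forApplication(String configuration, String program) {
        return rpcService -> leaderSessionId ->
                "LeaderProcess[session=" + leaderSessionId
                        + ", rpc=" + rpcService
                        + ", conf=" + configuration
                        + ", program=" + program + "]";
    }

    public static void main(String[] args) {
        LeaderProcessFactoryFactory factoryFactory = forApplication("flink-conf", "xxxx_realtime.jar");
        // Later, once the RPC and HA services are up:
        LeaderProcessFactory factory = factoryFactory.createFactory("rpc-service");
        // And on every leadership grant:
        System.out.println(factory.create("session-1"));
    }
}
```

Running `main` prints `LeaderProcess[session=session-1, rpc=rpc-service, conf=flink-conf, program=xxxx_realtime.jar]`. Each layer asks only for the inputs available at its own point in the startup sequence. This also makes every layer easy to swap out in tests.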
# Application-mode JobGraph submission
The sections above covered how the Dispatcher is created. Once it exists, how does application mode submit the job to it? The logic is implemented in ApplicationDispatcherBootstrap.
1. First, let's look at the call chain that declares the DispatcherBootstrapFactory.
```java
// org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory#createFactory
public DispatcherLeaderProcessFactory createFactory(
        JobPersistenceComponentFactory jobPersistenceComponentFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices,
        FatalErrorHandler fatalErrorHandler) {
    // Before the DispatcherLeaderProcessFactory is built, an ApplicationDispatcherGatewayServiceFactory is created
    final ApplicationDispatcherGatewayServiceFactory dispatcherServiceFactory =
            new ApplicationDispatcherGatewayServiceFactory(
                    configuration,
                    dispatcherFactory,
                    program,
                    rpcService,
                    partialDispatcherServices);
    // Application mode ultimately uses SessionDispatcherLeaderProcessFactory
    return new SessionDispatcherLeaderProcessFactory(
            dispatcherServiceFactory,
            jobPersistenceComponentFactory,
            ioExecutor,
            fatalErrorHandler);
}

// SessionDispatcherLeaderProcess delegates Dispatcher creation to ApplicationDispatcherGatewayServiceFactory
// org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess#createDispatcher
private void createDispatcher(
        Collection<JobGraph> jobGraphs, Collection<JobResult> recoveredDirtyJobResults) {
    final DispatcherGatewayService dispatcherService =
            dispatcherGatewayServiceFactory.create(
                    DispatcherId.fromUuid(getLeaderSessionId()),
                    jobGraphs,
                    recoveredDirtyJobResults,
                    jobGraphStore,
                    jobResultStore);
    completeDispatcherSetup(dispatcherService);
}

// When ApplicationDispatcherGatewayServiceFactory creates the Dispatcher, it also passes a
// DispatcherBootstrapFactory: here, a lambda that builds an ApplicationDispatcherBootstrap.
// org.apache.flink.client.deployment.application.ApplicationDispatcherGatewayServiceFactory#create
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        Collection<JobResult> recoveredDirtyJobResults,
        JobGraphWriter jobGraphWriter,
        JobResultStore jobResultStore) {
    final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
    final Dispatcher dispatcher;
    try {
        dispatcher =
                dispatcherFactory.createDispatcher(
                        rpcService,
                        fencingToken,
                        recoveredJobs,
                        recoveredDirtyJobResults,
                        (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                new ApplicationDispatcherBootstrap(
                                        application,
                                        recoveredJobIds,
                                        configuration,
                                        dispatcherGateway,
                                        scheduledExecutor,
                                        errorHandler),
                        PartialDispatcherServicesWithJobPersistenceComponents.from(
                                partialDispatcherServices, jobGraphWriter, jobResultStore));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }
    // Start the Dispatcher RPC endpoint
    dispatcher.start();
    return DefaultDispatcherGatewayService.from(dispatcher);
}
```
2. Where is the ApplicationDispatcherBootstrap factory invoked?

Tracing the code shows that it happens in org.apache.flink.runtime.dispatcher.Dispatcher#onStart.

```java
@Override
public void onStart() throws Exception {
    this.dispatcherBootstrap =
            this.dispatcherBootstrapFactory.create(
                    getSelfGateway(DispatcherGateway.class),
                    this.getRpcService().getScheduledExecutor(),
                    this::onFatalError);
}
```

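As an RpcEndpoint, the Dispatcher is started by the `dispatcher.start();` call in the previous step's source excerpt. For how onStart is then scheduled, see the [Flink RPC](https://www.yuque.com/u22594583/ydf48t/nfmqmy) article, which covers the RPC startup logic.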
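
A plausible reason the bootstrap is passed as a lambda factory rather than built directly: `ApplicationDispatcherBootstrap` needs a `DispatcherGateway` pointing at the Dispatcher itself, and that gateway is only obtained inside `onStart()` via `getSelfGateway`. The sketch below models that deferral with hypothetical stand-ins (`Gateway`, `BootstrapFactory`, `Endpoint`). The executor passed to `start` plays the role of the endpoint's main thread. RPC, fencing and error handling are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical sketch: the bootstrap factory is passed in at construction time, but it is only
// called from onStart(), when the endpoint can hand out a gateway to itself.
final class DeferredBootstrapSketch {

    /** Stand-in for DispatcherGateway. */
    interface Gateway {
        String submitJob(String jobName);
    }

    /** Stand-in for DispatcherBootstrapFactory: builds the bootstrap from the self gateway. */
    interface BootstrapFactory {
        Runnable create(Gateway selfGateway);
    }

    /** Stand-in for the Dispatcher RPC endpoint. */
    static final class Endpoint implements Gateway {
        private final BootstrapFactory bootstrapFactory;
        final List<String> submittedJobs = new ArrayList<>();

        Endpoint(BootstrapFactory bootstrapFactory) {
            this.bootstrapFactory = bootstrapFactory; // stored, not called yet
        }

        @Override
        public String submitJob(String jobName) {
            submittedJobs.add(jobName);
            return "ack:" + jobName;
        }

        /** start() only schedules onStart() on the endpoint's main-thread executor. */
        void start(Executor mainThreadExecutor) {
            mainThreadExecutor.execute(this::onStart);
        }

        private void onStart() {
            Runnable bootstrap = bootstrapFactory.create(this); // the self gateway exists now
            bootstrap.run(); // in this sketch the bootstrap simply runs right away
        }
    }
}
```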
3. ApplicationDispatcherBootstrap runs the user code (translating the job into a JobGraph and submitting it).
   1. It reuses the client module's ClientUtils.executeProgram() to actually run the code in the jar. ClientUtils.executeProgram() calls the static setAsContext methods of ContextEnvironment and StreamContextEnvironment, which install the static contextEnvironmentFactory used by the execution environments. When writing a Flink job we call `StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();`, and the StreamExecutionEnvironment returned there is the one created by StreamExecutionEnvironmentFactory#createExecutionEnvironment.
```java
// org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap#runApplicationEntryPoint
private void runApplicationEntryPoint(
        final CompletableFuture<List<JobID>> jobIdsFuture,
        final Set<JobID> tolerateMissingResult,
        final DispatcherGateway dispatcherGateway,
        final ScheduledExecutor scheduledExecutor,
        final boolean enforceSingleJobExecution,
        final boolean submitFailedJobOnApplicationError) {
    // ···
    final PipelineExecutorServiceLoader executorServiceLoader =
            new EmbeddedExecutorServiceLoader(
                    applicationJobIds, dispatcherGateway, scheduledExecutor);
    ClientUtils.executeProgram(
            executorServiceLoader,
            configuration,
            application,
            enforceSingleJobExecution,
            true /* suppress sysout */);
    // ···
}

// ClientUtils.executeProgram
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout)
        throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        LOG.info(
                "Starting program (detached: {})",
                !configuration.getBoolean(DeploymentOptions.ATTACHED));
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);
        try {
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
```
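
`getExecutionEnvironment()` returns a launcher-provided environment because of a static factory. The launcher installs it before the user's `main` runs and removes it in a `finally` block. The sketch below shows that pattern in minimal form. It assumes a single static field in place of Flink's separate ContextEnvironment and StreamContextEnvironment handling, and every name in it is a hypothetical stand-in.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the "context environment" pattern used by ClientUtils.executeProgram:
// install a static factory, run the user's main(), and always uninstall it afterwards.
final class ContextEnvironmentSketch {

    /** Stand-in for StreamExecutionEnvironment. */
    static class Environment {
        String executorName() {
            return "local";
        }
    }

    /** Stand-in for an environment wired to the embedded executor. */
    static final class EmbeddedEnvironment extends Environment {
        @Override
        String executorName() {
            return "embedded";
        }
    }

    // Plays the role of the static contextEnvironmentFactory field.
    private static Supplier<Environment> contextEnvironmentFactory;

    static void setAsContext(Supplier<Environment> factory) {
        contextEnvironmentFactory = factory;
    }

    static void unsetAsContext() {
        contextEnvironmentFactory = null;
    }

    /** What user code calls: use the installed factory if any, else a default environment. */
    static Environment getExecutionEnvironment() {
        return contextEnvironmentFactory != null ? contextEnvironmentFactory.get() : new Environment();
    }

    /** Launcher side: same set / try / finally-unset shape as ClientUtils.executeProgram. */
    static <T> T executeProgram(Supplier<T> userMain) {
        setAsContext(EmbeddedEnvironment::new);
        try {
            return userMain.get();
        } finally {
            unsetAsContext();
        }
    }
}
```

Because user code only calls the static getter, the same jar runs unchanged both locally and inside the cluster. The `finally` block keeps the factory from leaking into code that runs afterwards.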
   2. The user code calls StreamExecutionEnvironment#execute(java.lang.String), which translates the transformations into a StreamGraph.
   3. It then calls StreamExecutionEnvironment#executeAsync(org.apache.flink.streaming.api.graph.StreamGraph), which executes the StreamGraph through the EmbeddedExecutor.

```java
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // ···
    // Because of the setAsContext call, the executor returned by getExecutor here is the EmbeddedExecutor
    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);
    // ···
}
```

   4. The EmbeddedExecutor converts the StreamGraph into a JobGraph and submits it to the Dispatcher through dispatcherGateway.submitJob.

```java
public CompletableFuture<JobClient> execute(
        final Pipeline pipeline,
        final Configuration configuration,
        ClassLoader userCodeClassloader)
        throws MalformedURLException {
    // ···
    return submitAndGetJobClientFuture(pipeline, configuration, userCodeClassloader);
}

private CompletableFuture<JobClient> submitAndGetJobClientFuture(
        final Pipeline pipeline,
        final Configuration configuration,
        final ClassLoader userCodeClassloader)
        throws MalformedURLException {
    // ···
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    // ···
    final CompletableFuture<JobID> jobSubmissionFuture =
            submitJob(configuration, dispatcherGateway, jobGraph, timeout);
    // ···
}
```
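
Putting the last two steps together, the EmbeddedExecutor does three things:

1. Translates the pipeline into a JobGraph.
2. Submits that graph straight to the dispatcher gateway instead of going through the REST client.
3. Returns a future.

The sketch below models those steps in minimal form. `StreamGraph`, `JobGraph` and `DispatcherGateway` are hypothetical stand-ins that play the same roles as the Flink classes. The translation skips operator chaining, and `RecordingGateway` is a test double.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the EmbeddedExecutor idea: translate the pipeline into a job graph and
// hand it to the dispatcher gateway directly, returning a future instead of blocking.
final class EmbeddedSubmitSketch {

    /** Stand-in for StreamGraph: the job name plus its operators in topological order. */
    static final class StreamGraph {
        final String jobName;
        final List<String> operators;

        StreamGraph(String jobName, List<String> operators) {
            this.jobName = jobName;
            this.operators = operators;
        }
    }

    /** Stand-in for JobGraph. */
    static final class JobGraph {
        final String jobId;
        final String jobName;
        final List<String> vertices;

        JobGraph(String jobId, String jobName, List<String> vertices) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.vertices = vertices;
        }
    }

    /** Stand-in for DispatcherGateway#submitJob. */
    interface DispatcherGateway {
        CompletableFuture<String> submitJob(JobGraph jobGraph);
    }

    /** Trivial translation: one vertex per operator (real translation also chains operators). */
    static JobGraph toJobGraph(StreamGraph streamGraph, String jobId) {
        return new JobGraph(jobId, streamGraph.jobName, List.copyOf(streamGraph.operators));
    }

    /** Translate, submit, and map the acknowledged job id to a client handle. */
    static CompletableFuture<String> execute(StreamGraph streamGraph, String jobId, DispatcherGateway gateway) {
        JobGraph jobGraph = toJobGraph(streamGraph, jobId);
        return gateway.submitJob(jobGraph).thenApply(acceptedId -> "JobClient(" + acceptedId + ")");
    }

    /** In-memory gateway that records submissions and acknowledges them immediately. */
    static final class RecordingGateway implements DispatcherGateway {
        final List<JobGraph> received = new ArrayList<>();

        @Override
        public CompletableFuture<String> submitJob(JobGraph jobGraph) {
            received.add(jobGraph);
            return CompletableFuture.completedFuture(jobGraph.jobId);
        }
    }
}
```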