

  1. public static void main(String[] args) throws Exception {
  2. // Parse the command-line arguments; the "input" key may be supplied several times
  3. final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
  4. // set up the execution environment
  5. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6. // make parameters available in the web interface
  7. env.getConfig().setGlobalJobParameters(params);
  8. // get input data
  9. DataStream<String> text = null;
  10. if (params.has("input")) {
  11. // union all the inputs from text files
  12. for (String input : params.getMultiParameterRequired("input")) {
  13. if (text == null) {
  14. text = env.readTextFile(input);
  15. } else {
  16. text = text.union(env.readTextFile(input));
  17. }
  18. }
  19. Preconditions.checkNotNull(text, "Input DataStream should not be null.");
  20. } else {
  21. System.out.println("Executing WordCount example with default input data set.");
  22. System.out.println("Use --input to specify file input.");
  23. // get default test text data
  24. text = env.fromElements(WordCountData.WORDS);
  25. }
  26. DataStream<Tuple2<String, Integer>> counts =
  27. // split up the lines in pairs (2-tuples) containing: (word,1)
  28. text.flatMap(new Tokenizer())
  29. // group by the tuple field "0" and sum up tuple field "1"
  30. .keyBy(value -> value.f0).sum(1);
  31. // emit result: write to the --output path when given, otherwise print to stdout
  32. if (params.has("output")) {
  33. counts.writeAsText(params.get("output"));
  34. } else {
  35. System.out.println("Printing result to stdout. Use --output to specify output path.");
  36. counts.print();
  37. }
  38. // execute program under the job name "Streaming WordCount"
  39. env.execute("Streaming WordCount");
  40. }



  1. final StreamExecutionEnvironment env =
  2. StreamExecutionEnvironment.getExecutionEnvironment(); // obtain the execution environment the program will run in


  1. public static StreamExecutionEnvironment getExecutionEnvironment() {
  2. return getExecutionEnvironment(new Configuration());
  3. }
  4. // The no-arg variant delegates to getExecutionEnvironment(Configuration configuration)
  5. public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
  6. return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
  7. .map(factory -> factory.createExecutionEnvironment(configuration)) // a context factory was resolved: let it create the environment
  8. .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration)); // no factory resolved: fall back to a local environment
  9. }



  1. - CliFrontend.executeProgram
  2. - ClientUtils.executeProgram
  3. - StreamContextEnvironment.setAsContext
  4. - StreamExecutionEnvironment.initializeContextEnvironment




  1. // No-argument variant
  2. public JobExecutionResult execute() throws Exception {
  3. // Per upstream, getJobName reads the job name from the pipeline.name option
  4. // and falls back to "Flink Streaming Job" when it is unset (getJobName is not shown here)
  5. return execute(getJobName());
  6. }
  7. // Variant that takes an explicit job name
  8. public JobExecutionResult execute(String jobName) throws Exception {
  9. Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
  10. return execute(getStreamGraph(jobName));
  11. }




  1. @Override
  2. public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
  3. // Submit the job asynchronously
  4. final JobClient jobClient = executeAsync(streamGraph);
  5. // Fetch the configured job listeners
  6. final List<JobListener> jobListeners = getJobListeners();
  7. try {
  8. // Obtain the job execution result, then notify each job listener in turn
  9. final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
  10. jobListeners.forEach(jobListener ->
  11. jobListener.onJobExecuted(jobExecutionResult, null));
  12. return jobExecutionResult;
  13. } catch (Throwable t) {
  14. jobListeners.forEach(jobListener ->
  15. jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
  16. ExceptionUtils.rethrowException(t);
  17. // never reached, only make javac happy
  18. return null;
  19. }
  20. }


  1. @Override
  2. public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
  3. return super.execute(streamGraph); // no extra logic here: defer entirely to the parent implementation
  4. }


  1. /**
  2. * Triggers the program execution. The environment will execute all parts of
  3. * the program that have resulted in a "sink" operation. Sink operations are
  4. * for example printing results or forwarding them to a message queue.
  5. *
  6. * @param streamGraph the stream graph representing the transformations
  7. * @return The result of the job execution, containing elapsed time and accumulators.
  8. * @throws Exception which occurs during job execution.
  9. */
  10. @Internal
  11. public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
  12. // Submit the job asynchronously
  13. final JobClient jobClient = executeAsync(streamGraph);
  14. try {
  15. final JobExecutionResult jobExecutionResult;
  16. // In attached mode the client has to stay alive, so block here until the job result is available
  17. if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
  18. jobExecutionResult = jobClient.getJobExecutionResult().get();
  19. } else {
  20. // Detached mode: do not wait; return a result that only carries the JobID
  21. jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
  22. }
  23. // Notify each job listener in turn
  24. jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
  25. return jobExecutionResult;
  26. } catch (Throwable t) {
  27. // get() on the JobExecutionResult Future will throw an ExecutionException. This
  28. // behaviour was largely not there in Flink versions before the PipelineExecutor
  29. // refactoring so we should strip that exception.
  30. Throwable strippedException = ExceptionUtils.stripExecutionException(t);
  31. jobListeners.forEach(jobListener -> {
  32. jobListener.onJobExecuted(null, strippedException);
  33. });
  34. ExceptionUtils.rethrowException(strippedException);
  35. // never reached, only make javac happy
  36. return null;
  37. }
  38. }


  1. @Override
  2. public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
  3. // Per the article: verifies that execute/executeAsync is not called more than once on one environment (validateAllowedExecution is not shown here)
  4. validateAllowedExecution();
  5. // Delegate to the parent class's executeAsync
  6. // (per the article, the same parent method that LocalStreamEnvironment uses)
  7. final JobClient jobClient = super.executeAsync(streamGraph);
  8. if (!suppressSysout) {
  9. System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
  10. }
  11. return jobClient;
  12. }



  1. /**
  2. * Triggers the program execution asynchronously. The environment will execute all parts of
  3. * the program that have resulted in a "sink" operation. Sink operations are
  4. * for example printing results or forwarding them to a message queue.
  5. *
  6. * @param streamGraph the stream graph representing the transformations
  7. * @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
  8. * @throws Exception which occurs during job execution.
  9. */
  10. @Internal
  11. public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
  12. // The StreamGraph must not be null
  13. checkNotNull(streamGraph, "StreamGraph cannot be null.");
  14. // The deployment target option must be set
  15. // The deployment target is the mode the job runs in, e.g. local, remote, YARN or Kubernetes
  16. checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
  17. // Pick an executor factory that matches the configuration
  18. // Look up the job executor factory
  19. final PipelineExecutorFactory executorFactory =
  20. executorServiceLoader.getExecutorFactory(configuration);
  21. checkNotNull(
  22. executorFactory,
  23. "Cannot find compatible factory for specified execution.target (=%s)",
  24. configuration.get(DeploymentOptions.TARGET));
  25. // Get the executor from the factory and run the StreamGraph that holds the user job
  26. CompletableFuture<JobClient> jobClientFuture = executorFactory
  27. .getExecutor(configuration) // obtain the executor that matches the configuration
  28. .execute(streamGraph, configuration, userClassloader); // start the job with the StreamGraph, configuration and user class loader
  29. try {
  30. // Wait for the submission to complete, then notify each job listener that the job was submitted
  31. JobClient jobClient = jobClientFuture.get();
  32. jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
  33. return jobClient;
  34. } catch (ExecutionException executionException) {
  35. final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
  36. jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
  37. throw new FlinkException(
  38. String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
  39. strippedException);
  40. }
  41. }



  1. public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
  2. checkNotNull(configuration);
  3. // Discover and instantiate the PipelineExecutorFactory implementations through java.util.ServiceLoader
  4. final ServiceLoader<PipelineExecutorFactory> loader =
  5. ServiceLoader.load(PipelineExecutorFactory.class);
  6. final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
  7. final Iterator<PipelineExecutorFactory> factories = loader.iterator();
  8. while (factories.hasNext()) {
  9. try {
  10. // Walk over every loaded factory
  11. // and keep only those compatible with the configuration
  12. final PipelineExecutorFactory factory = factories.next();
  13. if (factory != null && factory.isCompatibleWith(configuration)) {
  14. compatibleFactories.add(factory);
  15. }
  16. } catch (Throwable e) {
  17. if (e.getCause() instanceof NoClassDefFoundError) {
  18. LOG.info("Could not load factory due to missing dependencies.");
  19. } else {
  20. throw e;
  21. }
  22. }
  23. }
  24. // More than one compatible factory is ambiguous: throw, listing the configuration entries
  25. if (compatibleFactories.size() > 1) {
  26. final String configStr =
  27. configuration.toMap().entrySet().stream()
  28. .map(e -> e.getKey() + "=" + e.getValue())
  29. .collect(Collectors.joining("\n"));
  30. throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
  31. }
  32. // No compatible factory at all: throw
  33. if (compatibleFactories.isEmpty()) {
  34. throw new IllegalStateException("No ExecutorFactory found to execute the application.");
  35. }
  36. // Return the single compatible factory
  37. return compatibleFactories.get(0);
  38. }

在这个方法中使用了Java SPI机制,根据META-INF/services内的配置文件动态加载并实例化PipelineExecutorFactory的实现类。





  1. org.apache.flink.client.deployment.executors.RemoteExecutorFactory
  2. org.apache.flink.client.deployment.executors.LocalExecutorFactory




  1. @Override
  2. public boolean isCompatibleWith(final Configuration configuration) {
  3. // Compatible when the configured deployment target (execution.target) equals RemoteExecutor.NAME, ignoring case
  4. return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
  5. }



  1. @Override
  2. public boolean isCompatibleWith(final Configuration configuration) {
  3. // Compatible when the configured deployment target (execution.target) equals LocalExecutor.NAME, ignoring case
  4. return LocalExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
  5. }



  1. org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
  2. org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory


  • YarnJobClusterExecutorFactory:要求execution.target为yarn-per-job
  • YarnSessionClusterExecutorFactory:要求execution.target为yarn-session


  1. org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory





  • pipeline:要执行的作业,指的是StreamGraph。
  • configuration:作业的配置。
  • userCodeClassloader:用户作业的类加载器。和Flink本身使用不同类加载器的原因是不同用户作业加载的class可能冲突,用户作业和Flink框架本身加载的class也可能冲突。为了避免这种冲突,用户作业采用不同的类加载器加载。



  1. public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration,
  2. ClassLoader userCodeClassloader) throws Exception {
  3. // Argument validation
  4. checkNotNull(pipeline);
  5. checkNotNull(configuration);
  6. // Build the effective configuration: this executor's own settings first, then the ones passed in
  7. Configuration effectiveConfig = new Configuration();
  8. effectiveConfig.addAll(this.configuration);
  9. effectiveConfig.addAll(configuration);
  10. // we only support attached execution with the local executor.
  11. // Only ATTACHED mode is supported (the check reads the passed-in configuration)
  12. checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
  13. // Translate the pipeline (StreamGraph) into a JobGraph
  14. final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
  15. // Create the per-job MiniCluster wrapper
  16. // and submit the job through its submitJob method
  17. return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
  18. .submitJob(jobGraph, userCodeClassloader);
  19. }




  1. /**
  2. * Starts a {@link MiniCluster} and submits a job.
  3. */
  4. public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
  5. // Build the MiniCluster configuration from the job's maximum parallelism
  6. MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
  7. // Create the MiniCluster
  8. MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
  9. // Start the MiniCluster
  10. miniCluster.start();
  11. return miniCluster
  12. .submitJob(jobGraph)
  13. .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
  14. org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
  15. () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
  16. () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
  17. userCodeClassloader);
  18. return submissionResult;
  19. }))
  20. .thenApply(result -> new MiniClusterJobClient(
  21. result.getJobID(),
  22. miniCluster,
  23. userCodeClassloader,
  24. MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
  25. .whenComplete((ignored, throwable) -> {
  26. if (throwable != null) {
  27. // We failed to create the JobClient and must shutdown to ensure cleanup.
  28. shutDownCluster(miniCluster);
  29. }
  30. })
  31. .thenApply(Function.identity());
  32. }



  1. @Override
  2. public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline,
  3. @Nonnull final Configuration configuration,
  4. @Nonnull final ClassLoader userCodeClassloader) throws Exception {
  5. // Translate the pipeline into a JobGraph
  6. final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
  7. try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory
  8. .createClusterDescriptor(configuration)) {
  9. // Resolve the ID of the remote cluster from the configuration
  10. final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
  11. checkState(clusterID != null);
  12. final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
  13. // Create the ClusterClient used to talk to the remote cluster and submit the job
  14. ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
  15. return clusterClient
  16. .submitJob(jobGraph) // submit the job
  17. .thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
  18. ClientUtils.waitUntilJobInitializationFinished(
  19. () -> clusterClient.getJobStatus(jobId).get(),
  20. () -> clusterClient.requestJobResult(jobId).get(),
  21. userCodeClassloader);
  22. return jobId;
  23. }))
  24. .thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
  25. clusterClientProvider,
  26. jobID,
  27. userCodeClassloader))
  28. .whenComplete((ignored1, ignored2) -> clusterClient.close());
  29. }
  30. }


RestClusterClient通过http rest请求和远程集群通信。它的submitJob方法如下所示:

  1. @Override
  2. public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
  3. CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
  4. try {
  5. // Create a temporary file
  6. final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
  7. // Serialize the JobGraph into it through an ObjectOutputStream
  8. try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
  9. objectOut.writeObject(jobGraph);
  10. }
  11. // Return the path of the file
  12. return jobGraphFile;
  13. } catch (IOException e) {
  14. throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
  15. }
  16. }, executorService);
  17. // Runs once the JobGraph has been written to the file
  18. CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
  19. // Names of the jar files
  20. List<String> jarFileNames = new ArrayList<>(8);
  21. // Distributed-cache files referenced by the request
  22. List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
  23. // Files that have to be uploaded
  24. Collection<FileUpload> filesToUpload = new ArrayList<>(8);
  25. // First add the already-serialized JobGraph file
  26. filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
  27. // Take the user jar paths from the JobGraph and add them to the upload list
  28. for (Path jar : jobGraph.getUserJars()) {
  29. jarFileNames.add(jar.getName());
  30. filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
  31. }
  32. for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
  33. final Path artifactFilePath = new Path(artifacts.getValue().filePath);
  34. try {
  35. // Only local artifacts need to be uploaded.
  36. // Upload the other files the user job needs at runtime;
  37. // files on a distributed file system are skipped
  38. if (!artifactFilePath.getFileSystem().isDistributedFS()) {
  39. artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
  40. filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
  41. }
  42. } catch (IOException e) {
  43. throw new CompletionException(
  44. new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
  45. }
  46. }
  47. // Build the job submission request body
  48. final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
  49. jobGraphFile.getFileName().toString(),
  50. jarFileNames,
  51. artifactFileNames);
  52. // Return the request body together with the files to upload
  53. return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
  54. });
  55. // Send the request once it has been built
  56. final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
  57. requestAndFileUploads -> sendRetriableRequest(
  58. JobSubmitHeaders.getInstance(),
  59. EmptyMessageParameters.getInstance(),
  60. requestAndFileUploads.f0,
  61. requestAndFileUploads.f1,
  62. isConnectionProblemOrServiceUnavailable())
  63. );
  64. // After the request has completed, delete the temporary JobGraph file
  65. submissionFuture
  66. .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
  67. .thenAccept(jobGraphFile -> {
  68. try {
  69. Files.delete(jobGraphFile);
  70. } catch (IOException e) {
  71. LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
  72. }
  73. });
  74. return submissionFuture
  75. .thenApply(ignore -> jobGraph.getJobID())
  76. .exceptionally(
  77. (Throwable throwable) -> {
  78. throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(),
  79. "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
  80. });
  81. }






  1. /**
  2. * Starts the mini cluster, based on the configured properties.
  3. *
  4. * @throws Exception This method passes on any exception that occurs during the startup of
  5. * the mini cluster.
  6. */
  7. public void start() throws Exception {
  8. synchronized (lock) {
  9. checkState(!running, "MiniCluster is already running");
  10. LOG.info("Starting Flink Mini Cluster");
  11. LOG.debug("Using configuration {}", miniClusterConfiguration);
  12. final Configuration configuration = miniClusterConfiguration.getConfiguration();
  13. // Whether the MiniCluster components share one common RPC service
  14. final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
  15. try {
  16. // Initialize the I/O format settings (per the article: overwrite and output-directory behaviour; helper not shown here)
  17. initializeIOFormatClasses(configuration);
  18. LOG.info("Starting Metrics Registry");
  19. // Create the metric registry
  20. metricRegistry = createMetricRegistry(configuration);
  21. // bring up all the RPC services
  22. LOG.info("Starting RPC Service(s)");
  23. final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
  24. final RpcService metricQueryServiceRpcService;
  25. // Shared RPC service
  26. if (useSingleRpcService) {
  27. // we always need the 'commonRpcService' for auxiliary calls
  28. // Create the local RPC service
  29. commonRpcService = createLocalRpcService(configuration);
  30. // Create the factory that hands out the common RPC service
  31. final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
  32. // TaskManagers and the Dispatcher/ResourceManager component both use that same factory
  33. taskManagerRpcServiceFactory = commonRpcServiceFactory;
  34. dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
  35. // Start the local metrics RPC service
  36. metricQueryServiceRpcService = MetricUtils.startLocalMetricsRpcService(configuration);
  37. } else {
  38. // start a new service per component, possibly with custom bind addresses
  39. // Not sharing the RPC service: read the JobManager/TaskManager addresses and port ranges
  40. final String jobManagerExternalAddress = miniClusterConfiguration.getJobManagerExternalAddress();
  41. final String taskManagerExternalAddress = miniClusterConfiguration.getTaskManagerExternalAddress();
  42. final String jobManagerExternalPortRange = miniClusterConfiguration.getJobManagerExternalPortRange();
  43. final String taskManagerExternalPortRange = miniClusterConfiguration.getTaskManagerExternalPortRange();
  44. final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
  45. final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
  46. // Create a dedicated factory (and services) for each component
  47. dispatcherResourceManagerComponentRpcServiceFactory =
  48. new DedicatedRpcServiceFactory(
  49. configuration,
  50. jobManagerExternalAddress,
  51. jobManagerExternalPortRange,
  52. jobManagerBindAddress);
  53. taskManagerRpcServiceFactory =
  54. new DedicatedRpcServiceFactory(
  55. configuration,
  56. taskManagerExternalAddress,
  57. taskManagerExternalPortRange,
  58. taskManagerBindAddress);
  59. // we always need the 'commonRpcService' for auxiliary calls
  60. // bind to the JobManager address with port 0
  61. commonRpcService = createRemoteRpcService(configuration, jobManagerBindAddress, 0);
  62. metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(
  63. configuration,
  64. commonRpcService.getAddress());
  65. }
  66. // Start the metric query service
  67. metricRegistry.startQueryService(metricQueryServiceRpcService, null);
  68. // Create the process metric group
  69. processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
  70. metricRegistry,
  71. RpcUtils.getHostname(commonRpcService),
  72. ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
  73. // Create the I/O thread pool
  74. ioExecutor = Executors.newFixedThreadPool(
  75. ClusterEntrypointUtils.getPoolSize(configuration),
  76. new ExecutorThreadFactory("mini-cluster-io"));
  77. // Create the high-availability services
  78. haServices = createHighAvailabilityServices(configuration, ioExecutor);
  79. // Create and start the BlobServer
  80. blobServer = new BlobServer(configuration, haServices.createBlobStore());
  81. blobServer.start();
  82. // Create the heartbeat services
  83. heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
  84. // Create the blob cache service, pointed at the BlobServer's port on the local host
  85. blobCacheService = new BlobCacheService(
  86. configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
  87. );
  88. // Start the TaskManagers
  89. startTaskManagers();
  90. // Create the retriever for the metric query service
  91. MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());
  92. // Set up the Dispatcher and ResourceManager components (per the article, they run in the same process)
  93. setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);
  94. // Obtain the leader retrieval services for the ResourceManager, the Dispatcher and the REST endpoint
  95. resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
  96. dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
  97. clusterRestEndpointLeaderRetrievalService = haServices.getClusterRestEndpointLeaderRetriever();
  98. // Create the gateway retrievers for the Dispatcher and the ResourceManager
  99. dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
  100. commonRpcService,
  101. DispatcherGateway.class,
  102. DispatcherId::fromUuid,
  103. new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
  104. resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
  105. commonRpcService,
  106. ResourceManagerGateway.class,
  107. ResourceManagerId::fromUuid,
  108. new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
  109. // Create the leader retriever for the web monitor
  110. webMonitorLeaderRetriever = new LeaderRetriever();
  111. // Start each leader retrieval service with its listener
  112. resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
  113. dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
  114. clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
  115. }
  116. catch (Exception e) {
  117. // cleanup everything
  118. try {
  119. close();
  120. } catch (Exception ee) {
  121. e.addSuppressed(ee);
  122. }
  123. throw e;
  124. }
  125. // create a new termination future
  126. terminationFuture = new CompletableFuture<>();
  127. // now officially mark this as running
  128. running = true;
  129. LOG.info("Flink Mini Cluster started successfully");
  130. }
  131. }


  1. @GuardedBy("lock")
  2. private void startTaskManagers() throws Exception {
  3. final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
  4. // Log the configured number of TaskManagers, then start them one by one
  5. LOG.info("Starting {} TaskManger(s)", numTaskManagers);
  6. for (int i = 0; i < numTaskManagers; i++) {
  7. startTaskExecutor();
  8. }
  9. }


  1. @VisibleForTesting
  2. void startTaskExecutor() throws Exception {
  3. synchronized (lock) {
  4. final Configuration configuration = miniClusterConfiguration.getConfiguration();
  5. // Create the TaskExecutor with a random ResourceID and the services already set up on this cluster
  6. final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
  7. configuration,
  8. new ResourceID(UUID.randomUUID().toString()),
  9. taskManagerRpcServiceFactory.createRpcService(),
  10. haServices,
  11. heartbeatServices,
  12. metricRegistry,
  13. blobCacheService,
  14. useLocalCommunication(),
  15. ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
  16. taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
  17. // Start the TaskExecutor and remember it
  18. taskExecutor.start();
  19. taskManagers.add(taskExecutor);
  20. }
  21. }






  1. @Override
  2. public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
  3. // Delegate to the superclass submitJob (Dispatcher, per the article); a failed submission is escalated as a fatal error below
  4. final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = super.submitJob(jobGraph, timeout);
  5. acknowledgeCompletableFuture.whenComplete(
  6. (Acknowledge ignored, Throwable throwable) -> {
  7. if (throwable != null) {
  8. onFatalError(new FlinkException(
  9. "Failed to submit job " + jobGraph.getJobID() + " in job mode.",
  10. throwable));
  11. }
  12. });
  13. return acknowledgeCompletableFuture;
  14. }


  1. @Override
  2. public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
  3. log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
  4. try {
  5. // Reject a job whose ID has already been submitted
  6. if (isDuplicateJob(jobGraph.getJobID())) {
  7. return FutureUtils.completedExceptionally(
  8. new DuplicateJobSubmissionException(jobGraph.getJobID()));
  9. // Reject a job where only some of the vertices have resources configured
  10. } else if (isPartialResourceConfigured(jobGraph)) {
  11. return FutureUtils.completedExceptionally(
  12. new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
  13. "resources configured. The limitation will be removed in future versions."));
  14. } else {
  15. // Hand over to the internal submission path
  16. return internalSubmitJob(jobGraph);
  17. }
  18. } catch (FlinkException e) {
  19. return FutureUtils.completedExceptionally(e);
  20. }
  21. }


  1. private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
  2. log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
  3. // Pass persistAndRunJob as the action to waitForTerminatingJob, then map the outcome to an Acknowledge
  4. final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
  5. this::persistAndRunJob)
  6. .thenApply(ignored -> Acknowledge.get());
  7. return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
  8. if (throwable != null) {
  9. cleanUpJobData(jobGraph.getJobID(), true);
  10. ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
  11. final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
  12. log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
  13. throw new CompletionException(
  14. new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
  15. } else {
  16. return acknowledge;
  17. }
  18. }, ioExecutor);
  19. }


  1. private void persistAndRunJob(JobGraph jobGraph) throws Exception {
  2. // Persist the JobGraph
  3. jobGraphWriter.putJobGraph(jobGraph);
  4. // Then run the job as a new submission
  5. runJob(jobGraph, ExecutionType.SUBMISSION);
  6. }


  1. private void runJob(JobGraph jobGraph, ExecutionType executionType) {
  2. Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
  3. // Timestamp at which initialization of the job begins
  4. long initializationTimestamp = System.currentTimeMillis();
  5. // Kick off creation and start of the JobManagerRunner
  6. CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
  7. DispatcherJob dispatcherJob = DispatcherJob.createFor(
  8. jobManagerRunnerFuture,
  9. jobGraph.getJobID(),
  10. jobGraph.getName(),
  11. initializationTimestamp);
  12. // Register the job in the runningJobs map
  13. runningJobs.put(jobGraph.getJobID(), dispatcherJob);
  14. final JobID jobId = jobGraph.getJobID();
  15. // Handle the job's result (or failure) on the main thread and derive the cleanup state
  16. final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync(
  17. (dispatcherJobResult, throwable) -> {
  18. Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
  19. if (dispatcherJobResult != null) {
  20. return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
  21. } else {
  22. return dispatcherJobFailed(jobId, throwable);
  23. }
  24. }, getMainThreadExecutor());
  25. // Once the cleanup state is known, remove the job
  26. final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
  27. .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
  28. .thenCompose(Function.identity());
  29. FutureUtils.assertNoException(jobTerminationFuture);
  30. registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
  31. }



  1. CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
  2. // Obtain the RPC service
  3. final RpcService rpcService = getRpcService();
  4. return CompletableFuture.supplyAsync(
  5. () -> {
  6. try {
  7. // Create a JobManagerRunner through the factory,
  8. // passing the JobGraph and the other services
  9. JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
  10. jobGraph,
  11. configuration,
  12. rpcService,
  13. highAvailabilityServices,
  14. heartbeatServices,
  15. jobManagerSharedServices,
  16. new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
  17. fatalErrorHandler,
  18. initializationTimestamp);
  19. // Start the JobManagerRunner
  20. runner.start();
  21. return runner;
  22. } catch (Exception e) {
  23. throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
  24. }
  25. },
  26. ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
  27. }





  1. @Override
  2. public JobManagerRunner createJobManagerRunner(
  3. JobGraph jobGraph,
  4. Configuration configuration,
  5. RpcService rpcService,
  6. HighAvailabilityServices highAvailabilityServices,
  7. HeartbeatServices heartbeatServices,
  8. JobManagerSharedServices jobManagerServices,
  9. JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
  10. FatalErrorHandler fatalErrorHandler,
  11. long initializationTimestamp) throws Exception {
  12. // Build the JobMaster configuration from the cluster configuration
  13. final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
  14. // Create the factories for the slot pool and the scheduler
  15. // (per the article, they create the SlotPoolService and the SchedulerNG)
  16. final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
  17. final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
  18. final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
  19. // Create the JobMaster service factory
  20. // (per the article, the JobMaster service exposes the JobMaster's address and the gateway used to talk to it)
  21. final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
  22. jobMasterConfiguration,
  23. slotPoolFactory,
  24. rpcService,
  25. highAvailabilityServices,
  26. jobManagerServices,
  27. heartbeatServices,
  28. jobManagerJobMetricGroupFactory,
  29. fatalErrorHandler,
  30. schedulerNGFactory,
  31. shuffleMaster);
  32. // Create the JobManagerRunnerImpl
  33. return new JobManagerRunnerImpl(
  34. jobGraph,
  35. jobMasterFactory,
  36. highAvailabilityServices,
  37. jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
  38. jobManagerServices.getScheduledExecutorService(),
  39. fatalErrorHandler,
  40. initializationTimestamp);
  41. }




  1. @Override
  2. public void start() throws Exception { // Starts the runner by starting the leader election service
  3. try {
  4. leaderElectionService.start(this); // this runner is handed to the leader election service
  5. } catch (Exception e) { // log the failure, then rethrow it wrapped so the cause is preserved
  6. log.error("Could not start the JobManager because the leader election service did not start.", e);
  7. throw new Exception("Could not start the leader election service.", e);
  8. }
  9. }



  1. @Override
  2. public void grantLeadership(final UUID leaderSessionID) { // Leadership callback; presumably invoked by the leader election service - confirm
  3. synchronized (lock) {
  4. if (shutdown) { // a runner that is already shut down ignores the grant
  5. log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
  6. return;
  7. }
  8. leadershipOperation = leadershipOperation.thenCompose( // chain onto the previous leadership operation so this one runs after it
  9. (ignored) -> {
  10. synchronized (lock) {
  11. return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
  12. }
  13. });
  14. handleException(leadershipOperation, "Could not start the job manager."); // failures of the chained operation go to handleException
  15. }
  16. }


  1. private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
  2. // Look up the job's scheduling status
  3. final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
  4. return jobSchedulingStatusFuture.thenCompose(
  5. jobSchedulingStatus -> {
  6. // If the job is already DONE, run the already-done handling
  7. if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
  8. return jobAlreadyDone();
  9. } else {
  10. // Otherwise start the JobMaster under the given leader session id
  11. return startJobMaster(leaderSessionId);
  12. }
  13. });
  14. }


  1. private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
  2. log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
  3. jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
  4. try {
  5. // First mark the job as running in the running jobs registry
  6. runningJobsRegistry.setJobRunning(jobGraph.getJobID());
  7. } catch (IOException e) { // registry update failed: return an exceptionally completed future instead of throwing
  8. return FutureUtils.completedExceptionally(
  9. new FlinkException(
  10. String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
  11. e));
  12. }
  13. final CompletableFuture<Acknowledge> startFuture;
  14. try {
  15. // Then start the JobMaster service, using the leader session id as the JobMasterId
  16. startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
  17. } catch (Exception e) {
  18. return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
  19. }
  20. final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture; // capture the field's current value for the callback below
  21. return startFuture.thenAcceptAsync( // once the start is acknowledged, confirm the leader session id on the runner's executor
  22. (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
  23. leaderSessionId,
  24. jobMasterService.getAddress(),
  25. currentLeaderGatewayFuture),
  26. executor);
  27. }




  1. public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
  2. // make sure we receive RPC and async calls
  3. start();
  4. return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT); // run startJobExecution via callAsyncWithoutFencing and return its future
  5. }



  1. private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
  2. // Verify that this is running in the main thread
  3. validateRunsInMainThread();
  4. checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
  5. if (Objects.equals(getFencingToken(), newJobMasterId)) { // the fencing token already equals this id: execution was started before, nothing to do
  6. log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
  7. return Acknowledge.get();
  8. }
  9. setNewFencingToken(newJobMasterId); // adopt the new JobMasterId as fencing token
  10. startJobMasterServices(); // start the JobMaster's services before scheduling
  11. log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
  12. resetAndStartScheduler(); // then (re)start the scheduler
  13. return Acknowledge.get();
  14. }

startJobMasterServices方法创建出TaskManager心跳管理器,启动SlotPoolService和建立起ResourceManager leader的连接(ResourceManager也有leader选举过程)。

  1. private void startJobMasterServices() throws Exception { // Starts heartbeat services and the slot pool, then sets up the ResourceManager connection
  2. startHeartbeatServices();
  3. // start the slot pool so that it now accepts messages for this leader
  4. slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
  5. //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
  6. // try to reconnect to previously known leader
  7. reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
  8. // job is ready to go, try to establish connection with resource manager
  9. // - activate leader retrieval for the resource manager
  10. // - on notification of the leader, the connection will be established and
  11. // the slot pool will start requesting slots
  12. resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  13. }


  1. private void resetAndStartScheduler() throws Exception { // Reuses the scheduler if its job is still CREATED, otherwise replaces it; then starts scheduling
  2. validateRunsInMainThread();
  3. final CompletableFuture<Void> schedulerAssignedFuture;
  4. if (schedulerNG.requestJobStatus() == JobStatus.CREATED) { // job status is still CREATED: keep the existing scheduler
  5. schedulerAssignedFuture = CompletableFuture.completedFuture(null);
  6. schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
  7. } else { // otherwise suspend the current scheduler and create a new one with a fresh metric group
  8. suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
  9. final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
  10. final SchedulerNG newScheduler = createScheduler(executionDeploymentTracker, newJobManagerJobMetricGroup);
  11. schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle( // assign the new scheduler once the old one has terminated, normally or exceptionally
  12. (ignored, throwable) -> {
  13. newScheduler.setMainThreadExecutor(getMainThreadExecutor());
  14. assignScheduler(newScheduler, newJobManagerJobMetricGroup);
  15. return null;
  16. }
  17. );
  18. }
  19. FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling)); // start scheduling after the scheduler is assigned; no exception is expected from this chain
  20. }


  1. private void startScheduling() { // Registers a job status listener on the scheduler and tells it to start scheduling
  2. checkState(jobStatusListener == null); // no listener may be registered yet
  3. // register self as job status change listener
  4. jobStatusListener = new JobManagerJobStatusListener();
  5. schedulerNG.registerJobStatusListener(jobStatusListener);
  6. schedulerNG.startScheduling(); // hand over to the scheduler
  7. }




  1. @Override
  2. public final void startScheduling() { // Fixed start sequence: thread check, job metrics, operator coordinators, then the actual scheduling
  3. mainThreadExecutor.assertRunningInMainThread();
  4. registerJobMetrics();
  5. startAllOperatorCoordinators();
  6. startSchedulingInternal(); // scheduling logic itself lives in startSchedulingInternal
  7. }


  1. @Override
  2. protected void startSchedulingInternal() {
  3. log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
  4. // Prepare the ExecutionGraph for scheduling (per the original note this sets its JobStatus to RUNNING - TODO confirm in the helper)
  5. prepareExecutionGraphForNgScheduling();
  6. // Delegate to the scheduling strategy's startScheduling method
  7. schedulingStrategy.startScheduling();
  8. }


  1. @Override
  2. public void startScheduling() { // Schedules the source regions, i.e. the pipelined regions that consume no results
  3. final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
  4. .toStream(schedulingTopology.getAllPipelinedRegions())
  5. .filter(region -> !region.getConsumedResults().iterator().hasNext()) // keep only regions whose consumed results are empty
  6. .collect(Collectors.toSet());
  7. maybeScheduleRegions(sourceRegions);
  8. }

此方法先创建出sourceRegions集合:获取所有的pipelined region,然后通过filter只保留其中不包含consumed result的region(region从上游region接收的数据称为consumed result,为下游region输出的数据称为produced result),即最后剩下的是处于源头的pipelined region。


  1. private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) { // Tries to schedule each of the given regions, in topological order
  2. final List<SchedulingPipelinedRegion> regionsSorted =
  3. SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
  4. for (SchedulingPipelinedRegion region : regionsSorted) {
  5. maybeScheduleRegion(region);
  6. }
  7. }


  1. private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
  2. // Do nothing unless all inputs of the region are consumable
  3. if (!areRegionInputsAllConsumable(region)) {
  4. return;
  5. }
  6. // Every vertex of the region must still be in CREATED state
  7. checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");
  8. // Build the deployment options for the region's vertices; each vertex id maps to the same deploymentOption
  9. final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
  10. SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
  11. regionVerticesSorted.get(region),
  12. id -> deploymentOption);
  13. // Allocate resources (slots) for these vertices and deploy them
  14. schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
  15. }


  1. @Override
  2. public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
  3. // Validate the deployment options (per the original note: every ExecutionVertex must be in ExecutionState.CREATED - confirm in the helper)
  4. validateDeploymentOptions(executionVertexDeploymentOptions);
  5. // Index the deployment options by their ExecutionVertexID
  6. final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
  7. groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
  8. // Collect the ExecutionVertexIDs of all vertices to deploy
  9. final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
  10. .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
  11. .collect(Collectors.toList());
  12. // Record a modification for each vertex and keep the resulting versions (the original note says the initial value is 1L - TODO confirm)
  13. final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
  14. executionVertexVersioner.recordVertexModifications(verticesToDeploy);
  15. // Transition each vertex to the SCHEDULED state
  16. transitionToScheduled(verticesToDeploy);
  17. // Allocate a slot, i.e. the resource needed for execution, for every vertex
  18. final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
  19. allocateSlots(executionVertexDeploymentOptions);
  20. // Create the deployment handles for the vertices,
  21. // built from the ExecutionVertexVersion, ExecutionVertexDeploymentOption and SlotExecutionVertexAssignment data above
  22. final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
  23. requiredVersionByVertex,
  24. deploymentOptionsByVertex,
  25. slotExecutionVertexAssignments);
  26. // Wait for all slot assignments, then deploy
  27. waitForAllSlotsAndDeploy(deploymentHandles);
  28. }


  1. private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
  2. FutureUtils.assertNoException(
  3. assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles))); // deployAll runs once resource assignment completes, normally or exceptionally
  4. }


  1. private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
  2. return (ignored, throwable) -> {
  3. propagateIfNonNull(throwable); // presumably rethrows a failure from the preceding stage - confirm in the helper
  4. for (final DeploymentHandle deploymentHandle : deploymentHandles) {
  5. final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
  6. final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
  7. // The slot future must already be completed at this point
  8. checkState(slotAssigned.isDone());
  9. // Run deployOrHandleError on the completed slot future
  10. FutureUtils.assertNoException(
  11. slotAssigned.handle(deployOrHandleError(deploymentHandle)));
  12. }
  13. return null;
  14. };
  15. }


  1. private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
  2. final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
  3. final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
  4. return (ignored, throwable) -> {
  5. // Check whether the ExecutionVertex version was modified after this deployment was created;
  6. // if so, the deployment has been superseded by another one and is abandoned
  7. if (executionVertexVersioner.isModified(requiredVertexVersion)) {
  8. log.debug("Refusing to deploy execution vertex {} because this deployment was " +
  9. "superseded by another deployment", executionVertexId);
  10. return null;
  11. }
  12. if (throwable == null) {
  13. // No error from the slot future: deploy the ExecutionVertex
  14. deployTaskSafe(executionVertexId);
  15. } else { // the slot future failed: report it as a deployment failure of this vertex
  16. handleTaskDeploymentFailure(executionVertexId, throwable);
  17. }
  18. return null;
  19. };
  20. }


  1. private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
  2. try {
  3. final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
  4. // Deploy the vertex once it has been looked up
  5. executionVertexOperations.deploy(executionVertex);
  6. } catch (Throwable e) { // any Throwable is passed to the deployment failure handler instead of propagating
  7. handleTaskDeploymentFailure(executionVertexId, e);
  8. }
  9. }


  1. @Override
  2. public void deploy(final ExecutionVertex executionVertex) throws JobException { // Thin wrapper: delegates to the vertex's own deploy()
  3. executionVertex.deploy();
  4. }




  1. public void deploy() throws JobException { // Delegates to deploy() of the current execution
  2. currentExecution.deploy();
  3. }



  1. /**
  2. * Deploys the execution to the previously assigned resource.
  3. *
  4. * @throws JobException if the execution cannot be deployed to the assigned resource
  5. */
  6. public void deploy() throws JobException {
  7. assertRunningInJobMasterMainThread();
  8. final LogicalSlot slot = assignedResource;
  9. checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
  10. // Check if the TaskManager died in the meantime
  11. // This only speeds up the response to TaskManagers failing concurrently to deployments.
  12. // The more general check is the rpcTimeout of the deployment call
  13. // the slot must still be alive
  14. if (!slot.isAlive()) {
  15. throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
  16. }
  17. // make sure exactly one deployment call happens from the correct state
  18. // note: the transition from CREATED to DEPLOYING is for testing purposes only
  19. ExecutionState previous = this.state;
  21. if (previous == SCHEDULED || previous == CREATED) {
  22. if (!transitionState(previous, DEPLOYING)) {
  23. // race condition, someone else beat us to the deploying call.
  24. // this should actually not happen and indicates a race somewhere else
  25. throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
  26. }
  27. }
  28. else {
  29. // vertex may have been cancelled, or it was already scheduled
  30. throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
  31. }
  32. // verify that the slot's payload is this very Execution
  33. if (this != slot.getPayload()) {
  34. throw new IllegalStateException(
  35. String.format("The execution %s has not been assigned to the assigned slot.", this));
  36. }
  37. try {
  38. // race double check, did we fail/cancel and do we need to release the slot?
  39. // re-check that the state is still DEPLOYING
  40. if (this.state != DEPLOYING) {
  41. slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
  42. return;
  43. }
  44. LOG.info("Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(),
  45. attemptNumber, vertex.getCurrentExecutionAttempt().getAttemptId(), getAssignedResourceLocation(), slot.getAllocationId());
  46. if (taskRestore != null) {
  47. checkState(taskRestore.getTaskStateSnapshot().getSubtaskStateMappings().stream().allMatch(entry ->
  48. entry.getValue().getInputRescalingDescriptor().equals(InflightDataRescalingDescriptor.NO_RESCALE) &&
  49. entry.getValue().getOutputRescalingDescriptor().equals(InflightDataRescalingDescriptor.NO_RESCALE)),
  50. "Rescaling from unaligned checkpoint is not yet supported.");
  51. }
  52. // build the TaskDeploymentDescriptor that is submitted to the TaskManager below
  53. final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
  54. .fromExecutionVertex(vertex, attemptNumber)
  55. .createDeploymentDescriptor(
  56. slot.getAllocationId(),
  57. slot.getPhysicalSlotNumber(),
  58. taskRestore,
  59. producedPartitions.values());
  60. // null taskRestore to let it be GC'ed
  61. taskRestore = null;
  62. // obtain the TaskManagerGateway of the slot,
  63. // which is used to communicate with the TaskManager
  64. final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
  65. final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
  66. vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
  67. getVertex().notifyPendingDeployment(this);
  68. // We run the submission in the future executor so that the serialization of large TDDs does not block
  69. // the main thread and sync back to the main thread once submission is completed.
  70. // submit the task through the gateway (an RPC call per the original note), asking the TaskManager to run this Execution
  71. CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
  72. .thenCompose(Function.identity())
  73. .whenCompleteAsync(
  74. (ack, failure) -> {
  75. if (failure == null) {
  76. vertex.notifyCompletedDeployment(this);
  77. } else {
  78. if (failure instanceof TimeoutException) {
  79. String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
  80. markFailed(new Exception(
  81. "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
  82. + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
  83. } else {
  84. markFailed(failure);
  85. }
  86. }
  87. },
  88. jobMasterMainThreadExecutor);
  89. }
  90. catch (Throwable t) {
  91. markFailed(t);
  92. if (isLegacyScheduling()) {
  93. ExceptionUtils.rethrow(t);
  94. }
  95. }
  96. }
