Preface

In this article we walk through the execution flow of a Flink job, starting from the entry point of typical Flink user code.
We use WordCount as the example:

  public static void main(String[] args) throws Exception {
      // Checking input parameters
      final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

      // set up the execution environment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);

      // get input data
      DataStream<String> text = null;
      if (params.has("input")) {
          // union all the inputs from text files
          for (String input : params.getMultiParameterRequired("input")) {
              if (text == null) {
                  text = env.readTextFile(input);
              } else {
                  text = text.union(env.readTextFile(input));
              }
          }
          Preconditions.checkNotNull(text, "Input DataStream should not be null.");
      } else {
          System.out.println("Executing WordCount example with default input data set.");
          System.out.println("Use --input to specify file input.");
          // get default test text data
          text = env.fromElements(WordCountData.WORDS);
      }

      DataStream<Tuple2<String, Integer>> counts =
          // split up the lines in pairs (2-tuples) containing: (word,1)
          text.flatMap(new Tokenizer())
              // group by the tuple field "0" and sum up tuple field "1"
              .keyBy(value -> value.f0)
              .sum(1);

      // emit result
      if (params.has("output")) {
          counts.writeAsText(params.get("output"));
      } else {
          System.out.println("Printing result to stdout. Use --output to specify output path.");
          counts.print();
      }

      // execute program
      env.execute("Streaming WordCount");
  }
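
The Tokenizer referenced above is not shown in the snippet. For completeness, here is a sketch modeled on the Tokenizer of Flink's standard WordCount example: it lowercases each line, splits it on non-word characters, and emits a (word, 1) pair per token.

  public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
          // normalize and split the line into tokens
          String[] tokens = value.toLowerCase().split("\\W+");
          // emit a (word, 1) pair for every non-empty token
          for (String token : tokens) {
              if (token.length() > 0) {
                  out.collect(new Tuple2<>(token, 1));
              }
          }
      }
  }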

Creating the ExecutionEnvironment

The first step in Flink user code is usually to obtain an execution environment.

  final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.getExecutionEnvironment();

Let's start the analysis with the getExecutionEnvironment method:

  public static StreamExecutionEnvironment getExecutionEnvironment() {
      return getExecutionEnvironment(new Configuration());
  }

  // further delegates to getExecutionEnvironment(Configuration configuration)
  public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
      return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
          .map(factory -> factory.createExecutionEnvironment(configuration)) // create the default execution environment
          .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration)); // create a local execution environment
  }

The Utils.resolveFactory method takes two arguments: a factory held in a ThreadLocal and a static factory. If the ThreadLocal contains a factory, that one is returned; otherwise the static factory is returned.
If no ExecutionEnvironment can be created from either factory, StreamExecutionEnvironment.createLocalEnvironment(configuration) is called instead. It creates a LocalStreamEnvironment, which means local execution mode: all tasks run in the same JVM.
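
For intuition, here is a minimal sketch of that resolution logic (simplified; consult Utils.resolveFactory in the Flink sources for the exact signature):

  import java.util.Optional;

  // simplified stand-in for the resolution performed by Utils.resolveFactory
  public final class FactoryResolutionSketch {
      static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
          // prefer the factory bound to the current thread, fall back to the static one
          final T localFactory = threadLocalFactory.get();
          final T factory = localFactory == null ? staticFactory : localFactory;
          return Optional.ofNullable(factory);
      }
  }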

If the user submits the Flink job to a cluster from the command line (see 03-flink源码任务提交流程分析), the program runs through the following call chain:

  - CliFrontend.executeProgram
  - ClientUtils.executeProgram
  - StreamContextEnvironment.setAsContext
  - StreamExecutionEnvironment.initializeContextEnvironment

Through these calls, a StreamExecutionEnvironmentFactory is created and set into StreamExecutionEnvironment's threadLocalContextEnvironmentFactory and contextEnvironmentFactory fields. Calling this factory's createExecutionEnvironment method creates a StreamContextEnvironment, the environment for jobs submitted to a remote Flink cluster.

Executing the Job with the execute Method

With the execution environment created, we now look at how the execute method launches the Flink job:

  // no-arg variant
  public JobExecutionResult execute() throws Exception {
      // getJobName reads the job name from the pipeline.name configuration option;
      // if it is not set, the default name "Flink Streaming Job" is used
      return execute(getJobName());
  }

  // variant taking a job name
  public JobExecutionResult execute(String jobName) throws Exception {
      Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
      return execute(getStreamGraph(jobName));
  }
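
As the comments note, the no-arg variant derives its job name from pipeline.name. A small sketch of overriding it (assuming the standard PipelineOptions.NAME config key; the class name is illustrative):

  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.configuration.PipelineOptions;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class JobNameSketch {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // overrides the default job name "Flink Streaming Job"
          conf.set(PipelineOptions.NAME, "my-wordcount");
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
          // ... build the pipeline ...
          // env.execute(); // would run under the name "my-wordcount"
      }
  }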

Before the job is executed, the getStreamGraph method converts it into a StreamGraph. The StreamGraph generation process will be covered in a later article; here we continue with the execute method.

From here on, the behavior of execute differs depending on the ExecutionEnvironment.

StreamContextEnvironment's execute method:

  @Override
  public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
      // execute the job asynchronously
      final JobClient jobClient = executeAsync(streamGraph);
      // get the configured job listeners
      final List<JobListener> jobListeners = getJobListeners();
      try {
          // fetch the job execution result and notify the listeners one by one
          final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
          jobListeners.forEach(jobListener ->
              jobListener.onJobExecuted(jobExecutionResult, null));
          return jobExecutionResult;
      } catch (Throwable t) {
          jobListeners.forEach(jobListener ->
              jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
          ExceptionUtils.rethrowException(t);
          // never reached, only make javac happy
          return null;
      }
  }

LocalStreamEnvironment's execute method:

  @Override
  public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
      return super.execute(streamGraph);
  }

It simply calls the parent class's execute method, and that parent class is StreamExecutionEnvironment:

  /**
   * Triggers the program execution. The environment will execute all parts of
   * the program that have resulted in a "sink" operation. Sink operations are
   * for example printing results or forwarding them to a message queue.
   *
   * @param streamGraph the stream graph representing the transformations
   * @return The result of the job execution, containing elapsed time and accumulators.
   * @throws Exception which occurs during job execution.
   */
  @Internal
  public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
      // execute the job asynchronously
      final JobClient jobClient = executeAsync(streamGraph);
      try {
          final JobExecutionResult jobExecutionResult;
          // in attached mode the client must stay alive, so we wait
          // synchronously for the job execution result
          if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
              jobExecutionResult = jobClient.getJobExecutionResult().get();
          } else {
              // in detached mode there is no need to wait
              jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
          }
          // notify the job listeners one by one
          jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
          return jobExecutionResult;
      } catch (Throwable t) {
          // get() on the JobExecutionResult Future will throw an ExecutionException. This
          // behaviour was largely not there in Flink versions before the PipelineExecutor
          // refactoring so we should strip that exception.
          Throwable strippedException = ExceptionUtils.stripExecutionException(t);
          jobListeners.forEach(jobListener -> {
              jobListener.onJobExecuted(null, strippedException);
          });
          ExceptionUtils.rethrowException(strippedException);
          // never reached, only make javac happy
          return null;
      }
  }

Next we analyze StreamContextEnvironment's executeAsync method.

  @Override
  public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
      // check that execute or executeAsync is not called more than once
      // on the same environment
      validateAllowedExecution();
      // call the parent class's executeAsync method,
      // the same one LocalStreamEnvironment ends up in
      final JobClient jobClient = super.executeAsync(streamGraph);
      if (!suppressSysout) {
          System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
      }
      return jobClient;
  }

Ultimately the executeAsync paths of the two environments converge.

Below is the executeAsync method of the StreamExecutionEnvironment class:

  /**
   * Triggers the program execution asynchronously. The environment will execute all parts of
   * the program that have resulted in a "sink" operation. Sink operations are
   * for example printing results or forwarding them to a message queue.
   *
   * @param streamGraph the stream graph representing the transformations
   * @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
   * @throws Exception which occurs during job execution.
   */
  @Internal
  public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
      // the StreamGraph must not be null
      checkNotNull(streamGraph, "StreamGraph cannot be null.");
      // the deployment target must be configured; the target is the mode
      // the job runs in, e.g. local, remote, YARN or Kubernetes
      checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
      // obtain a suitable executor factory based on the configuration
      final PipelineExecutorFactory executorFactory =
          executorServiceLoader.getExecutorFactory(configuration);
      checkNotNull(
          executorFactory,
          "Cannot find compatible factory for specified execution.target (=%s)",
          configuration.get(DeploymentOptions.TARGET));
      // obtain an executor from the factory and run the StreamGraph containing the user job
      CompletableFuture<JobClient> jobClientFuture = executorFactory
          .getExecutor(configuration) // get the executor matching the configuration
          .execute(streamGraph, configuration, userClassloader); // run the job with the StreamGraph, configuration and class loader
      try {
          // notify the job listeners that the job has been submitted
          JobClient jobClient = jobClientFuture.get();
          jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
          return jobClient;
      } catch (ExecutionException executionException) {
          final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
          jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));
          throw new FlinkException(
              String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
              strippedException);
      }
  }

At this point the concrete logic diverges again for the different execution environments. The fork is at executorServiceLoader.getExecutorFactory(configuration): different environments yield different PipelineExecutorFactory implementations. When a LocalStreamEnvironment or a StreamContextEnvironment is created, the executorServiceLoader field is a DefaultExecutorServiceLoader. Let's examine its getExecutorFactory method.

DefaultExecutorServiceLoader's getExecutorFactory method:

  public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
      checkNotNull(configuration);
      // load and instantiate the PipelineExecutorFactory implementations
      // declared in the service configuration files
      final ServiceLoader<PipelineExecutorFactory> loader =
          ServiceLoader.load(PipelineExecutorFactory.class);
      final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
      final Iterator<PipelineExecutorFactory> factories = loader.iterator();
      while (factories.hasNext()) {
          try {
              // iterate over all loaded factories and keep only those
              // compatible with the configuration
              final PipelineExecutorFactory factory = factories.next();
              if (factory != null && factory.isCompatibleWith(configuration)) {
                  compatibleFactories.add(factory);
              }
          } catch (Throwable e) {
              if (e.getCause() instanceof NoClassDefFoundError) {
                  LOG.info("Could not load factory due to missing dependencies.");
              } else {
                  throw e;
              }
          }
      }
      // if more than one compatible factory is found, report an error
      if (compatibleFactories.size() > 1) {
          final String configStr =
              configuration.toMap().entrySet().stream()
                  .map(e -> e.getKey() + "=" + e.getValue())
                  .collect(Collectors.joining("\n"));
          throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
      }
      // if no compatible factory is found, throw an exception
      if (compatibleFactories.isEmpty()) {
          throw new IllegalStateException("No ExecutorFactory found to execute the application.");
      }
      // return the single remaining factory
      return compatibleFactories.get(0);
  }

This method uses the Java SPI (Service Provider Interface) mechanism: it dynamically loads and instantiates PipelineExecutorFactory implementations based on the configuration files under META-INF/services.
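
The SPI mechanism itself is standard Java and easy to demonstrate in isolation. The sketch below uses a hypothetical GreetingService interface (not part of Flink); ServiceLoader reads the classpath file META-INF/services/<binary name of the interface>, which lists one implementation class per line, and instantiates each entry:

  import java.util.ServiceLoader;

  public class SpiDemo {
      // the service interface; implementations are declared in
      // META-INF/services/SpiDemo$GreetingService on the classpath
      public interface GreetingService {
          String greet();
      }

      public static void main(String[] args) {
          ServiceLoader<GreetingService> loader = ServiceLoader.load(GreetingService.class);
          // ServiceLoader lazily instantiates every declared implementation,
          // just as getExecutorFactory iterates over the PipelineExecutorFactory entries
          for (GreetingService service : loader) {
              System.out.println(service.greet());
          }
      }
  }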

PipelineExecutorFactory and PipelineExecutor

PipelineExecutorFactory

Searching for org.apache.flink.core.execution.PipelineExecutorFactory service files, we find three of them, located in the flink-clients, flink-yarn and flink-kubernetes subprojects of the source tree. Let's go through them one by one.

The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-clients contains:

  org.apache.flink.client.deployment.executors.RemoteExecutorFactory
  org.apache.flink.client.deployment.executors.LocalExecutorFactory

In other words, if the flink-clients dependency is on the classpath, RemoteExecutorFactory and LocalExecutorFactory instances can be created.

Let's look at their respective isCompatibleWith methods.

RemoteExecutorFactory's isCompatibleWith method:

  @Override
  public boolean isCompatibleWith(final Configuration configuration) {
      // compatible when execution.target=remote in the configuration
      return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
  }

RemoteExecutorFactory is used only when execution.target is set to remote in the configuration.
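
When a pipeline is built against a remote session cluster programmatically, the remote target is picked implicitly. A hedged sketch (host, port and jar path are placeholders):

  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class RemoteSubmitSketch {
      public static void main(String[] args) throws Exception {
          // createRemoteEnvironment yields an environment whose execution target
          // resolves to the RemoteExecutor discussed above
          StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
              "jobmanager-host",         // placeholder JobManager host
              8081,                      // placeholder REST port
              "/path/to/user-job.jar");  // placeholder jar shipped to the cluster
          // ... build the pipeline and call env.execute() ...
      }
  }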

LocalExecutorFactory's isCompatibleWith method:

  @Override
  public boolean isCompatibleWith(final Configuration configuration) {
      // compatible when execution.target=local in the configuration
      return LocalExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
  }

LocalExecutorFactory is used only when execution.target is set to local.

The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-yarn contains:

  org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
  org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory

These work like the ones above, so we only list the configuration value each factory is compatible with, without repeating the isCompatibleWith code:

  • YarnJobClusterExecutorFactory: requires execution.target to be yarn-per-job
  • YarnSessionClusterExecutorFactory: requires execution.target to be yarn-session

The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-kubernetes contains:

  org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory

KubernetesSessionClusterExecutorFactory requires execution.target to be set to kubernetes-session.
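
Putting the factories together: the single knob that selects among them is execution.target. A minimal sketch using the standard Configuration and DeploymentOptions classes:

  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.configuration.DeploymentOptions;

  public class TargetSelectionSketch {
      public static void main(String[] args) {
          Configuration configuration = new Configuration();
          // with "local", only LocalExecutorFactory.isCompatibleWith returns true;
          // "remote", "yarn-per-job", "yarn-session" or "kubernetes-session"
          // would select the corresponding factory instead
          configuration.set(DeploymentOptions.TARGET, "local");
      }
  }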

PipelineExecutor

Next we focus on two executors: LocalExecutor and RemoteExecutor. The logic for creating these executors is straightforward, so we skip it here.

A PipelineExecutor runs a job through its execute method, which takes three parameters:

  • pipeline: the job to execute, i.e. the StreamGraph.
  • configuration: the job configuration.
  • userCodeClassloader: the class loader for the user job. User code gets a class loader separate from Flink's own because classes loaded by different user jobs may conflict with each other, and user classes may also conflict with classes loaded by the Flink framework itself. Loading user code with its own class loader avoids these conflicts (see the sketch after this list).
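
The isolation idea behind the last point can be demonstrated with plain JDK class loaders: two URLClassLoaders can each resolve the same fully qualified class name to different classes. A hedged sketch (the jar paths and class name are hypothetical):

  import java.net.URL;
  import java.net.URLClassLoader;

  public class ClassLoaderIsolationSketch {
      public static void main(String[] args) throws Exception {
          URL[] jobAJars = { new URL("file:///tmp/user-job-a.jar") }; // hypothetical user jar
          URL[] jobBJars = { new URL("file:///tmp/user-job-b.jar") }; // hypothetical user jar
          try (URLClassLoader loaderA = new URLClassLoader(jobAJars);
               URLClassLoader loaderB = new URLClassLoader(jobBJars)) {
              // the same name can resolve to two different, incompatible classes,
              // provided the class is not on the parent (application) classpath
              Class<?> a = loaderA.loadClass("com.example.MyMapFunction"); // hypothetical class
              Class<?> b = loaderB.loadClass("com.example.MyMapFunction");
              System.out.println(a == b); // false: each loader defines its own class
          }
      }
  }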

LocalExecutor

LocalExecutor runs jobs locally. Its execute method:

  public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration,
          ClassLoader userCodeClassloader) throws Exception {
      // sanity checks
      checkNotNull(pipeline);
      checkNotNull(configuration);
      // merge the executor's own configuration with the job configuration
      Configuration effectiveConfig = new Configuration();
      effectiveConfig.addAll(this.configuration);
      effectiveConfig.addAll(configuration);
      // we only support attached execution with the local executor.
      checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
      // convert the StreamGraph into a JobGraph
      final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
      // create a MiniCluster and submit the job via its submitJob method
      return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
          .submitJob(jobGraph, userCodeClassloader);
  }

Before the job is submitted, the StreamGraph must be converted into a JobGraph. How the JobGraph is generated will be covered later.

At the end of the method, PerJobMiniClusterFactory's submitJob method is invoked. PerJobMiniClusterFactory actually operates on a MiniCluster object: as the name suggests, a "miniature cluster" in which all jobs run locally.

The code of PerJobMiniClusterFactory's submitJob method:

  /**
   * Starts a {@link MiniCluster} and submits a job.
   */
  public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
      // build the MiniCluster configuration, specifying the maximum parallelism
      MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
      // create a MiniCluster
      MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
      // start the MiniCluster
      miniCluster.start();
      return miniCluster
          .submitJob(jobGraph)
          .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
              org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
                  () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
                  () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
                  userCodeClassloader);
              return submissionResult;
          }))
          .thenApply(result -> new MiniClusterJobClient(
              result.getJobID(),
              miniCluster,
              userCodeClassloader,
              MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
          .whenComplete((ignored, throwable) -> {
              if (throwable != null) {
                  // We failed to create the JobClient and must shutdown to ensure cleanup.
                  shutDownCluster(miniCluster);
              }
          })
          .thenApply(Function.identity());
  }

RemoteExecutor

RemoteExecutor's execute method lives in its parent class AbstractSessionClusterExecutor. The code and analysis:

  @Override
  public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline,
          @Nonnull final Configuration configuration,
          @Nonnull final ClassLoader userCodeClassloader) throws Exception {
      // as before, generate the JobGraph
      final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
      try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory
              .createClusterDescriptor(configuration)) {
          // obtain the ID of the remote cluster
          final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
          checkState(clusterID != null);
          final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
          // create the ClusterClient, used to communicate with the remote cluster and submit the job
          ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
          return clusterClient
              .submitJob(jobGraph) // submit the job
              .thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
                  ClientUtils.waitUntilJobInitializationFinished(
                      () -> clusterClient.getJobStatus(jobId).get(),
                      () -> clusterClient.requestJobResult(jobId).get(),
                      userCodeClassloader);
                  return jobId;
              }))
              .thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
                  clusterClientProvider,
                  jobID,
                  userCodeClassloader))
              .whenComplete((ignored1, ignored2) -> clusterClient.close());
      }
  }

Depending on the cluster type, ClusterClient has two subclasses: MiniClusterClient and RestClusterClient. MiniClusterClient is used to communicate with a MiniCluster; its submitJob method simply delegates to MiniCluster's submitJob method.

RestClusterClient communicates with a remote cluster via HTTP REST requests. Its submitJob method:

  @Override
  public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
      CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
          try {
              // create a temporary file
              final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
              // serialize the JobGraph into it
              try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
                  objectOut.writeObject(jobGraph);
              }
              // return the file path
              return jobGraphFile;
          } catch (IOException e) {
              throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
          }
      }, executorService);
      // runs after the JobGraph has been written to the file
      CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
          // names of the jar files
          List<String> jarFileNames = new ArrayList<>(8);
          // distributed cache files
          List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
          // files that need to be uploaded
          Collection<FileUpload> filesToUpload = new ArrayList<>(8);
          // first add the serialized JobGraph file
          filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
          // add the user jar paths from the JobGraph to the upload list
          for (Path jar : jobGraph.getUserJars()) {
              jarFileNames.add(jar.getName());
              filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
          }
          for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
              final Path artifactFilePath = new Path(artifacts.getValue().filePath);
              try {
                  // Only local artifacts need to be uploaded.
                  // upload the other files the user job needs at runtime;
                  // only files stored locally have to be uploaded
                  if (!artifactFilePath.getFileSystem().isDistributedFS()) {
                      artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
                      filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
                  }
              } catch (IOException e) {
                  throw new CompletionException(
                      new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
              }
          }
          // build the job submission request body
          final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
              jobGraphFile.getFileName().toString(),
              jarFileNames,
              artifactFileNames);
          // return the request body together with the files to upload
          return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
      });
      // once the request is built, send it
      final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
          requestAndFileUploads -> sendRetriableRequest(
              JobSubmitHeaders.getInstance(),
              EmptyMessageParameters.getInstance(),
              requestAndFileUploads.f0,
              requestAndFileUploads.f1,
              isConnectionProblemOrServiceUnavailable())
      );
      // after the request has been sent, delete the temporary JobGraph file
      submissionFuture
          .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
          .thenAccept(jobGraphFile -> {
              try {
                  Files.delete(jobGraphFile);
              } catch (IOException e) {
                  LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
              }
          });
      return submissionFuture
          .thenApply(ignore -> jobGraph.getJobID())
          .exceptionally(
              (Throwable throwable) -> {
                  throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(),
                      "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
              });
  }

Since the JobManager and TaskManager of a remote cluster are deployed and started separately, and that is not closely related to the topic of this article, we do not cover it in detail here.

MiniCluster

MiniCluster was briefly introduced above; it executes jobs in a local environment.
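
Before diving into its internals, here is a hedged sketch of driving a MiniCluster by hand (API shape as of the Flink version discussed here; details vary across releases):

  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.runtime.minicluster.MiniCluster;
  import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

  public class MiniClusterSketch {
      public static void main(String[] args) throws Exception {
          MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
              .setConfiguration(new Configuration())
              .setNumTaskManagers(1)
              .setNumSlotsPerTaskManager(4)
              .build();
          try (MiniCluster miniCluster = new MiniCluster(config)) {
              miniCluster.start();
              // miniCluster.submitJob(jobGraph); // submit a JobGraph as shown below
          }
      }
  }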

The start method

The logic for starting a MiniCluster lives in its start method.

  /**
   * Starts the mini cluster, based on the configured properties.
   *
   * @throws Exception This method passes on any exception that occurs during the startup of
   * the mini cluster.
   */
  public void start() throws Exception {
      synchronized (lock) {
          checkState(!running, "MiniCluster is already running");
          LOG.info("Starting Flink Mini Cluster");
          LOG.debug("Using configuration {}", miniClusterConfiguration);
          final Configuration configuration = miniClusterConfiguration.getConfiguration();
          // whether the MiniCluster components share a single RPC service
          final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
          try {
              // initialize IO-related settings, e.g. whether to overwrite results
              // and whether to create output directories
              initializeIOFormatClasses(configuration);
              LOG.info("Starting Metrics Registry");
              // create the metrics registry
              metricRegistry = createMetricRegistry(configuration);
              // bring up all the RPC services
              LOG.info("Starting RPC Service(s)");
              final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
              final RpcService metricQueryServiceRpcService;
              // shared RPC service
              if (useSingleRpcService) {
                  // we always need the 'commonRpcService' for auxiliary calls
                  // create a local RPC service
                  commonRpcService = createLocalRpcService(configuration);
                  // create the common RPC service factory
                  final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                  // the TaskManager RPC service factory reuses the common factory
                  taskManagerRpcServiceFactory = commonRpcServiceFactory;
                  dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
                  // start the metrics query RPC service
                  metricQueryServiceRpcService = MetricUtils.startLocalMetricsRpcService(configuration);
              } else {
                  // start a new service per component, possibly with custom bind addresses
                  // when the RPC service is not shared, read the JobManager and
                  // TaskManager addresses and port ranges
                  final String jobManagerExternalAddress = miniClusterConfiguration.getJobManagerExternalAddress();
                  final String taskManagerExternalAddress = miniClusterConfiguration.getTaskManagerExternalAddress();
                  final String jobManagerExternalPortRange = miniClusterConfiguration.getJobManagerExternalPortRange();
                  final String taskManagerExternalPortRange = miniClusterConfiguration.getTaskManagerExternalPortRange();
                  final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                  final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                  // create a dedicated factory and service per component
                  dispatcherResourceManagerComponentRpcServiceFactory =
                      new DedicatedRpcServiceFactory(
                          configuration,
                          jobManagerExternalAddress,
                          jobManagerExternalPortRange,
                          jobManagerBindAddress);
                  taskManagerRpcServiceFactory =
                      new DedicatedRpcServiceFactory(
                          configuration,
                          taskManagerExternalAddress,
                          taskManagerExternalPortRange,
                          taskManagerBindAddress);
                  // we always need the 'commonRpcService' for auxiliary calls
                  // bind to the JobManager address with port 0
                  commonRpcService = createRemoteRpcService(configuration, jobManagerBindAddress, 0);
                  metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(
                      configuration,
                      commonRpcService.getAddress());
              }
              // start the metrics query service
              metricRegistry.startQueryService(metricQueryServiceRpcService, null);
              // create the process metric group
              processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
                  metricRegistry,
                  RpcUtils.getHostname(commonRpcService),
                  ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
              // create the IO thread pool
              ioExecutor = Executors.newFixedThreadPool(
                  ClusterEntrypointUtils.getPoolSize(configuration),
                  new ExecutorThreadFactory("mini-cluster-io"));
              // create the high availability services
              haServices = createHighAvailabilityServices(configuration, ioExecutor);
              // start the BlobServer
              blobServer = new BlobServer(configuration, haServices.createBlobStore());
              blobServer.start();
              // create the heartbeat services
              heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
              // create the blob cache service
              blobCacheService = new BlobCacheService(
                  configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
              );
              // start the TaskManagers
              startTaskManagers();
              // create the metrics query service retriever
              MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());
              // create the Dispatcher and the ResourceManager; they run in the same process
              setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);
              // create the ResourceManager leader retrieval service
              resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
              dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
              clusterRestEndpointLeaderRetrievalService = haServices.getClusterRestEndpointLeaderRetriever();
              // create the Dispatcher gateway retriever
              dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                  commonRpcService,
                  DispatcherGateway.class,
                  DispatcherId::fromUuid,
                  new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
              resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                  commonRpcService,
                  ResourceManagerGateway.class,
                  ResourceManagerId::fromUuid,
                  new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
              // create the WebMonitor leader retriever
              webMonitorLeaderRetriever = new LeaderRetriever();
              // start these retrieval services
              resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
              dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
              clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
          }
          catch (Exception e) {
              // cleanup everything
              try {
                  close();
              } catch (Exception ee) {
                  e.addSuppressed(ee);
              }
              throw e;
          }
          // create a new termination future
          terminationFuture = new CompletableFuture<>();
          // now officially mark this as running
          running = true;
          LOG.info("Flink Mini Cluster started successfully");
      }
  }

The analysis above covers the MiniCluster startup flow. Next let's look at how the TaskManagers are started, which happens in the startTaskManagers method.

  @GuardedBy("lock")
  private void startTaskManagers() throws Exception {
      // the number of TaskManagers to start
      final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
      LOG.info("Starting {} TaskManger(s)", numTaskManagers);
      for (int i = 0; i < numTaskManagers; i++) {
          startTaskExecutor();
      }
  }

Following the startTaskExecutor method:

  @VisibleForTesting
  void startTaskExecutor() throws Exception {
      synchronized (lock) {
          final Configuration configuration = miniClusterConfiguration.getConfiguration();
          // create a TaskExecutor
          final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
              configuration,
              new ResourceID(UUID.randomUUID().toString()),
              taskManagerRpcServiceFactory.createRpcService(),
              haServices,
              heartbeatServices,
              metricRegistry,
              blobCacheService,
              useLocalCommunication(),
              ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
              taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
          // start the TaskExecutor
          taskExecutor.start();
          taskManagers.add(taskExecutor);
      }
  }

The MiniCluster thus creates one TaskExecutor per configured TaskManager; at this point the TaskManagers are fully started.

Dispatcher

The Dispatcher is responsible for accepting job submissions and creating JobManagers.

Dispatcher has two subclasses, MiniDispatcher and StandaloneDispatcher, used for submitting jobs to a MiniCluster and to standalone clusters respectively. StandaloneDispatcher is the simplest implementation: it does not override any method of the parent Dispatcher class.

Next we analyze MiniDispatcher's submitJob method.

  @Override
  public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
      // call Dispatcher's submitJob method
      final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = super.submitJob(jobGraph, timeout);
      acknowledgeCompletableFuture.whenComplete(
          (Acknowledge ignored, Throwable throwable) -> {
              if (throwable != null) {
                  onFatalError(new FlinkException(
                      "Failed to submit job " + jobGraph.getJobID() + " in job mode.",
                      throwable));
              }
          });
      return acknowledgeCompletableFuture;
  }

Continuing with the parent class Dispatcher's submitJob method:

  @Override
  public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
      log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
      try {
          // check whether the job is a duplicate
          if (isDuplicateJob(jobGraph.getJobID())) {
              return FutureUtils.completedExceptionally(
                  new DuplicateJobSubmissionException(jobGraph.getJobID()));
          // check whether only part of the vertices have resources configured
          } else if (isPartialResourceConfigured(jobGraph)) {
              return FutureUtils.completedExceptionally(
                  new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                      "resources configured. The limitation will be removed in future versions."));
          } else {
              // submit the job internally
              return internalSubmitJob(jobGraph);
          }
      } catch (FlinkException e) {
          return FutureUtils.completedExceptionally(e);
      }
  }

The flow then reaches the internalSubmitJob method:

  private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
      log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
      // invoke persistAndRunJob
      final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
          this::persistAndRunJob)
          .thenApply(ignored -> Acknowledge.get());
      return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
          if (throwable != null) {
              cleanUpJobData(jobGraph.getJobID(), true);
              ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
              final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
              log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
              throw new CompletionException(
                  new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
          } else {
              return acknowledge;
          }
      }, ioExecutor);
  }

Following persistAndRunJob:

  private void persistAndRunJob(JobGraph jobGraph) throws Exception {
      // persist the JobGraph
      jobGraphWriter.putJobGraph(jobGraph);
      // invoke runJob
      runJob(jobGraph, ExecutionType.SUBMISSION);
  }

Now the runJob method:

  private void runJob(JobGraph jobGraph, ExecutionType executionType) {
      Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
      // job initialization timestamp
      long initializationTimestamp = System.currentTimeMillis();
      // kick off the JobManager startup logic
      CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
      DispatcherJob dispatcherJob = DispatcherJob.createFor(
          jobManagerRunnerFuture,
          jobGraph.getJobID(),
          jobGraph.getName(),
          initializationTimestamp);
      // store the current job in the runningJobs map
      runningJobs.put(jobGraph.getJobID(), dispatcherJob);
      final JobID jobId = jobGraph.getJobID();
      // handle the job result and perform the cleanup
      final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync(
          (dispatcherJobResult, throwable) -> {
              Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
              if (dispatcherJobResult != null) {
                  return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
              } else {
                  return dispatcherJobFailed(jobId, throwable);
              }
          }, getMainThreadExecutor());
      // terminate the job after the cleanup has finished
      final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
          .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
          .thenCompose(Function.identity());
      FutureUtils.assertNoException(jobTerminationFuture);
      registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
  }

This brings us to the last step of the MiniCluster job submission flow: creating the JobManagerRunner.

The createJobManagerRunner method:

  CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
      // obtain the RPC service
      final RpcService rpcService = getRpcService();
      return CompletableFuture.supplyAsync(
          () -> {
              try {
                  // create a JobManager runner, passing in the JobGraph
                  // and the other required services
                  JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                      jobGraph,
                      configuration,
                      rpcService,
                      highAvailabilityServices,
                      heartbeatServices,
                      jobManagerSharedServices,
                      new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                      fatalErrorHandler,
                      initializationTimestamp);
                  // start the JobManager runner
                  runner.start();
                  return runner;
              } catch (Exception e) {
                  throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
              }
          },
          ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
  }

Next we analyze the JobManager startup logic.

JobManagerRunner and JobManagerRunnerFactory

JobManagerRunnerFactory

We analyze the createJobManagerRunner method of DefaultJobManagerRunnerFactory, the only implementation of JobManagerRunnerFactory.

  @Override
  public JobManagerRunner createJobManagerRunner(
          JobGraph jobGraph,
          Configuration configuration,
          RpcService rpcService,
          HighAvailabilityServices highAvailabilityServices,
          HeartbeatServices heartbeatServices,
          JobManagerSharedServices jobManagerServices,
          JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
          FatalErrorHandler fatalErrorHandler,
          long initializationTimestamp) throws Exception {
      // create the JobMaster configuration
      final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
      // create the factories for the SlotPool service and the scheduler;
      // they produce the SlotPoolService and the SchedulerNG
      final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
      final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
      final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);
      // create the JobMaster service factory;
      // the JobMaster service exposes the JobMaster address and the gateway
      // used to communicate with the JobMaster
      final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
          jobMasterConfiguration,
          slotPoolFactory,
          rpcService,
          highAvailabilityServices,
          jobManagerServices,
          heartbeatServices,
          jobManagerJobMetricGroupFactory,
          fatalErrorHandler,
          schedulerNGFactory,
          shuffleMaster);
      // create the JobManagerRunnerImpl
      return new JobManagerRunnerImpl(
          jobGraph,
          jobMasterFactory,
          highAvailabilityServices,
          jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
          jobManagerServices.getScheduledExecutorService(),
          fatalErrorHandler,
          initializationTimestamp);
  }

JobManagerRunner

JobManagerRunner is used to start the JobManager. When it is created it obtains the user code class loader, a RunningJobsRegistry (which tracks the scheduling status of a job: pending, running, or done) and the leader election service.
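
The scheduling statuses tracked by the RunningJobsRegistry correspond to a small enum, sketched here for orientation (names as in RunningJobsRegistry.JobSchedulingStatus in the version discussed here; it later drives verifyJobSchedulingStatusAndStartJobManager):

  // the three states the registry distinguishes
  public enum JobSchedulingStatus {
      PENDING, // submitted, not yet running
      RUNNING, // currently being scheduled or executed
      DONE     // finished; a re-elected leader must not run it again
  }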

Next the JobManager has to be started. Let's look at the start method:

  @Override
  public void start() throws Exception {
      try {
          leaderElectionService.start(this);
      } catch (Exception e) {
          log.error("Could not start the JobManager because the leader election service did not start.", e);
          throw new Exception("Could not start the leader election service.", e);
      }
  }

The start method has exactly one task: starting the leader election service, which kicks off the leader election process. A detailed analysis of leader election will be added in a later article.

Once a JobManagerRunner instance is granted leadership, its grantLeadership method is called. grantLeadership is declared by the LeaderContender interface, which every role participating in leader election must implement, and JobManagerRunner is no exception.
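
For orientation, a trimmed sketch of the LeaderContender contract (method set abridged; check the interface in your Flink version for the exact signatures):

  import java.util.UUID;

  public interface LeaderContenderSketch {
      // invoked by the election service when this contender becomes leader
      void grantLeadership(UUID leaderSessionID);

      // invoked when leadership is revoked
      void revokeLeadership();

      // invoked when the election service encounters an error
      void handleError(Exception exception);
  }
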
Let's look at the grantLeadership method:

  @Override
  public void grantLeadership(final UUID leaderSessionID) {
      synchronized (lock) {
          if (shutdown) {
              log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
              return;
          }
          leadershipOperation = leadershipOperation.thenCompose(
              (ignored) -> {
                  synchronized (lock) {
                      return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                  }
              });
          handleException(leadershipOperation, "Could not start the job manager.");
      }
  }

Next comes verifying the job scheduling status and starting the JobManager, handled by the verifyJobSchedulingStatusAndStartJobManager method.

  private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
      // check the job scheduling status
      final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
      return jobSchedulingStatusFuture.thenCompose(
          jobSchedulingStatus -> {
              // if the job has already finished, run the already-done logic
              if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                  return jobAlreadyDone();
              } else {
                  // otherwise, start the JobMaster
                  return startJobMaster(leaderSessionId);
              }
          });
  }

After this long journey, we finally arrive at starting the JobMaster.

  private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
      log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
          jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());
      try {
          // first mark the job as running in the registry
          runningJobsRegistry.setJobRunning(jobGraph.getJobID());
      } catch (IOException e) {
          return FutureUtils.completedExceptionally(
              new FlinkException(
                  String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                  e));
      }
      final CompletableFuture<Acknowledge> startFuture;
      try {
          // then start the JobMaster service
          startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
      } catch (Exception e) {
          return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
      }
      final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
      return startFuture.thenAcceptAsync(
          (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
              leaderSessionId,
              jobMasterService.getAddress(),
              currentLeaderGatewayFuture),
          executor);
  }

At this point, control has passed to the JobMaster object.

JobMaster

A JobMaster is responsible for executing a single JobGraph. Its start method:

  public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
      // make sure we receive RPC and async calls
      start();
      return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
  }

The start method first calls the start method of the underlying RPC endpoint, so that RPC and async calls can be received.

The startJobExecution method starts the JobMaster services and begins job scheduling:

  private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
      // make sure we are running in the main thread
      validateRunsInMainThread();
      checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
      if (Objects.equals(getFencingToken(), newJobMasterId)) {
          log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
          return Acknowledge.get();
      }
      setNewFencingToken(newJobMasterId);
      startJobMasterServices();
      log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
      resetAndStartScheduler();
      return Acknowledge.get();
  }

The startJobMasterServices method creates the TaskManager heartbeat manager, starts the slot pool, and establishes the connection to the ResourceManager leader (the ResourceManager goes through leader election as well).

  private void startJobMasterServices() throws Exception {
      startHeartbeatServices();
      // start the slot pool make sure the slot pool now accepts messages for this leader
      slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
      //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
      // try to reconnect to previously known leader
      reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
      // job is ready to go, try to establish connection with resource manager
      //   - activate leader retrieval for the resource manager
      //   - on notification of the leader, the connection will be established and
      //     the slot pool will start requesting slots
      resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  }

Returning to the resetAndStartScheduler method mentioned above:

  private void resetAndStartScheduler() throws Exception {
      validateRunsInMainThread();
      final CompletableFuture<Void> schedulerAssignedFuture;
      if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
          schedulerAssignedFuture = CompletableFuture.completedFuture(null);
          schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
      } else {
          suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
          final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
          final SchedulerNG newScheduler = createScheduler(executionDeploymentTracker, newJobManagerJobMetricGroup);
          schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
              (ignored, throwable) -> {
                  newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                  assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                  return null;
              }
          );
      }
      FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
  }

Scheduling itself begins in the startScheduling method invoked above:

  private void startScheduling() {
      checkState(jobStatusListener == null);
      // register self as job status change listener
      jobStatusListener = new JobManagerJobStatusListener();
      schedulerNG.registerJobStatusListener(jobStatusListener);
      schedulerNG.startScheduling();
  }

SchedulerNG

SchedulerNG is Flink's scheduler interface. It is responsible for creating the ExecutionGraph from the JobGraph and then scheduling the job for execution.

The analysis below is based on the default DefaultScheduler. Its parent class is SchedulerBase, which converts the JobGraph into an ExecutionGraph during initialization.

  @Override
  public final void startScheduling() {
      mainThreadExecutor.assertRunningInMainThread();
      registerJobMetrics();
      startAllOperatorCoordinators();
      startSchedulingInternal();
  }

Back to the scheduling flow: SchedulerBase's startScheduling method above calls startSchedulingInternal.

  @Override
  protected void startSchedulingInternal() {
      log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
      // set the ExecutionGraph's JobStatus to RUNNING
      prepareExecutionGraphForNgScheduling();
      // invoke the scheduling strategy's startScheduling method
      schedulingStrategy.startScheduling();
  }

Next we look at the startScheduling method of PipelinedRegionSchedulingStrategy, the only implementation of SchedulingStrategy.

  @Override
  public void startScheduling() {
      final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils
          .toStream(schedulingTopology.getAllPipelinedRegions())
          .filter(region -> !region.getConsumedResults().iterator().hasNext())
          .collect(Collectors.toSet());
      maybeScheduleRegions(sourceRegions);
  }

This method first builds the sourceRegions set: it takes all pipelined regions and filters out those that have consumed results (the data a region receives from upstream regions is called its consumed results; the data it emits to downstream regions is called its produced results). What remains are the source regions of the pipelined topology.

Next comes the maybeScheduleRegions method.

  private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
      final List<SchedulingPipelinedRegion> regionsSorted =
          SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
      for (SchedulingPipelinedRegion region : regionsSorted) {
          maybeScheduleRegion(region);
      }
  }

This method sorts the regions in topological order and then calls maybeScheduleRegion on each of them.

  private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
      // return if any consumed result of the region is not yet in CONSUMABLE state
      // (i.e. its data cannot be consumed yet)
      if (!areRegionInputsAllConsumable(region)) {
          return;
      }
      // all vertices in the region must be in CREATED state
      checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");
      // create the deployment options for the execution vertices
      final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
          SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
              regionVerticesSorted.get(region),
              id -> deploymentOption);
      // allocate resources (slots) for the execution vertices and deploy them
      schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
  }

The SchedulerOperations interface is what a SchedulingStrategy uses to carry out its scheduling decisions. Its one implementation is DefaultScheduler:

  @Override
  public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
      // every ExecutionVertex must be in ExecutionState.CREATED
      validateDeploymentOptions(executionVertexDeploymentOptions);
      // index the deployment options by ExecutionVertexID
      final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
          groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);
      // extract all ExecutionVertexIDs
      final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
          .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
          .collect(Collectors.toList());
      // record a version for each ExecutionVertex, starting at 1L
      final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
          executionVertexVersioner.recordVertexModifications(verticesToDeploy);
      // switch each ExecutionVertex to ExecutionState.SCHEDULED
      transitionToScheduled(verticesToDeploy);
      // allocate a slot, i.e. the resources needed for execution, for every vertex
      final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
          allocateSlots(executionVertexDeploymentOptions);
      // create the deployment handles, i.e. wrappers around the ExecutionVertexVersion,
      // the ExecutionVertexDeploymentOption and the SlotExecutionVertexAssignment
      final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
          requiredVersionByVertex,
          deploymentOptionsByVertex,
          slotExecutionVertexAssignments);
      // wait until all vertices have resources assigned, then deploy
      waitForAllSlotsAndDeploy(deploymentHandles);
  }

Following waitForAllSlotsAndDeploy:

  private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
      FutureUtils.assertNoException(
          assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
  }

The assignAllResources method assigns every vertex the resources it needs for execution, and deployAll then deploys all vertices.

  private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
      return (ignored, throwable) -> {
          propagateIfNonNull(throwable);
          for (final DeploymentHandle deploymentHandle : deploymentHandles) {
              final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
              final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
              // the slot assignment must already be complete
              checkState(slotAssigned.isDone());
              // once the slot is assigned, run deployOrHandleError
              FutureUtils.assertNoException(
                  slotAssigned.handle(deployOrHandleError(deploymentHandle)));
          }
          return null;
      };
  }

Next, the deployOrHandleError method:

  private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
      final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
      final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();
      return (ignored, throwable) -> {
          // check whether the version of the ExecutionVertex has been modified;
          // if so, the vertex has been taken over by another deployment, so abort this one
          if (executionVertexVersioner.isModified(requiredVertexVersion)) {
              log.debug("Refusing to deploy execution vertex {} because this deployment was " +
                  "superseded by another deployment", executionVertexId);
              return null;
          }
          if (throwable == null) {
              // deploy the ExecutionVertex
              deployTaskSafe(executionVertexId);
          } else {
              handleTaskDeploymentFailure(executionVertexId, throwable);
          }
          return null;
      };
  }

This leads to the deployTaskSafe method:

  private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
      try {
          final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
          // deploy the vertex after looking up the ExecutionVertex
          executionVertexOperations.deploy(executionVertex);
      } catch (Throwable e) {
          handleTaskDeploymentFailure(executionVertexId, e);
      }
  }

The deploy method of DefaultExecutionVertexOperations, the only implementation of ExecutionVertexOperations:

  @Override
  public void deploy(final ExecutionVertex executionVertex) throws JobException {
      executionVertex.deploy();
  }

This method calls the ExecutionVertex's deploy method.

ExecutionVertex

The content of ExecutionVertex's deploy method:

  public void deploy() throws JobException {
      currentExecution.deploy();
  }

Here currentExecution is an Execution object. Each attempt to execute an ExecutionVertex creates a new Execution; the currentExecution field holds the most recently created one.

Execution's deploy method, with commentary:

  /**
   * Deploys the execution to the previously assigned resource.
   *
   * @throws JobException if the execution cannot be deployed to the assigned resource
   */
  public void deploy() throws JobException {
      assertRunningInJobMasterMainThread();
      final LogicalSlot slot = assignedResource;
      checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
      // Check if the TaskManager died in the meantime
      // This only speeds up the response to TaskManagers failing concurrently to deployments.
      // The more general check is the rpcTimeout of the deployment call
      // the slot must still be alive
      if (!slot.isAlive()) {
          throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
      }
      // make sure exactly one deployment call happens from the correct state
      // note: the transition from CREATED to DEPLOYING is for testing purposes only
      ExecutionState previous = this.state;
      // the execution state transitions from SCHEDULED or CREATED to DEPLOYING
      if (previous == SCHEDULED || previous == CREATED) {
          if (!transitionState(previous, DEPLOYING)) {
              // race condition, someone else beat us to the deploying call.
              // this should actually not happen and indicates a race somewhere else
              throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
          }
      }
      else {
          // vertex may have been cancelled, or it was already scheduled
          throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
      }
      // check that the slot has been assigned to this Execution
      if (this != slot.getPayload()) {
          throw new IllegalStateException(
              String.format("The execution %s has not been assigned to the assigned slot.", this));
      }
      try {
          // race double check, did we fail/cancel and do we need to release the slot?
          // check again that the state is still DEPLOYING
          if (this.state != DEPLOYING) {
              slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
              return;
          }
          LOG.info("Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(),
              attemptNumber, vertex.getCurrentExecutionAttempt().getAttemptId(), getAssignedResourceLocation(), slot.getAllocationId());
          if (taskRestore != null) {
              checkState(taskRestore.getTaskStateSnapshot().getSubtaskStateMappings().stream().allMatch(entry ->
                  entry.getValue().getInputRescalingDescriptor().equals(InflightDataRescalingDescriptor.NO_RESCALE) &&
                  entry.getValue().getOutputRescalingDescriptor().equals(InflightDataRescalingDescriptor.NO_RESCALE)),
                  "Rescaling from unaligned checkpoint is not yet supported.");
          }
          // create the task deployment descriptor, used to create the Task
          final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
              .fromExecutionVertex(vertex, attemptNumber)
              .createDeploymentDescriptor(
                  slot.getAllocationId(),
                  slot.getPhysicalSlotNumber(),
                  taskRestore,
                  producedPartitions.values());
          // null taskRestore to let it be GC'ed
          taskRestore = null;
          // obtain the TaskManagerGateway of the slot-providing TaskManager,
          // used to communicate with it
          final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
          final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
              vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
          getVertex().notifyPendingDeployment(this);
          // We run the submission in the future executor so that the serialization of large TDDs does not block
          // the main thread and sync back to the main thread once submission is completed.
          // RPC call telling the TaskManager to create a Task and run this Execution
          CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
              .thenCompose(Function.identity())
              .whenCompleteAsync(
                  (ack, failure) -> {
                      if (failure == null) {
                          vertex.notifyCompletedDeployment(this);
                      } else {
                          if (failure instanceof TimeoutException) {
                              String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
                              markFailed(new Exception(
                                  "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                  + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                          } else {
                              markFailed(failure);
                          }
                      }
                  },
                  jobMasterMainThreadExecutor);
      }
      catch (Throwable t) {
          markFailed(t);
          if (isLegacyScheduling()) {
              ExceptionUtils.rethrow(t);
          }
      }
  }

Finally, after this long and winding journey, we arrive at the point where the TaskManager executes the task. The creation and execution of the Task deserve an article of their own. With that, the analysis of the Flink job execution flow is complete.