1.flink.sh文件

Flink 提交任务使用的脚本是 bin/flink（本文称之为 flink.sh），其源码位于 flink-dist/src/main/flink-bin/bin/ 目录下。下面我们在该脚本中寻找 Java 入口代码：

  # ... (earlier part of the script omitted)
  # `exec` replaces the shell process with the JVM, so CliFrontend's exit code becomes the script's exit code.
  # The command is split across lines only for readability: the trailing backslashes keep it ONE command.
  # Without them, `exec` alone would run nothing and every following line would run as a separate command.
  # $JVM_ARGS and $FLINK_ENV_JAVA_OPTS are intentionally unquoted so they word-split into separate JVM options.
  exec "$JAVA_RUN" $JVM_ARGS \
    $FLINK_ENV_JAVA_OPTS \
    "${log_setting[@]}" \
    -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" \
    org.apache.flink.client.cli.CliFrontend "$@"

通过flink.sh不难发现Flink提交任务的入口类为CliFrontend。找到这个类的main方法:

  1. /**
  2. * Submits the job based on the arguments. Entry point of the Flink CLI script; terminates the JVM via System.exit with the action's return code, or 31 on a fatal error.
  3. */
  4. public static void main(final String[] args) {
  5. // Log information about the environment of this client process,
  6. // e.g. code revision, current user, Java version, JVM arguments.
  7. EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
  8. // 1. find the configuration directory
  9. // ($FLINK_CONF_DIR, else the fallback conf dirs; a failure here happens before the try below, so it propagates out of main instead of reaching the catch)
  10. final String configurationDirectory = getConfigurationDirectoryFromEnv();
  11. // 2. load the global configuration
  12. // Load the Flink configuration from that directory into a Configuration object
  13. final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
  14. // 3. load the custom command lines
  15. // Ordered list: GenericCLI, YARN session CLI (or its fallback), DefaultCLI
  16. final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
  17. configuration,
  18. configurationDirectory);
  19. try {
  20. // Create the CLI frontend with the loaded configuration and command lines
  21. final CliFrontend cli = new CliFrontend(
  22. configuration,
  23. customCommandLines);
  24. // Install the security setup described by the client's configuration
  25. SecurityUtils.install(new SecurityConfiguration(cli.configuration));
  26. // Run parseAndRun(args) inside the installed security context; its result becomes the process exit code
  27. int retCode = SecurityUtils.getInstalledContext()
  28. .runSecured(() -> cli.parseAndRun(args));
  29. System.exit(retCode);
  30. }
  31. catch (Throwable t) { // also catches Errors; any failure here ends the process with exit code 31
  32. final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
  33. LOG.error("Fatal error while running command line interface.", strippedThrowable);
  34. strippedThrowable.printStackTrace();
  35. System.exit(31);
  36. }
  37. }

通过以上分析，可以发现 Flink 的主要实现逻辑位于：

  1. cli.parseAndRun(args)

在分析主要逻辑之前，我们先重点分析一下 main 方法中的各个执行环节。

getConfigurationDirectoryFromEnv 方法的源码：

  1. public static String getConfigurationDirectoryFromEnv() { // Resolves the Flink configuration directory; throws RuntimeException if none can be found
  2. // Read the directory from the environment variable named by ConfigConstants.ENV_FLINK_CONF_DIR (FLINK_CONF_DIR)
  3. String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
  4. // If the variable is set, that path must exist (File.exists() also accepts a regular file); otherwise fail immediately, the fallbacks below are NOT tried
  5. if (location != null) {
  6. if (new File(location).exists()) {
  7. return location;
  8. }
  9. else {
  10. throw new RuntimeException("The configuration directory '" + location + "', specified in the '" +
  11. ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
  12. }
  13. }
  14. // Variable not set: try CONFIG_DIRECTORY_FALLBACK_1 (described as ../conf; a relative path resolves against the JVM's working directory)
  15. else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
  16. location = CONFIG_DIRECTORY_FALLBACK_1;
  17. }
  18. // Then try CONFIG_DIRECTORY_FALLBACK_2 (described as conf, relative to the working directory)
  19. else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
  20. location = CONFIG_DIRECTORY_FALLBACK_2;
  21. }
  22. // Neither the variable nor any fallback directory is available: throw a RuntimeException
  23. else {
  24. throw new RuntimeException("The configuration directory was not specified. " +
  25. "Please specify the directory containing the configuration file through the '" +
  26. ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
  27. }
  28. return location;
  29. }

loadCustomCommandLines方法源码:

  1. public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
  2. // GenericCLI --> flinkYarnSessionCLI --> DefaultCLI顺序加载命令行
  3. List<CustomCommandLine> customCommandLines = new ArrayList<>();
  4. customCommandLines.add(new GenericCLI(configuration, configurationDirectory));
  5. // Command line interface of the YARN session, with a special initialization here
  6. // to prefix all options with y/yarn.
  7. // yarn回话的命令行接口,这里所有选项使用y/yarn前缀指定
  8. final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
  9. try {
  10. customCommandLines.add(
  11. loadCustomCommandLine(flinkYarnSessionCLI,
  12. configuration,
  13. configurationDirectory,
  14. "y",
  15. "yarn"));
  16. } catch (NoClassDefFoundError | Exception e) {
  17. final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
  18. try {
  19. LOG.info("Loading FallbackYarnSessionCli");
  20. customCommandLines.add(
  21. loadCustomCommandLine(errorYarnSessionCLI, configuration));
  22. } catch (Exception exception) {
  23. LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
  24. }
  25. }
  26. // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
  27. // active CustomCommandLine in order and DefaultCLI isActive always return true.
  28. // 注意:默认客户端必须最后添加,getActiveCustomCommandLine方法将会按照CustomCommandLine顺序判断
  29. // DefaultCLI的isActive方法一直返回true
  30. customCommandLines.add(new DefaultCLI());
  31. return customCommandLines;
  32. }

接下来，分析主要逻辑 cli.parseAndRun(args) 方法：

  1. /**
  2. * Parses the command line arguments and starts the requested action.
  3. * 解析命令行参数,并开始请求操作
  4. * @param args command line arguments of the client.
  5. * @return The return code of the program
  6. */
  7. public int parseAndRun(String[] args) {
  8. // check for action
  9. // 检查命令行 个数
  10. if (args.length < 1) {
  11. CliFrontendParser.printHelp(customCommandLines);
  12. System.out.println("Please specify an action.");
  13. return 1;
  14. }
  15. // get action
  16. // 获取flink将要执行的操作
  17. String action = args[0];
  18. // remove action from parameters
  19. // 移除操作参数
  20. final String[] params = Arrays.copyOfRange(args, 1, args.length);
  21. try {
  22. // do action 行为操作主要有一下几种:
  23. // run,run-application,info,list,cancel,stop,savepoint
  24. switch (action) {
  25. case ACTION_RUN:
  26. run(params);
  27. return 0;
  28. case ACTION_RUN_APPLICATION:
  29. runApplication(params);
  30. return 0;
  31. case ACTION_LIST:
  32. list(params);
  33. return 0;
  34. case ACTION_INFO:
  35. info(params);
  36. return 0;
  37. case ACTION_CANCEL:
  38. cancel(params);
  39. return 0;
  40. case ACTION_STOP:
  41. stop(params);
  42. return 0;
  43. case ACTION_SAVEPOINT:
  44. savepoint(params);
  45. return 0;
  46. case "-h":
  47. case "--help":
  48. CliFrontendParser.printHelp(customCommandLines);
  49. return 0;
  50. case "-v":
  51. case "--version":
  52. String version = EnvironmentInformation.getVersion();
  53. String commitID = EnvironmentInformation.getRevisionInformation().commitId;
  54. System.out.print("Version: " + version);
  55. System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
  56. return 0;
  57. default:
  58. System.out.printf("\"%s\" is not a valid action.\n", action);
  59. System.out.println();
  60. System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
  61. System.out.println();
  62. System.out.println("Specify the version option (-v or --version) to print Flink version.");
  63. System.out.println();
  64. System.out.println("Specify the help option (-h or --help) to get help on the command.");
  65. return 1;
  66. }
  67. } catch (CliArgsException ce) {
  68. return handleArgException(ce);
  69. } catch (ProgramParametrizationException ppe) {
  70. return handleParametrizationException(ppe);
  71. } catch (ProgramMissingJobException pmje) {
  72. return handleMissingJobException();
  73. } catch (Exception e) {
  74. return handleError(e);
  75. }
  76. }

接下来我们进一步分析run()方法

  1. /**
  2. * Executes the run action: parses the options, builds the effective configuration and runs the packaged user program.
  3. *
  4. * @param args Command line arguments for the run action.
  5. */
  6. protected void run(String[] args) throws Exception {
  7. LOG.info("Running 'run' command.");
  8. // Parse the command line using the options of the "run" action
  9. final Options commandOptions = CliFrontendParser.getRunCommandOptions();
  10. final CommandLine commandLine = getCommandLine(commandOptions, args, true);
  11. // evaluate help flag
  12. if (commandLine.hasOption(HELP_OPTION.getOpt())) {
  13. CliFrontendParser.printHelpForRun(customCommandLines);
  14. return;
  15. }
  16. // Pick the first active CustomCommandLine (GenericCLI, YARN session CLI or DefaultCLI, in load order)
  17. final CustomCommandLine activeCommandLine =
  18. validateAndGetActiveCommandLine(checkNotNull(commandLine));
  19. final ProgramOptions programOptions = ProgramOptions.create(commandLine);
  20. // Collect the URLs of the job jar and its dependencies
  21. final List<URL> jobJars = getJobJarAndDependencies(programOptions);
  22. // Build the effective configuration from the active CLI, the parsed command line, the program options and the job jars (merge rules live in getEffectiveConfiguration, not shown)
  23. final Configuration effectiveConfiguration = getEffectiveConfiguration(
  24. activeCommandLine, commandLine, programOptions, jobJars);
  25. LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
  26. // Wrap the user program described by the program options into a PackagedProgram
  27. final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);
  28. try {
  29. executeProgram(effectiveConfiguration, program);
  30. } finally { // always clean up, even if execution fails
  31. program.deleteExtractedLibraries();
  32. }
  33. }

这段代码主要分为三部分：解析运行参数；将用户提交的 jar 包封装为 PackagedProgram；执行程序，并在结束后进行清理操作。

这里需要进一步研究如何获取处于活跃（active）状态的 commandLine：

  1. /**
  2. * Gets the custom command-line for the arguments.
  3. * @param commandLine The input to the command-line.
  4. * @return custom command-line which is active (may only be one at a time)
  5. */
  6. public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
  7. LOG.debug("Custom commandlines: {}", customCommandLines);
  8. for (CustomCommandLine cli : customCommandLines) {
  9. LOG.debug("Checking custom commandline {}, isActive: {}", cli, cli.isActive(commandLine));
  10. if (cli.isActive(commandLine)) {
  11. return cli;
  12. }
  13. }
  14. throw new IllegalStateException("No valid command-line found.");
  15. }

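这个方法很简单：按顺序遍历 customCommandLines，一旦找到处于 active 状态的 CustomCommandLine 就立即返回。customCommandLines 是在 main 方法中由 loadCustomCommandLines 构建、再传入 CliFrontend 构造函数的，加载顺序依次为 GenericCLI、FlinkYarnSessionCli（加载失败时改为 FallbackYarnSessionCli）、DefaultCLI。因此，当 GenericCLI 不处于 active 状态而 FlinkYarnSessionCli 处于 active 状态时，会优先返回 FlinkYarnSessionCli。DefaultCLI 的 isActive 方法总是返回 true，因此它作为兜底，必须最后添加。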
FlinkYarnSessionCli的isActive方法:

  1. @Override // YARN session CLI: decides whether this CLI handles the given command line
  2. public boolean isActive(CommandLine commandLine) {
  3. if (!super.isActive(commandLine)) { // the superclass check (not shown) did not activate this CLI
  4. return (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null); // then active only in YARN-properties-file mode with a non-null yarnApplicationIdFromYarnProperties (presumably an application id read from that file; set elsewhere)
  5. }
  6. return true;
  7. }

下面分析 executeProgram 方法，它会进一步调用 ClientUtils.executeProgram 方法：

  1. public static void executeProgram(
  2. PipelineExecutorServiceLoader executorServiceLoader,
  3. Configuration configuration,
  4. PackagedProgram program,
  5. boolean enforceSingleJobExecution,
  6. boolean suppressSysout) throws ProgramInvocationException {
  7. checkNotNull(executorServiceLoader);
  8. // Class loader of the user program
  9. final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
  10. // Remember the current context class loader so the outer finally can restore it
  11. final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
  12. try {
  13. // Run the user program with its own class loader as the thread context class loader
  14. Thread.currentThread().setContextClassLoader(userCodeClassLoader);
  15. LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
  16. // Execution Environment for remote execution with the Client
  17. // Install ContextEnvironment as the current context with this executor loader, configuration and class loader
  18. ContextEnvironment.setAsContext(
  19. executorServiceLoader,
  20. configuration,
  21. userCodeClassLoader,
  22. enforceSingleJobExecution,
  23. suppressSysout);
  24. // Install StreamContextEnvironment the same way, for streaming programs
  25. StreamContextEnvironment.setAsContext(
  26. executorServiceLoader,
  27. configuration,
  28. userCodeClassLoader,
  29. enforceSingleJobExecution,
  30. suppressSysout);
  31. try {
  32. // Invoke the user's main(); per its Javadoc this assumes the context environment is prepared, otherwise execution is local by default
  33. program.invokeInteractiveModeForExecution();
  34. } finally { // always remove both context environments, even if the program fails
  35. ContextEnvironment.unsetAsContext();
  36. StreamContextEnvironment.unsetAsContext();
  37. }
  38. } finally { // always restore the caller's context class loader
  39. Thread.currentThread().setContextClassLoader(contextClassLoader);
  40. }
  41. }

executeProgram 方法的主要工作是：将当前线程的上下文类加载器替换为用户代码类加载器，设置 ContextEnvironment 与 StreamContextEnvironment 执行环境，然后执行用户程序；执行结束后清理执行环境，并恢复原来的类加载器。
invokeInteractiveModeForExecution方法主要逻辑如下所示:

  1. /**
  2. * This method assumes that the context environment is prepared, or the execution
  3. * will be a local execution by default. Runs the program's entry class main method with the program arguments.
  4. */
  5. public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
  6. callMainMethod(mainClass, args); // reflectively invokes mainClass.main(args); see callMainMethod
  7. }

invokeInteractiveModeForExecution 直接调用 callMainMethod 来执行用户程序的 main 方法，callMainMethod 的主要逻辑如下所示：

  1. private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException { // Reflectively invokes entryClass.main(args)
  2. Method mainMethod;
  3. // The entry class must be public
  4. if (!Modifier.isPublic(entryClass.getModifiers())) {
  5. throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
  6. }
  7. try {
  8. // Look up main(String[]); Class.getMethod only returns public methods
  9. mainMethod = entryClass.getMethod("main", String[].class);
  10. } catch (NoSuchMethodException e) {
  11. throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
  12. } catch (Throwable t) {
  13. throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
  14. entryClass.getName() + ": " + t.getMessage(), t);
  15. }
  16. // main must be static, because it is invoked below with a null receiver
  17. if (!Modifier.isStatic(mainMethod.getModifiers())) {
  18. throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
  19. }
  20. // main must be public (already guaranteed by getMethod above; kept as a defensive check)
  21. if (!Modifier.isPublic(mainMethod.getModifiers())) {
  22. throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
  23. }
  24. try {
  25. // Invoke main; args is cast to Object so the array is passed as the single String[] argument instead of being expanded as varargs
  26. mainMethod.invoke(null, (Object) args);
  27. } catch (IllegalArgumentException e) {
  28. throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
  29. } catch (IllegalAccessException e) {
  30. throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
  31. } catch (InvocationTargetException e) { // main itself threw: unwrap it; Errors and Program*Exceptions are rethrown as-is, everything else is wrapped
  32. Throwable exceptionInMethod = e.getTargetException();
  33. if (exceptionInMethod instanceof Error) {
  34. throw (Error) exceptionInMethod;
  35. } else if (exceptionInMethod instanceof ProgramParametrizationException) {
  36. throw (ProgramParametrizationException) exceptionInMethod;
  37. } else if (exceptionInMethod instanceof ProgramInvocationException) {
  38. throw (ProgramInvocationException) exceptionInMethod;
  39. } else {
  40. throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
  41. }
  42. } catch (Throwable t) { // any other failure raised by the invoke call itself
  43. throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t);
  44. }
  45. }

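逻辑如下：
1. 首先判断入口类是否为 public；
2. 通过反射查找 main(String[]) 方法；
3. 判断 main 方法是否为 static；
4. 判断 main 方法是否为 public；
5. 通过反射调用 main 方法，执行我们编写的业务代码。main 方法抛出的 Error、ProgramParametrizationException 和 ProgramInvocationException 会原样抛出，其他异常统一包装为 ProgramInvocationException。

到这里，便开始执行用户编写的代码。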