1.flink.sh文件
Flink 提交任务的脚本为 flink.sh,该文件位于 flink-dist/src/main/flink-bin/bin/ 目录下。下面我们来寻找 flink.sh 中的 Java 入口代码。
# ... (earlier part of flink.sh omitted) ...
# Replace the shell process with the JVM running CliFrontend, forwarding all script arguments ("$@").
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
通过flink.sh不难发现Flink提交任务的入口类为CliFrontend。找到这个类的main方法:
/*** Submits the job based on the arguments.*/public static void main(final String[] args) {// 日志打印环境配置信息// 例如:代码版本,当前用户,Java版本,jvm参数EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);// 1. find the configuration directory// 读取flink配置文件目录final String configurationDirectory = getConfigurationDirectoryFromEnv();// 2. load the global configuration// 读取flink配置文件到configuration对象中final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);// 3. load the custom command lines// 加载自定义命令行final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(configuration,configurationDirectory);try {// 实例化 CliFrontendfinal CliFrontend cli = new CliFrontend(configuration,customCommandLines);// 安装认证配置信息SecurityUtils.install(new SecurityConfiguration(cli.configuration));// 在配置环境下调用启动参数int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));System.exit(retCode);}catch (Throwable t) {final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);LOG.error("Fatal error while running command line interface.", strippedThrowable);strippedThrowable.printStackTrace();System.exit(31);}}
通过以上分析,我们发现 Flink 的主要实现逻辑位于:
cli.parseAndRun(args)
在分析主要逻辑之前,我们先重点分析一下 main 方法的各个执行环节。
getConfigurationDirectoryFromEnv 方法的源码:
public static String getConfigurationDirectoryFromEnv() {// 从FLINK_CONF_DIR环境变量获取Flink配置文件路径String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);// 如果配置了该环境变量,并且目录存在,则直接返回if (location != null) {if (new File(location).exists()) {return location;}else {throw new RuntimeException("The configuration directory '" + location + "', specified in the '" +ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");}}// 如果没有配置环境变量,从上层目录的conf寻找(../conf)else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {location = CONFIG_DIRECTORY_FALLBACK_1;}// 如果没有配置环境变量,从当前目录的conf寻找(conf)else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {location = CONFIG_DIRECTORY_FALLBACK_2;}// 如果都没有找到,系统异常退出else {throw new RuntimeException("The configuration directory was not specified. " +"Please specify the directory containing the configuration file through the '" +ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");}return location;}
loadCustomCommandLines方法源码:
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {// GenericCLI --> flinkYarnSessionCLI --> DefaultCLI顺序加载命令行List<CustomCommandLine> customCommandLines = new ArrayList<>();customCommandLines.add(new GenericCLI(configuration, configurationDirectory));// Command line interface of the YARN session, with a special initialization here// to prefix all options with y/yarn.// yarn回话的命令行接口,这里所有选项使用y/yarn前缀指定final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";try {customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI,configuration,configurationDirectory,"y","yarn"));} catch (NoClassDefFoundError | Exception e) {final String errorYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";try {LOG.info("Loading FallbackYarnSessionCli");customCommandLines.add(loadCustomCommandLine(errorYarnSessionCLI, configuration));} catch (Exception exception) {LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);}}// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the// active CustomCommandLine in order and DefaultCLI isActive always return true.// 注意:默认客户端必须最后添加,getActiveCustomCommandLine方法将会按照CustomCommandLine顺序判断// DefaultCLI的isActive方法一直返回truecustomCommandLines.add(new DefaultCLI());return customCommandLines;}
接下来,分析主要逻辑 cli.parseAndRun(args) 方法:
/*** Parses the command line arguments and starts the requested action.* 解析命令行参数,并开始请求操作* @param args command line arguments of the client.* @return The return code of the program*/public int parseAndRun(String[] args) {// check for action// 检查命令行 个数if (args.length < 1) {CliFrontendParser.printHelp(customCommandLines);System.out.println("Please specify an action.");return 1;}// get action// 获取flink将要执行的操作String action = args[0];// remove action from parameters// 移除操作参数final String[] params = Arrays.copyOfRange(args, 1, args.length);try {// do action 行为操作主要有一下几种:// run,run-application,info,list,cancel,stop,savepointswitch (action) {case ACTION_RUN:run(params);return 0;case ACTION_RUN_APPLICATION:runApplication(params);return 0;case ACTION_LIST:list(params);return 0;case ACTION_INFO:info(params);return 0;case ACTION_CANCEL:cancel(params);return 0;case ACTION_STOP:stop(params);return 0;case ACTION_SAVEPOINT:savepoint(params);return 0;case "-h":case "--help":CliFrontendParser.printHelp(customCommandLines);return 0;case "-v":case "--version":String version = EnvironmentInformation.getVersion();String commitID = EnvironmentInformation.getRevisionInformation().commitId;System.out.print("Version: " + version);System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? 
"" : ", Commit ID: " + commitID);return 0;default:System.out.printf("\"%s\" is not a valid action.\n", action);System.out.println();System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");System.out.println();System.out.println("Specify the version option (-v or --version) to print Flink version.");System.out.println();System.out.println("Specify the help option (-h or --help) to get help on the command.");return 1;}} catch (CliArgsException ce) {return handleArgException(ce);} catch (ProgramParametrizationException ppe) {return handleParametrizationException(ppe);} catch (ProgramMissingJobException pmje) {return handleMissingJobException();} catch (Exception e) {return handleError(e);}}
接下来我们进一步分析run()方法
/*** Executions the run action.** @param args Command line arguments for the run action.*/protected void run(String[] args) throws Exception {LOG.info("Running 'run' command.");// 解析命令行参数final Options commandOptions = CliFrontendParser.getRunCommandOptions();final CommandLine commandLine = getCommandLine(commandOptions, args, true);// evaluate help flagif (commandLine.hasOption(HELP_OPTION.getOpt())) {CliFrontendParser.printHelpForRun(customCommandLines);return;}// 获取有效的命令行,上述加载的三种客户端形式final CustomCommandLine activeCommandLine =validateAndGetActiveCommandLine(checkNotNull(commandLine));final ProgramOptions programOptions = ProgramOptions.create(commandLine);// jar包,并获取对应的依赖final List<URL> jobJars = getJobJarAndDependencies(programOptions);//final Configuration effectiveConfiguration = getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);LOG.debug("Effective executor configuration: {}", effectiveConfiguration);// 程序包final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);try {executeProgram(effectiveConfiguration, program);} finally {program.deleteExtractedLibraries();}}
这段代码主要分为三部分:解析运行参数;将用户提交的 jar 包封装为 PackagedProgram;执行程序并进行清理(删除解压出来的依赖库)。
这里需要研究一下如何获取处于活跃(active)状态的 commandLine:
/**
 * Gets the custom command-line for the arguments.
 *
 * <p>The registered custom command lines are checked in order, and the first one that reports
 * itself active is returned.
 *
 * @param commandLine The input to the command-line.
 * @return custom command-line which is active (may only be one at a time)
 * @throws IllegalStateException if none of the registered custom command lines is active
 */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    for (CustomCommandLine cli : customCommandLines) {
        // isActive may do non-trivial work (FlinkYarnSessionCli, for example, inspects YARN
        // properties), so evaluate it once and reuse the result for both logging and the decision.
        final boolean active = cli.isActive(commandLine);
        LOG.debug("Checking custom commandline {}, isActive: {}", cli, active);
        if (active) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}
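这个方法很简单:按顺序遍历 customCommandLines,一旦找到 active 状态的 CustomCommandLine 就立即返回;如果一个都没有,则抛出 IllegalStateException。由前面的 loadCustomCommandLines 可知,列表在构造 CliFrontend 之前就已按 GenericCLI、FlinkYarnSessionCli(加载失败时为 FallbackYarnSessionCli)、DefaultCLI 的顺序创建好。因此,当 GenericCLI 不处于 active 状态而 FlinkYarnSessionCli 为 active 时,该方法会优先返回 FlinkYarnSessionCli。DefaultCLI 的 isActive 方法总是返回 true,所以它必须放在最后,作为兜底。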
FlinkYarnSessionCli的isActive方法:
/**
 * Reports whether the YARN session CLI should handle the given command line.
 *
 * <p>It does so when the parent class's check succeeds. Otherwise it does so when YARN
 * properties-file mode is in effect and {@code yarnApplicationIdFromYarnProperties} is set.
 */
@Override
public boolean isActive(CommandLine commandLine) {
    return super.isActive(commandLine)
        || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
下面分析 executeProgram 方法,它会进一步调用 ClientUtils.executeProgram 方法:
public static void executeProgram(PipelineExecutorServiceLoader executorServiceLoader,Configuration configuration,PackagedProgram program,boolean enforceSingleJobExecution,boolean suppressSysout) throws ProgramInvocationException {checkNotNull(executorServiceLoader);// 用户代码类加载器final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();// 上下文类加载器final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();try {// 使用用户类加载器Thread.currentThread().setContextClassLoader(userCodeClassLoader);LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));// Execution Environment for remote execution with the Client// 执行环境与客户端远程执行ContextEnvironment.setAsContext(executorServiceLoader,configuration,userCodeClassLoader,enforceSingleJobExecution,suppressSysout);// 流上下文环境StreamContextEnvironment.setAsContext(executorServiceLoader,configuration,userCodeClassLoader,enforceSingleJobExecution,suppressSysout);try {// 这种方法假定上下文环境准备,或者执行将默认本地执行。program.invokeInteractiveModeForExecution();} finally {ContextEnvironment.unsetAsContext();StreamContextEnvironment.unsetAsContext();}} finally {Thread.currentThread().setContextClassLoader(contextClassLoader);}}
executeProgram 方法主要完成以下工作:将线程上下文类加载器替换为用户代码类加载器;设置执行环境(ContextEnvironment 和 StreamContextEnvironment);执行用户程序;最后还原执行环境和原来的类加载器。
invokeInteractiveModeForExecution方法主要逻辑如下所示:
/**
 * This method assumes that the context environment is prepared, or the execution
 * will be a local execution by default.
 *
 * <p>Runs the program by reflectively calling the main method of its entry class with the
 * program's arguments.
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    final Class<?> entryPointClass = mainClass;
    final String[] programArguments = args;
    callMainMethod(entryPointClass, programArguments);
}
callMainMethod 负责调用用户程序的 main 函数,其主要逻辑如下所示:
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {Method mainMethod;// 类必须是publicif (!Modifier.isPublic(entryClass.getModifiers())) {throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");}try {// 寻找main方法mainMethod = entryClass.getMethod("main", String[].class);} catch (NoSuchMethodException e) {throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");} catch (Throwable t) {throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +entryClass.getName() + ": " + t.getMessage(), t);}// main必须是staticif (!Modifier.isStatic(mainMethod.getModifiers())) {throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");}// main必须是publicif (!Modifier.isPublic(mainMethod.getModifiers())) {throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");}try {// 开始调用main方法mainMethod.invoke(null, (Object) args);} catch (IllegalArgumentException e) {throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);} catch (IllegalAccessException e) {throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);} catch (InvocationTargetException e) {Throwable exceptionInMethod = e.getTargetException();if (exceptionInMethod instanceof Error) {throw (Error) exceptionInMethod;} else if (exceptionInMethod instanceof ProgramParametrizationException) {throw (ProgramParametrizationException) exceptionInMethod;} else if (exceptionInMethod instanceof ProgramInvocationException) {throw (ProgramInvocationException) exceptionInMethod;} else {throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);}} catch (Throwable t) {throw new ProgramInvocationException("An 
error occurred while invoking the program's main method: " + t.getMessage(), t);}}
逻辑:
1. 首先判断入口类是否为 public
2. 查找 main(String[]) 方法
3. 判断 main 方法是否为 static
4. 判断 main 方法是否为 public
5. 通过反射调用 main 方法,执行我们编写的业务代码
到这里便开始执行用户编写的代码
