1. flink 启动脚本
Flink 提交任务所用的脚本是 bin/flink(下文沿用 flink.sh 的叫法),源码位于 flink-dist/src/main/flink-bin/bin/ 目录下。我们先在该脚本中找到启动 Java 程序的入口代码:
# ... (earlier part of the script omitted) ...
# Replace the shell with the JVM running the CLI entry class CliFrontend.
# This must be ONE command: the line continuations below are required.
# JVM_ARGS and FLINK_ENV_JAVA_OPTS are intentionally unquoted so that they
# word-split into separate JVM options.
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" \
    -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" \
    org.apache.flink.client.cli.CliFrontend "$@"
通过flink.sh不难发现Flink提交任务的入口类为CliFrontend。找到这个类的main方法:
/**
 * Entry point of the command line client: submits the job described by the arguments.
 *
 * <p>Locates and loads the configuration, loads the custom command lines, installs the
 * security context and runs {@link #parseAndRun(String[])} inside it. The action's return
 * code becomes the process exit code. Any failure while creating the frontend, installing
 * security or running the action is logged and ends the process with exit code 31. Failures
 * while locating or loading the configuration propagate out of this method.
 *
 * @param args command line arguments, starting with the action (e.g. {@code run})
 */
public static void main(final String[] args) {
    // Log environment details: code revision, current user, Java version, JVM options.
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory; 2. load the global configuration from it
    final String confDirectory = getConfigurationDirectoryFromEnv();
    final Configuration globalConfiguration = GlobalConfiguration.loadConfiguration(confDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> commandLines =
        loadCustomCommandLines(globalConfiguration, confDirectory);

    try {
        final CliFrontend frontend = new CliFrontend(globalConfiguration, commandLines);

        // Install the security settings, then run the requested action in the secured context.
        final SecurityConfiguration securityConfiguration =
            new SecurityConfiguration(frontend.configuration);
        SecurityUtils.install(securityConfiguration);
        final int exitCode = SecurityUtils.getInstalledContext()
            .runSecured(() -> frontend.parseAndRun(args));
        System.exit(exitCode);
    } catch (Throwable failure) {
        // Strip UndeclaredThrowableException wrappers before reporting the error.
        final Throwable cause =
            ExceptionUtils.stripException(failure, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", cause);
        cause.printStackTrace();
        System.exit(31);
    }
}
经过以上分析可以发现,Flink 的主要实现逻辑位于:
cli.parseAndRun(args)
在分析这部分主要逻辑之前,我们先重点分析一下 main 方法中的各个执行环节。
getConfigurationDirectoryFromEnv 方法的源码:
/**
 * Resolves the Flink configuration directory.
 *
 * <p>Candidates are tried in this order:
 * <ol>
 *   <li>the path in the {@code ConfigConstants.ENV_FLINK_CONF_DIR} environment variable;</li>
 *   <li>{@code CONFIG_DIRECTORY_FALLBACK_1} (the parent directory's conf, ../conf);</li>
 *   <li>{@code CONFIG_DIRECTORY_FALLBACK_2} (conf in the working directory).</li>
 * </ol>
 *
 * @return the first candidate path that exists
 * @throws RuntimeException if the environment variable is set but its path does not exist,
 *     or if it is unset and neither fallback exists
 */
public static String getConfigurationDirectoryFromEnv() {
    final String fromEnv = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

    // An explicitly configured directory wins, but it must exist; no fallback is tried then.
    if (fromEnv != null) {
        if (!new File(fromEnv).exists()) {
            throw new RuntimeException("The configuration directory '" + fromEnv + "', specified in the '" +
                ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
        }
        return fromEnv;
    }

    if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        return CONFIG_DIRECTORY_FALLBACK_1;
    }
    if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        return CONFIG_DIRECTORY_FALLBACK_2;
    }

    throw new RuntimeException("The configuration directory was not specified. " +
        "Please specify the directory containing the configuration file through the '" +
        ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
}
loadCustomCommandLines方法源码:
/**
 * Builds the list of custom command lines supported by the client, in priority order.
 *
 * <p>The order is GenericCLI, then the YARN session CLI (or FallbackYarnSessionCli if the
 * YARN CLI cannot be loaded), then DefaultCLI. The active command line is chosen by scanning
 * this list in order, and DefaultCLI is always active, so it must stay last.
 *
 * @param configuration the loaded Flink configuration
 * @param configurationDirectory the directory the configuration was loaded from
 * @return the custom command lines, with DefaultCLI as the last element
 */
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
    List<CustomCommandLine> customCommandLines = new ArrayList<>();
    customCommandLines.add(new GenericCLI(configuration, configurationDirectory));

    // Command line interface of the YARN session, with a special initialization here
    // to prefix all options with y/yarn.
    final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
    try {
        customCommandLines.add(
            loadCustomCommandLine(flinkYarnSessionCLI,
                configuration,
                configurationDirectory,
                "y",
                "yarn"));
    } catch (NoClassDefFoundError | Exception e) {
        // The YARN session CLI could not be loaded (presumably YARN classes are not on the
        // classpath), so try the fallback CLI instead.
        final String fallbackYarnSessionCLI = "org.apache.flink.yarn.cli.FallbackYarnSessionCli";
        try {
            LOG.info("Loading FallbackYarnSessionCli");
            customCommandLines.add(
                loadCustomCommandLine(fallbackYarnSessionCLI, configuration));
        } catch (Exception fallbackException) {
            // Attach the fallback's failure to the original one so the single warning
            // below reports both causes instead of silently dropping the second.
            e.addSuppressed(fallbackException);
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }
    }

    // Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
    // active CustomCommandLine in order and DefaultCLI isActive always return true.
    customCommandLines.add(new DefaultCLI());
    return customCommandLines;
}
接下来,分析主要逻辑 cli.parseAndRun(args) 方法:
/**
 * Parses the command line arguments and starts the requested action.
 *
 * <p>The first argument selects the action. The remaining arguments are handed to that
 * action's handler. Exceptions thrown by a handler are mapped to a return code by the
 * matching {@code handle*} method.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program: 0 if the action completed, 1 if no action or an
 *     unknown action was given, otherwise the code chosen by the exception handler
 */
public int parseAndRun(String[] args) {
    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // get action
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // do action: run, run-application, list, info, cancel, stop, savepoint, help or version
        switch (action) {
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            case ACTION_LIST:
                list(params);
                return 0;
            case ACTION_INFO:
                info(params);
                return 0;
            case ACTION_CANCEL:
                cancel(params);
                return 0;
            case ACTION_STOP:
                stop(params);
                return 0;
            case ACTION_SAVEPOINT:
                savepoint(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
                return 0;
            default:
                // %n (platform line separator) to match the println calls that follow.
                System.out.printf("\"%s\" is not a valid action.%n", action);
                System.out.println();
                // Lists every action handled by the cases above.
                System.out.println("Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println("Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println("Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}
接下来我们进一步分析run()方法
/**
 * Executes the {@code run} action.
 *
 * <p>Parses the run options and picks the active custom command line. It then builds the
 * effective configuration, packages the user jar into a {@link PackagedProgram} and executes
 * it. Libraries extracted for the program are deleted afterwards, even if execution fails.
 *
 * @param args Command line arguments for the run action.
 * @throws Exception if parsing, packaging or executing the program fails
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final CommandLine parsed = getCommandLine(CliFrontendParser.getRunCommandOptions(), args, true);

    // -h / --help: print the usage of the run action and do nothing else.
    final boolean helpRequested = parsed.hasOption(HELP_OPTION.getOpt());
    if (helpRequested) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    // The first active entry among the loaded custom command lines decides the deployment.
    final CustomCommandLine activeCli = validateAndGetActiveCommandLine(checkNotNull(parsed));
    final ProgramOptions options = ProgramOptions.create(parsed);

    // The user jar plus its dependencies.
    final List<URL> jars = getJobJarAndDependencies(options);
    final Configuration executorConfig = getEffectiveConfiguration(activeCli, parsed, options, jars);
    LOG.debug("Effective executor configuration: {}", executorConfig);

    final PackagedProgram program = getPackagedProgram(options, executorConfig);
    try {
        executeProgram(executorConfig, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}
这段代码主要分为三部分:解析运行参数;将用户提交的 jar 包封装为 PackagedProgram;执行程序,并在结束后清理解压出的依赖库。
这里需要先弄清楚如何获取处于活跃(active)状态的 CustomCommandLine:
/**
 * Gets the custom command-line for the arguments.
 *
 * <p>The custom command lines are checked in list order and the first active one is returned.
 *
 * @param commandLine The input to the command-line.
 * @return custom command-line which is active (may only be one at a time)
 * @throws IllegalStateException if none of the custom command lines is active
 */
public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {
    LOG.debug("Custom commandlines: {}", customCommandLines);
    for (CustomCommandLine cli : customCommandLines) {
        // Evaluate isActive once per candidate. Log arguments are computed even when debug
        // logging is off, and this keeps the logged value identical to the one acted upon.
        final boolean active = cli.isActive(commandLine);
        LOG.debug("Checking custom commandline {}, isActive: {}", cli, active);
        if (active) {
            return cli;
        }
    }
    throw new IllegalStateException("No valid command-line found.");
}
这个方法很简单:依次遍历 customCommandLines,找到第一个处于 active 状态的命令行后立即返回。根据 loadCustomCommandLines 的加载顺序,customCommandLines 中依次是 GenericCLI、FlinkYarnSessionCli(加载失败时为 FallbackYarnSessionCli)和 DefaultCLI。因此,当 GenericCLI 不活跃而 FlinkYarnSessionCli 活跃时,会优先返回 FlinkYarnSessionCli。DefaultCLI 的 isActive 方法总是返回 true,因此它作为兜底被放在最后。
FlinkYarnSessionCli的isActive方法:
/**
 * Tells whether this YARN session CLI should handle the given command line.
 *
 * <p>True when the parent class's check accepts the command line. Otherwise it is true only
 * when YARN properties-file mode applies and {@code yarnApplicationIdFromYarnProperties} is set.
 * The second condition is evaluated only when the parent check fails.
 */
@Override
public boolean isActive(CommandLine commandLine) {
    return super.isActive(commandLine)
        || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
下面分析 executeProgram 方法,它会进一步调用 ClientUtils.executeProgram 方法:
/**
 * Runs the user program with the client-side execution environments installed.
 *
 * <p>Steps:
 * <ol>
 *   <li>swap the thread's context class loader to the program's user-code class loader;</li>
 *   <li>install {@code ContextEnvironment} and {@code StreamContextEnvironment} via
 *       {@code setAsContext(...)};</li>
 *   <li>invoke the program's main method;</li>
 *   <li>unset both environments and restore the previous class loader, in {@code finally}
 *       blocks, so this also happens when the program throws.</li>
 * </ol>
 *
 * @param executorServiceLoader loader for pipeline executors; must not be null
 * @param configuration the effective configuration; its DeploymentOptions.ATTACHED flag is
 *     logged as the detached mode
 * @param program the packaged user program to run
 * @param enforceSingleJobExecution passed through to both environments
 * @param suppressSysout passed through to both environments
 * @throws ProgramInvocationException if invoking the user program fails
 */
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
// class loader holding the user's code
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
// remember the current context class loader so it can be restored at the end
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
// make the user-code class loader the thread's context class loader
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
// Execution Environment for remote execution with the Client
// (batch/DataSet context environment)
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
// streaming context environment, installed with the same arguments
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
// This method assumes the context environment is prepared; otherwise the program
// would run as a local execution by default.
program.invokeInteractiveModeForExecution();
} finally {
// always remove the environments installed above, even if the program failed
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
// restore the caller's context class loader
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
executeProgram 方法主要完成以下工作:将线程上下文类加载器替换为用户代码类加载器;设置执行环境(ContextEnvironment 与 StreamContextEnvironment);执行用户程序;最后在 finally 中撤销执行环境并恢复原来的类加载器。
invokeInteractiveModeForExecution方法主要逻辑如下所示:
/**
 * Runs the entry class's main method with the program arguments, in the calling thread.
 *
 * <p>Assumes the caller has already prepared the context environment. Otherwise the
 * program's execution will be a local execution by default.
 *
 * @throws ProgramInvocationException if the main method cannot be found or invoked, or fails
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
    callMainMethod(this.mainClass, this.args);
}
callMainMethod开始调用主函数,callMainMethod的主要逻辑如下所示
/**
 * Reflectively invokes {@code main(String[])} on the user program's entry class.
 *
 * <p>Requirements checked before invoking: the class is public, it has a
 * {@code main(String[])} method, and that method is static and public.
 *
 * @param entryClass the user program's entry class
 * @param args arguments passed to the user's main method
 * @throws ProgramInvocationException if a requirement is not met or the invocation fails.
 *     Exceptions thrown by the user's main method are handled as follows: {@link Error}s and
 *     {@code ProgramParametrizationException}s are rethrown as-is; a
 *     {@code ProgramInvocationException} is rethrown unchanged; anything else is wrapped.
 */
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
    // The entry class itself must be public.
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
    }

    final Method mainMethod;
    try {
        mainMethod = entryClass.getMethod("main", String[].class);
    } catch (NoSuchMethodException e) {
        // Keep the reflective failure as the cause instead of discarding it.
        throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.", e);
    } catch (Throwable t) {
        throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
            entryClass.getName() + ": " + t.getMessage(), t);
    }

    // main must be static: it is invoked without an instance (null receiver below).
    if (!Modifier.isStatic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
    }
    // main must be public.
    if (!Modifier.isPublic(mainMethod.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
    }

    try {
        // Run the user's code. The array is cast to Object so it is passed as the single
        // String[] argument rather than spread as varargs.
        mainMethod.invoke(null, (Object) args);
    } catch (IllegalArgumentException e) {
        throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
    } catch (IllegalAccessException e) {
        throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
    } catch (InvocationTargetException e) {
        // Unwrap the exception thrown by the user's main method.
        Throwable exceptionInMethod = e.getTargetException();
        if (exceptionInMethod instanceof Error) {
            throw (Error) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramParametrizationException) {
            throw (ProgramParametrizationException) exceptionInMethod;
        } else if (exceptionInMethod instanceof ProgramInvocationException) {
            throw (ProgramInvocationException) exceptionInMethod;
        } else {
            throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
        }
    } catch (Throwable t) {
        throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t);
    }
}
逻辑:
1.首先判断类是public
2.寻找到main方法
3.判断main是static
4.判断main是public
5.执行我们编写的业务代码
到这里便开始执行用户编写的代码