CliFrontend is the entry point for submitting Flink jobs, and also for other commands such as cancel.
Analysis of the submission flow:
public CliFrontend(
        Configuration configuration,
        ClusterClientServiceLoader clusterClientServiceLoader,
        List<CustomCommandLine> customCommandLines) {
    this.configuration = checkNotNull(configuration);
    this.customCommandLines = checkNotNull(customCommandLines);
    this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

    // Initialize the file systems, loading plugins from the plugin root folder.
    FileSystem.initialize(
            configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

    // Collect the general and run options contributed by each custom command line.
    this.customCommandLineOptions = new Options();
    for (CustomCommandLine customCommandLine : customCommandLines) {
        customCommandLine.addGeneralOptions(customCommandLineOptions);
        customCommandLine.addRunOptions(customCommandLineOptions);
    }

    this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
    this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
}
ClusterClientServiceLoader: loads the matching ClusterClientFactory via SPI; the default is org.apache.flink.client.deployment.StandaloneClientFactory
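The selection step can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's code: the `Factory` interface, `factoryFor`, and `select` are hypothetical names, the configuration is reduced to a plain `Map`, and the candidates are passed in as a list where a real SPI setup would obtain them from `java.util.ServiceLoader`. The use of the `execution.target` key with the value `remote` for the standalone case is assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class FactorySelectionSketch {

    // Hypothetical stand-in for ClusterClientFactory; not the real Flink interface.
    interface Factory {
        boolean isCompatibleWith(Map<String, String> configuration);

        String name();
    }

    // Builds a factory that matches exactly one value of "execution.target".
    static Factory factoryFor(String target) {
        return new Factory() {
            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return target.equals(configuration.get("execution.target"));
            }

            @Override
            public String name() {
                return target;
            }
        };
    }

    // Returns the single candidate that accepts the configuration.
    // With real SPI the candidates would come from ServiceLoader.load(...).
    static Factory select(List<Factory> candidates, Map<String, String> configuration) {
        List<Factory> compatible = new ArrayList<>();
        for (Factory candidate : candidates) {
            if (candidate.isCompatibleWith(configuration)) {
                compatible.add(candidate);
            }
        }
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        List<Factory> candidates = List.of(factoryFor("remote"), factoryFor("yarn-per-job"));
        Factory chosen = select(candidates, Map.of("execution.target", "remote"));
        System.out.println(chosen.name());
    }
}
```

Requiring exactly one compatible candidate makes an ambiguous or unsupported target fail early, before any cluster interaction starts.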
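customCommandLines: by default contains two entries, Generic and Default. The active CommandLine is looked up among them; by default it is Default.
Generic is used for run-application mode, which first appeared in 1.11; see https://blog.csdn.net/yuchuanchen/article/details/107617738
Different customCommandLines ultimately translate into different Configurations.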
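A rough sketch of how the active command line might be picked, and how each one yields its own configuration, is shown here. Everything in it is illustrative: the `Cli` interface and the `generic`, `fallback`, and `firstActive` helpers are hypothetical, the configuration is a plain `Map`, and treating Generic as active only when a `-t` option is present is a simplification assumed for this example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ActiveCommandLineSketch {

    // Hypothetical stand-in for CustomCommandLine; not the real Flink interface.
    interface Cli {
        String id();

        boolean isActive(List<String> args);

        Map<String, String> toConfiguration(List<String> args);
    }

    // Assumed behavior: active only when "-t <target>" is given on the command line.
    static Cli generic() {
        return new Cli() {
            public String id() {
                return "generic";
            }

            public boolean isActive(List<String> args) {
                return args.contains("-t");
            }

            public Map<String, String> toConfiguration(List<String> args) {
                Map<String, String> conf = new HashMap<>();
                int i = args.indexOf("-t");
                if (i >= 0 && i + 1 < args.size()) {
                    conf.put("execution.target", args.get(i + 1));
                }
                return conf;
            }
        };
    }

    // Fallback entry: always active, so it wins when nothing earlier matches.
    static Cli fallback() {
        return new Cli() {
            public String id() {
                return "default";
            }

            public boolean isActive(List<String> args) {
                return true;
            }

            public Map<String, String> toConfiguration(List<String> args) {
                Map<String, String> conf = new HashMap<>();
                conf.put("execution.target", "remote");
                return conf;
            }
        };
    }

    // Returns the first command line that declares itself active.
    static Cli firstActive(List<Cli> clis, List<String> args) {
        for (Cli cli : clis) {
            if (cli.isActive(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No valid command line found.");
    }

    public static void main(String[] args) {
        List<Cli> clis = List.of(generic(), fallback());
        List<String> userArgs = List.of("-t", "yarn-application");
        Cli active = firstActive(clis, userArgs);
        System.out.println(active.id() + " -> " + active.toConfiguration(userArgs));
    }
}
```

Because the fallback is listed last and is always active, the order of the list decides which entry wins.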
PackagedProgram: holds the related configuration information
- JAR file
- classPath
- entry class name
- Configuration
- mainClass
- userCodeClassLoader
- ……
DefaultExecutorServiceLoader: loads the matching PipelineExecutorFactory via SPI. The defaults are:
org.apache.flink.client.deployment.executors.RemoteExecutorFactory
org.apache.flink.client.deployment.executors.LocalExecutorFactory
Set the environment context
ContextEnvironment.setAsContext(
        executorServiceLoader,
        configuration,
        userCodeClassLoader,
        enforceSingleJobExecution,
        suppressSysout);
StreamContextEnvironment.setAsContext(
        executorServiceLoader,
        configuration,
        userCodeClassLoader,
        enforceSingleJobExecution,
        suppressSysout);
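The idea behind installing a context can be sketched as a factory that the environment lookup consults first. This is a simplified illustration under stated assumptions, not Flink's implementation: `Env`, `setAsContext`, `unsetAsContext`, and `getExecutionEnvironment` below are hypothetical stand-ins, and a single static field is used where real code may scope the factory differently.

```java
import java.util.function.Supplier;

class ContextEnvironmentSketch {

    // Hypothetical stand-in for an execution environment.
    static final class Env {
        final String description;

        Env(String description) {
            this.description = description;
        }
    }

    // Factory installed by the client; null means no context has been set.
    private static Supplier<Env> contextFactory;

    // Installs a factory so later lookups return a client-configured environment.
    static void setAsContext(String executorName) {
        contextFactory = () -> new Env("context:" + executorName);
    }

    // Removes the factory again, restoring the fallback behavior.
    static void unsetAsContext() {
        contextFactory = null;
    }

    // What user code would call: prefers the installed context, else a local fallback.
    static Env getExecutionEnvironment() {
        return contextFactory != null ? contextFactory.get() : new Env("local");
    }

    public static void main(String[] args) {
        System.out.println(getExecutionEnvironment().description);
        setAsContext("remote");
        System.out.println(getExecutionEnvironment().description);
        unsetAsContext();
    }
}
```

With this arrangement the user program stays unchanged: the same lookup call returns a client-configured environment when launched through the CLI and a local one otherwise.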
Execute the user's main method
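Invoking a user entry point through reflection could look roughly like this. It is a generic sketch, not the Flink source: `callMain` and `DemoJob` are hypothetical names, and the demo reuses the sketch's own class loader where a real client would pass the userCodeClassLoader built from the job JAR.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class MainInvocationSketch {

    // Stand-in for a user job class; records its first argument for inspection.
    public static class DemoJob {
        public static String lastArg;

        public static void main(String[] args) {
            lastArg = args.length > 0 ? args[0] : null;
        }
    }

    // Loads the entry class with the given class loader and calls its static main.
    static void callMain(ClassLoader userCodeClassLoader, String entryClassName, String[] args)
            throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        // Make user classes visible to libraries that use the context class loader.
        current.setContextClassLoader(userCodeClassLoader);
        try {
            Class<?> entryClass = Class.forName(entryClassName, true, userCodeClassLoader);
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalArgumentException("main method must be static");
            }
            // Cast to Object so the array is passed as one argument, not spread as varargs.
            mainMethod.invoke(null, (Object) args);
        } finally {
            // Always restore the previous loader, even if the user code throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        callMain(
                MainInvocationSketch.class.getClassLoader(),
                DemoJob.class.getName(),
                new String[] {"hello"});
        System.out.println(DemoJob.lastArg);
    }
}
```

Restoring the previous context class loader in `finally` keeps the client thread clean for whatever runs after the user program returns or fails.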