CliFrontend is the entry point for submitting Flink jobs, and also for other commands such as cancel.
Submission flow analysis:

    public CliFrontend(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            List<CustomCommandLine> customCommandLines) {
        this.configuration = checkNotNull(configuration);
        this.customCommandLines = checkNotNull(customCommandLines);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);

        FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        this.customCommandLineOptions = new Options();
        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(customCommandLineOptions);
            customCommandLine.addRunOptions(customCommandLineOptions);
        }

        this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
        this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
    }

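ClusterClientServiceLoader: loads the matching ClusterClientFactory via SPI; the default is org.apache.flink.client.deployment.StandaloneClientFactory
customCommandLines: contains two entries by default, Generic and Default. The active CommandLine is picked from this list, and by default it is Default.
Generic corresponds to the run-application mode, which first appeared in 1.11: https://blog.csdn.net/yuchuanchen/article/details/107617738
Different customCommandLines end up producing different Configuration objects.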
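The selection of the active CommandLine can be pictured as a first-match scan over the list. The following is only a minimal sketch under stated assumptions, not Flink's actual code: `SketchCommandLine`, `NamedCommandLine` and `ActiveCommandLineSketch` are hypothetical names, the parsed options are reduced to a set of strings, and it is assumed that Generic is only active when a `target` option is present while Default is always active and therefore acts as the fallback.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for CustomCommandLine; only the activity check is modelled.
interface SketchCommandLine {
    String name();

    boolean isActive(Set<String> parsedOptions);
}

final class NamedCommandLine implements SketchCommandLine {
    private final String name;
    private final String requiredOption; // null means "always active"

    NamedCommandLine(String name, String requiredOption) {
        this.name = name;
        this.requiredOption = requiredOption;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public boolean isActive(Set<String> parsedOptions) {
        return requiredOption == null || parsedOptions.contains(requiredOption);
    }
}

final class ActiveCommandLineSketch {

    // Assumption: Generic is registered before Default and needs an explicit "target" option.
    static List<SketchCommandLine> defaults() {
        return Arrays.<SketchCommandLine>asList(
                new NamedCommandLine("Generic", "target"),
                new NamedCommandLine("Default", null));
    }

    // Returns the first candidate that reports itself active for the parsed options.
    static SketchCommandLine select(List<SketchCommandLine> candidates, Set<String> parsedOptions) {
        for (SketchCommandLine candidate : candidates) {
            if (candidate.isActive(parsedOptions)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No active command line found");
    }

    public static void main(String[] args) {
        System.out.println(select(defaults(), Collections.<String>emptySet()).name());
        System.out.println(select(defaults(), Collections.singleton("target")).name());
    }
}
```

Because the always-active entry sits last in the list, it is only chosen when nothing more specific matches, which is consistent with Default being the command line used when no extra options are given.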
PackagedProgram: holds the program-related configuration
- jar file
- classPath
- entry class name
- Configuration
- mainClass
- userCodeClassLoader
- ……
DefaultExecutorServiceLoader: loads the matching PipelineExecutorFactory via SPI; the default is org.apache.flink.client.deployment.executors.RemoteExecutorFactory. Another implementation is
org.apache.flink.client.deployment.executors.LocalExecutorFactory
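How one factory gets picked out of several SPI-discovered ones can be sketched as a compatibility check against the configuration. This is an illustration under assumptions rather than the real loader: `SketchExecutorFactory`, `TargetExecutorFactory` and `ExecutorFactorySelectionSketch` are hypothetical, the configuration is reduced to a plain `Map`, the hard-coded list stands in for what `java.util.ServiceLoader` would discover on the classpath, and the decision is assumed to hinge on the `execution.target` key holding `remote` or `local`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for PipelineExecutorFactory.
interface SketchExecutorFactory {
    String getName();

    boolean isCompatibleWith(Map<String, String> configuration);
}

// A factory that is compatible when the configured target equals its own name.
final class TargetExecutorFactory implements SketchExecutorFactory {
    private final String target;

    TargetExecutorFactory(String target) {
        this.target = target;
    }

    @Override
    public String getName() {
        return target;
    }

    @Override
    public boolean isCompatibleWith(Map<String, String> configuration) {
        // equalsIgnoreCase(null) is false, so a missing key matches no factory.
        return target.equalsIgnoreCase(configuration.get("execution.target"));
    }
}

final class ExecutorFactorySelectionSketch {

    // Stands in for the providers an SPI lookup would find on the classpath.
    static List<SketchExecutorFactory> discovered() {
        return Arrays.<SketchExecutorFactory>asList(
                new TargetExecutorFactory("remote"),
                new TargetExecutorFactory("local"));
    }

    // Keeps the factories that accept the configuration and insists on exactly one match.
    static SketchExecutorFactory select(
            Iterable<SketchExecutorFactory> factories, Map<String, String> configuration) {
        List<SketchExecutorFactory> compatible = new ArrayList<>();
        for (SketchExecutorFactory factory : factories) {
            if (factory.isCompatibleWith(configuration)) {
                compatible.add(factory);
            }
        }
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("execution.target", "remote");
        System.out.println(select(discovered(), configuration).getName());
    }
}
```

In this sketch the remote factory is only "the default" because the configuration is assumed to already carry `remote` as its target; with no target set, the selection fails instead of guessing.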
Set the environment context

    ContextEnvironment.setAsContext(
            executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout);
    StreamContextEnvironment.setAsContext(
            executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout);

Execute the user's main method
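Running the user's main method typically comes down to a reflective call made while the user code class loader is installed as the thread's context class loader. The sketch below is only an illustration of that pattern with plain JDK reflection: `UserMainInvocationSketch`, `runMain` and `DemoJob` are hypothetical names, and the environment context setup from the previous step is left out.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;

final class UserMainInvocationSketch {

    // Hypothetical user program; it records what it observed so the effect is visible.
    static final class DemoJob {
        static String seenArg;
        static ClassLoader seenLoader;

        public static void main(String[] args) {
            seenArg = args[0];
            seenLoader = Thread.currentThread().getContextClassLoader();
        }
    }

    // Invokes entryClass.main(args) with userCodeClassLoader as the context class loader,
    // and restores the previous loader afterwards even if the call fails.
    static void runMain(Class<?> entryClass, ClassLoader userCodeClassLoader, String[] args) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            Method main = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new IllegalArgumentException("main method must be static");
            }
            // The cast keeps the String[] from being expanded into varargs.
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not run the user's main method", e);
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for a loader built from the user's jar.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], UserMainInvocationSketch.class.getClassLoader());
        runMain(DemoJob.class, userLoader, new String[] {"--input"});
        System.out.println(DemoJob.seenArg + " " + (DemoJob.seenLoader == userLoader));
    }
}
```

Restoring the previous class loader in `finally` keeps the client thread usable after the user program returns or throws.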
