CliFrontend is the entry point for submitting jobs to Flink, and also the entry point for other commands such as cancel.

    Submission flow analysis:

    public CliFrontend(
            Configuration configuration,
            ClusterClientServiceLoader clusterClientServiceLoader,
            List<CustomCommandLine> customCommandLines) {
        this.configuration = checkNotNull(configuration);
        this.customCommandLines = checkNotNull(customCommandLines);
        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
        FileSystem.initialize(
                configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
        this.customCommandLineOptions = new Options();
        for (CustomCommandLine customCommandLine : customCommandLines) {
            customCommandLine.addGeneralOptions(customCommandLineOptions);
            customCommandLine.addRunOptions(customCommandLineOptions);
        }
        this.clientTimeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
        this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
    }

    ClusterClientServiceLoader: uses SPI to load the matching ClusterClientFactory; the default is org.apache.flink.client.deployment.StandaloneClientFactory

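    customCommandLines: by default contains two entries, Generic and Default. The active CommandLine is selected from this list; by default it is Default.
    Generic is used for the run-application mode, which first appeared in 1.11: https://blog.csdn.net/yuchuanchen/article/details/107617738
    Different customCommandLines are ultimately translated into different Configuration objects.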
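
The selection of the active command line can be sketched as "first active entry wins, with Default as the always-active fallback". The snippet below is a minimal, self-contained illustration and not Flink's code: `CommandLineSketch` and `ActiveCommandLineSelector` are hypothetical names, the parsed options are reduced to a set of strings, and the rule that Generic activates on a `target` option is a simplifying assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for Flink's CustomCommandLine.
interface CommandLineSketch {
    String id();

    // True if this command line should handle the parsed options.
    boolean isActive(Set<String> parsedOptions);
}

final class ActiveCommandLineSelector {

    // Assumption for this sketch: Generic activates only when a "target" option is present.
    static final CommandLineSketch GENERIC = new CommandLineSketch() {
        @Override public String id() { return "Generic"; }
        @Override public boolean isActive(Set<String> parsedOptions) {
            return parsedOptions.contains("target");
        }
    };

    // Default is always active, so it serves as the fallback.
    static final CommandLineSketch DEFAULT = new CommandLineSketch() {
        @Override public String id() { return "Default"; }
        @Override public boolean isActive(Set<String> parsedOptions) { return true; }
    };

    // Order matters: the always-active Default has to come last.
    static List<CommandLineSketch> defaults() {
        return Arrays.asList(GENERIC, DEFAULT);
    }

    // Returns the first command line that reports itself as active.
    static CommandLineSketch select(List<CommandLineSketch> candidates, Set<String> parsedOptions) {
        for (CommandLineSketch candidate : candidates) {
            if (candidate.isActive(parsedOptions)) {
                return candidate;
            }
        }
        throw new IllegalStateException("No command line is active for the given options.");
    }

    public static void main(String[] args) {
        // No options given: falls through to Default.
        System.out.println(select(defaults(), Collections.<String>emptySet()).id());
        // A "target" option given: Generic is picked first.
        System.out.println(select(defaults(), Collections.singleton("target")).id());
    }
}
```

Because the chosen command line is the one that later turns the parsed options into a Configuration, picking a different entry here leads to a different effective Configuration.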

    PackagedProgram: holds the information needed to run the user program:

    • JAR file
    • classPath
    • entry class name
    • Configuration
    • mainClass
    • userCodeClassLoader
    • ...


    DefaultExecutorServiceLoader: uses SPI to load the matching PipelineExecutorFactory; the factories registered by default are org.apache.flink.client.deployment.executors.RemoteExecutorFactory
    and org.apache.flink.client.deployment.executors.LocalExecutorFactory
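
The SPI lookup can be pictured as "discover every registered factory, then keep the one that declares itself compatible with the configuration". The following is a rough sketch under stated assumptions rather than Flink's implementation: `ExecutorFactorySketch`, `TargetNamedFactory` and `ExecutorFactorySelector` are hypothetical names, the Configuration is reduced to a `Map<String, String>`, and compatibility is assumed to be decided by comparing the factory name with the `execution.target` entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;

// Hypothetical, simplified stand-in for PipelineExecutorFactory.
interface ExecutorFactorySketch {
    String name();

    boolean isCompatibleWith(Map<String, String> configuration);
}

// A factory that is compatible when "execution.target" matches its name.
final class TargetNamedFactory implements ExecutorFactorySketch {
    private final String name;

    TargetNamedFactory(String name) {
        this.name = name;
    }

    @Override public String name() { return name; }

    @Override public boolean isCompatibleWith(Map<String, String> configuration) {
        // equalsIgnoreCase(null) is false, so a missing target matches nothing.
        return name.equalsIgnoreCase(configuration.get("execution.target"));
    }
}

final class ExecutorFactorySelector {

    // Keeps the compatible candidates and insists on exactly one match.
    static ExecutorFactorySketch select(
            Iterable<? extends ExecutorFactorySketch> candidates,
            Map<String, String> configuration) {
        List<ExecutorFactorySketch> compatible = new ArrayList<>();
        for (ExecutorFactorySketch candidate : candidates) {
            if (candidate.isCompatibleWith(configuration)) {
                compatible.add(candidate);
            }
        }
        if (compatible.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one compatible factory, found " + compatible.size());
        }
        return compatible.get(0);
    }

    // SPI variant: providers are discovered from META-INF/services entries on the classpath.
    static ExecutorFactorySketch selectViaSpi(Map<String, String> configuration) {
        return select(ServiceLoader.load(ExecutorFactorySketch.class), configuration);
    }

    public static void main(String[] args) {
        List<ExecutorFactorySketch> registered = Arrays.<ExecutorFactorySketch>asList(
                new TargetNamedFactory("remote"), new TargetNamedFactory("local"));
        Map<String, String> configuration =
                Collections.singletonMap("execution.target", "local");
        System.out.println(select(registered, configuration).name()); // prints "local"
    }
}
```

In the sketch, `main` passes the candidates in by hand so that it runs without any `META-INF/services` file; `selectViaSpi` shows where `java.util.ServiceLoader` would take over discovery.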

    Set up the environment context

    ContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);
    StreamContextEnvironment.setAsContext(
            executorServiceLoader,
            configuration,
            userCodeClassLoader,
            enforceSingleJobExecution,
            suppressSysout);
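
The point of installing a context is that the user's program, when it asks for an execution environment, receives one prepared by the client instead of creating its own. A minimal sketch of that pattern follows; `EnvironmentSketch` is a hypothetical class, the environment is reduced to a description string, and the use of a `ThreadLocal` factory is an assumption made for illustration, not a copy of Flink's code.

```java
import java.util.function.Supplier;

// Hypothetical, simplified environment with a pluggable context factory.
final class EnvironmentSketch {

    // Factory installed by the client before the user program runs.
    private static final ThreadLocal<Supplier<EnvironmentSketch>> CONTEXT_FACTORY =
            new ThreadLocal<>();

    private final String description;

    EnvironmentSketch(String description) {
        this.description = description;
    }

    String description() {
        return description;
    }

    // Installs the factory that getExecutionEnvironment() will use on this thread.
    static void setAsContext(Supplier<EnvironmentSketch> factory) {
        CONTEXT_FACTORY.set(factory);
    }

    // Removes the factory again so later callers fall back to the default.
    static void unsetAsContext() {
        CONTEXT_FACTORY.remove();
    }

    // What user code calls: context environment if installed, plain local one otherwise.
    static EnvironmentSketch getExecutionEnvironment() {
        Supplier<EnvironmentSketch> factory = CONTEXT_FACTORY.get();
        return factory != null ? factory.get() : new EnvironmentSketch("local");
    }

    public static void main(String[] args) {
        setAsContext(() -> new EnvironmentSketch("context"));
        try {
            System.out.println(getExecutionEnvironment().description()); // prints "context"
        } finally {
            unsetAsContext();
        }
        System.out.println(getExecutionEnvironment().description()); // prints "local"
    }
}
```

Wrapping the call in `try`/`finally` mirrors the usual discipline for such contexts: whatever the user program does, the context is removed afterwards.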

    Execute the user's main method
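
Invoking the user's `main` is, at its core, a reflective call made while the user code class loader is set as the thread's context class loader. The sketch below illustrates that idea with plain JDK reflection; `MainInvokerSketch` and `UserJobSketch` are hypothetical names, and the checks are deliberately reduced (for instance, the entry class is not required to be public here).

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class MainInvokerSketch {

    // Loads the entry class from the given class loader and calls its static main(String[]).
    static void invokeMain(ClassLoader userCodeClassLoader, String entryClassName, String[] args) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            Class<?> entryClass = Class.forName(entryClassName, false, userCodeClassLoader);
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException(entryClassName + " has a non-static main method.");
            }
            // The cast keeps the array from being spread into varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not run main of " + entryClassName, e);
        } finally {
            // Always restore the previous context class loader.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        invokeMain(
                MainInvokerSketch.class.getClassLoader(),
                UserJobSketch.class.getName(),
                new String[] {"--input"});
        System.out.println(UserJobSketch.lastArg); // prints "--input"
    }
}

// Hypothetical user program; it only records the first argument it receives.
final class UserJobSketch {
    static String lastArg;

    public static void main(String[] args) {
        lastArg = args.length > 0 ? args[0] : null;
    }
}
```

In a real submission the class loader would be the userCodeClassLoader built from the JAR file and classpath entries held by PackagedProgram; the sketch reuses the application class loader only to stay self-contained.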

