Analyzing the startup script

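A TaskManager is started with taskmanager.sh start, so to find the program's entry class we first have to analyze this script.
The script is fairly long; we focus on the following fragment: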

    # ...
    # Set ENTRYPOINT to taskexecutor
    ENTRYPOINT=taskexecutor
    # ...
    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
    else
        if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
            # Start a single TaskManager
            # This is the branch we follow: start in the background without NUMA
            "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        else
            # Example output from `numactl --show` on an AWS c4.8xlarge:
            # policy: default
            # preferred node: current
            # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
            # cpubind: 0 1
            # nodebind: 0 1
            # membind: 0 1
            read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
            for NODE_ID in "${NODE_LIST[@]:1}"; do
                # Start a TaskManager for each NUMA node
                numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
            done
        fi
    fi

From the script above we can see that, unless start-foreground (foreground mode) is used, the script actually calls flink-daemon.sh. Its usage and arguments are:

    flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
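To make the branching in taskmanager.sh concrete, here is a minimal Java model of it. It is not part of Flink: the class and method names are hypothetical, and it only builds the command strings instead of running them. Note how the NUMA branch drops the first token of the nodebind line (the "nodebind:" label itself), which is what ${NODE_LIST[@]:1} does in the script.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical Java model of taskmanager.sh's dispatch logic; not part of Flink. */
final class TaskManagerLaunchPlan {

    /**
     * Returns the command(s) the script would run.
     *
     * @param startStop    first script argument, e.g. "start" or "start-foreground"
     * @param numaEnabled  true when FLINK_TM_COMPUTE_NUMA is not "false"
     * @param nodebindLine the "nodebind: ..." line from numactl --show (NUMA branch only)
     */
    static List<String> plan(String startStop, boolean numaEnabled, String nodebindLine) {
        List<String> commands = new ArrayList<>();
        if ("start-foreground".equals(startStop)) {
            // Foreground mode: exec flink-console.sh in the current process.
            commands.add("flink-console.sh taskexecutor");
        } else if (!numaEnabled) {
            // Default path: one background TaskManager via flink-daemon.sh.
            commands.add("flink-daemon.sh " + startStop + " taskexecutor");
        } else {
            // Split on whitespace like `read -ra`, then skip the "nodebind:" label.
            String[] tokens = nodebindLine.trim().split("\\s+");
            for (String nodeId : Arrays.copyOfRange(tokens, 1, tokens.length)) {
                // One TaskManager per NUMA node, bound to that node's CPUs and memory.
                commands.add("numactl --membind=" + nodeId + " --cpunodebind=" + nodeId
                        + " -- flink-daemon.sh " + startStop + " taskexecutor");
            }
        }
        return commands;
    }

    public static void main(String[] args) {
        plan("start", true, "nodebind: 0 1").forEach(System.out::println);
    }
}
```

With the c4.8xlarge output shown in the script comments, the NUMA branch yields two numactl commands, one for node 0 and one for node 1.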

To pin down the Java entry class, we go on to analyze flink-daemon.sh and find the following fragment:

    # ...
    STARTSTOP=$1
    # From the analysis above, when invoked via taskmanager.sh, DAEMON is taskexecutor
    DAEMON=$2
    # ...
    case $DAEMON in
        (taskexecutor)
            CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;
        (zookeeper)
            CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;
        (historyserver)
            CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;
        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;
        (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
        ;;
        (*)
            echo "Unknown daemon '${DAEMON}'. $USAGE."
            exit 1
        ;;
    esac
    # ...

Clearly, DAEMON is taskexecutor, so the actual entry class is org.apache.flink.runtime.taskexecutor.TaskManagerRunner. TaskManagerRunner is the entry class that starts a TaskManager.
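The case statement is just a lookup table from daemon name to main class. flink-daemon.sh then launches a JVM with CLASS_TO_RUN as the main class. The hypothetical Java sketch below expresses the same table, including the failure for unknown names. The class name is made up for illustration.

```java
import java.util.Map;

/** Hypothetical Java equivalent of flink-daemon.sh's case statement. */
final class DaemonClassResolver {

    // Same daemon-name -> main-class pairs as the script.
    private static final Map<String, String> CLASS_TO_RUN = Map.of(
            "taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
            "zookeeper", "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer",
            "historyserver", "org.apache.flink.runtime.webmonitor.history.HistoryServer",
            "standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint",
            "standalonejob", "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint");

    /** Returns the main class for a daemon name, mirroring the (*) error branch for unknown names. */
    static String resolve(String daemon) {
        String mainClass = CLASS_TO_RUN.get(daemon);
        if (mainClass == null) {
            throw new IllegalArgumentException("Unknown daemon '" + daemon + "'.");
        }
        return mainClass;
    }

    public static void main(String[] args) {
        // prints org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        System.out.println(resolve("taskexecutor"));
    }
}
```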

TaskManagerRunner

TaskManagerRunner is the startup class of the TaskManager in both YARN mode and standalone mode.
Let's look at its main method:

    public static void main(String[] args) throws Exception {
        // startup checks and logging
        // Log information about the runtime environment
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        // Register signal handlers
        // On Unix-like systems they log received TERM, HUP and INT signals
        SignalHandler.register(LOG);
        // Install a shutdown hook that runs when JVM shutdown begins:
        // if shutdown is still blocked after a delay (5 seconds by default),
        // the JVM is forcibly halted
        JvmShutdownSafeguard.installAsShutdownHook(LOG);
        // Read and log the limit on open file handles
        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
        if (maxOpenFileHandles != -1L) {
            LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
        } else {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        }
        // Start the TaskManager
        runTaskManagerSecurely(args);
    }
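A note on JvmShutdownSafeguard: it does not simply delay shutdown by 5 seconds. As we understand it, when shutdown begins the hook starts a timer and forcibly halts the JVM if other shutdown hooks are still blocking after the delay. The sketch below illustrates that idea. The class name, the injectable halt action and the exit code are assumptions for illustration, not Flink's code.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the "halt if shutdown hangs" idea; not Flink's JvmShutdownSafeguard. */
final class ShutdownSafeguardSketch extends Thread {

    private final long delayMillis;
    private final Runnable haltAction;

    ShutdownSafeguardSketch(long delayMillis, Runnable haltAction) {
        super("shutdown-safeguard-sketch");
        this.delayMillis = delayMillis;
        this.haltAction = haltAction;
    }

    /** Invoked by the JVM when it starts shutting down (this thread is a shutdown hook). */
    @Override
    public void run() {
        // Hand the waiting to a daemon thread so this hook returns at once
        // and never slows down a shutdown that would otherwise finish quickly.
        Thread timer = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Still alive after the delay: another hook is blocking, so halt.
            haltAction.run();
        }, "shutdown-safeguard-timer");
        timer.setDaemon(true);
        timer.start();
    }

    /** Installs the safeguard; Runtime.halt does not wait for the remaining shutdown hooks. */
    static void install(long delayMillis, int exitCode) {
        Runtime.getRuntime().addShutdownHook(
                new ShutdownSafeguardSketch(delayMillis, () -> Runtime.getRuntime().halt(exitCode)));
    }

    public static void main(String[] args) {
        // 5 seconds mirrors the default mentioned above; exit code 1 is arbitrary here.
        install(TimeUnit.SECONDS.toMillis(5), 1);
        System.out.println("safeguard installed");
    }
}
```

If all shutdown hooks finish within the delay, the JVM exits and the daemon timer simply dies with it. The halt only fires when shutdown actually hangs.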

Next, we follow the runTaskManagerSecurely method:

    public static void runTaskManagerSecurely(String[] args) {
        try {
            // Build the configuration from flink-conf.yaml and the dynamic properties passed on the command line
            Configuration configuration = loadConfiguration(args);
            // Delegate to the runTaskManagerSecurely(Configuration) overload
            runTaskManagerSecurely(configuration);
        } catch (Throwable t) {
            // Unwrap UndeclaredThrowableException so the log shows the underlying cause
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("TaskManager initialization failed.", strippedThrowable);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    }
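ExceptionUtils.stripException peels off wrapper exceptions of a given type so that the log shows the real cause. One likely reason to strip UndeclaredThrowableException here is that checked exceptions thrown inside a proxied or privileged call can surface wrapped in it. Below is a minimal reimplementation of the idea, assuming unwrapping stops once the wrapper has no cause. The class name is hypothetical.

```java
import java.lang.reflect.UndeclaredThrowableException;

/** Hypothetical reimplementation of the unwrapping idea; not Flink's ExceptionUtils. */
final class StripExceptionSketch {

    /** Follows getCause() while the current throwable is of the wrapper type and has a cause. */
    static Throwable strip(Throwable throwable, Class<? extends Throwable> typeToStrip) {
        Throwable current = throwable;
        while (typeToStrip.isInstance(current) && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable root = new IllegalStateException("boom");
        Throwable wrapped = new UndeclaredThrowableException(new UndeclaredThrowableException(root));
        // prints "java.lang.IllegalStateException: boom"
        System.out.println(strip(wrapped, UndeclaredThrowableException.class));
    }
}
```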

It then calls the runTaskManagerSecurely overload that takes a Configuration:

    public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
        // If so configured, replace graceful JVM exit with a forced halt
        replaceGracefulExitWithHaltIfConfigured(configuration);
        // Create the plugin manager, which loads plugins from the plugins root folder
        final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        // Initialize the shared file system configuration (FileSystemFactory), e.g. HDFS
        FileSystem.initialize(configuration, pluginManager);
        // Install the security configuration: Flink, JAAS, Hadoop and ZooKeeper security settings
        SecurityUtils.install(new SecurityConfiguration(configuration));
        // Run runTaskManager inside the installed security context
        SecurityUtils.getInstalledContext().runSecured(() -> {
            runTaskManager(configuration, pluginManager);
            return null;
        });
    }
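runSecured(...) accepts a Callable, which is why the lambda ends with return null: runTaskManager itself returns nothing. The sketch below shows the shape of this pattern using hypothetical names. A context with no security configured simply calls the task. A Hadoop/Kerberos-aware context would presumably run it as the logged-in user (for example through UserGroupInformation.doAs), which this sketch does not model.

```java
import java.util.concurrent.Callable;

/** Hypothetical sketch of the "run inside a security context" pattern; not Flink's API. */
final class SecuredRunSketch {

    /** Stand-in for a security context that executes a task on the caller's behalf. */
    interface SecurityContextSketch {
        <T> T runSecured(Callable<T> securedCallable) throws Exception;
    }

    /** Context used when no security is configured: just call the task. */
    static final class NoOpContext implements SecurityContextSketch {
        @Override
        public <T> T runSecured(Callable<T> securedCallable) throws Exception {
            return securedCallable.call();
        }
    }

    static int taskManagerRuns = 0;

    /** Stands in for runTaskManager(configuration, pluginManager), which returns void. */
    static void runTaskManager() {
        taskManagerRuns++;
    }

    public static void main(String[] args) throws Exception {
        SecurityContextSketch context = new NoOpContext();
        // Callable must produce a value, so the void call is followed by "return null".
        context.runSecured(() -> {
            runTaskManager();
            return null;
        });
        System.out.println("taskManagerRuns = " + taskManagerRuns); // taskManagerRuns = 1
    }
}
```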

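By this point the configuration has already been loaded (by loadConfiguration in the previous method). This method initializes plugins, file systems and the security configuration, and then starts the TaskManager through runTaskManager, shown below: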

    public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
        // Create a TaskManagerRunner
        final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager,
            TaskManagerRunner::createTaskExecutorService);
        // Start it
        taskManagerRunner.start();
    }
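runTaskManager does not hard-wire how the TaskExecutor is built. It passes TaskManagerRunner::createTaskExecutorService as the TaskExecutorServiceFactory argument. The minimal sketch below shows that factory-injection pattern; all names are hypothetical. The runner delegates construction of its worker to whatever factory it receives. One plausible benefit is that a test could pass a fake factory instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a constructor-injected factory; names do not come from Flink. */
final class FactoryInjectionSketch {

    /** The service the runner manages (stands in for TaskExecutorService). */
    interface Service {
        void start();
    }

    /** Builds the service from configuration (stands in for TaskExecutorServiceFactory). */
    interface ServiceFactory {
        Service create(String configuration);
    }

    static final class Runner {
        private final Service service;

        Runner(String configuration, ServiceFactory factory) {
            // In TaskManagerRunner, shared services (RPC, HA, metrics, ...) are set up
            // before this call; here the runner only delegates construction to the factory.
            this.service = factory.create(configuration);
        }

        void start() {
            service.start();
        }
    }

    static final AtomicInteger STARTED = new AtomicInteger();

    /** Default factory method, passed below as a method reference. */
    static Service createDefaultService(String configuration) {
        return STARTED::incrementAndGet;
    }

    public static void main(String[] args) {
        new Runner("flink-conf", FactoryInjectionSketch::createDefaultService).start();
        System.out.println("started = " + STARTED.get()); // started = 1
    }
}
```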

Let's first analyze the constructor:

    public TaskManagerRunner(
            Configuration configuration,
            PluginManager pluginManager,
            TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
        this.configuration = checkNotNull(configuration);
        // Get the Akka (RPC) timeout
        timeout = AkkaUtils.getTimeoutAsTime(configuration);
        // Create the TaskManager's scheduled thread pool;
        // corePoolSize equals the number of CPU cores
        this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("taskmanager-future"));
        // Create the high-availability services,
        // used for JobManager/ResourceManager leader election and leader retrieval
        highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
        // Start the JMX server on the port(s) given by JMXServerOptions.JMX_SERVER_PORT
        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
        // Create the RPC service, used to communicate with other Flink processes;
        // it connects to an RpcEndpoint and returns an RpcGateway once connected
        rpcService = createRpcService(configuration, highAvailabilityServices);
        // Determine this TaskManager's ResourceID
        this.resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());
        // Create the heartbeat services
        HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
        // Create the metric registry,
        // which tracks all metrics and connects MetricGroups with MetricReporters
        metricRegistry = new MetricRegistryImpl(
            MetricRegistryConfiguration.fromConfiguration(configuration),
            ReporterSetup.fromConfiguration(configuration, pluginManager));
        // Start the metric query service,
        // which returns the metrics registered in Flink as key-value pairs
        final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
        metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
        // Create the BLOB cache service
        blobCacheService = new BlobCacheService(
            configuration, highAvailabilityServices.createBlobStore(), null
        );
        // Create the external resource info provider
        final ExternalResourceInfoProvider externalResourceInfoProvider =
            ExternalResourceUtils.createStaticExternalResourceInfoProvider(
                ExternalResourceUtils.getExternalResourceAmountMap(configuration),
                ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));
        // Create the task executor service
        // (analyzed below)
        taskExecutorService = taskExecutorServiceFactory.createTaskExecutor(
            this.configuration,
            this.resourceId,
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            false,
            externalResourceInfoProvider,
            this);
        this.terminationFuture = new CompletableFuture<>();
        this.shutdown = false;
        handleUnexpectedTaskExecutorServiceTermination();
        MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
    }
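The taskmanager-future pool above is a scheduled thread pool whose core size equals the number of CPU cores, with threads named after a prefix by Flink's ExecutorThreadFactory. The sketch below reproduces that setup with the JDK only. The naming scheme and the daemon flag belong to this sketch and may differ from ExecutorThreadFactory. We also assume Hardware.getNumberCPUCores() corresponds to Runtime.availableProcessors().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only sketch of a CPU-sized, named scheduled thread pool. */
final class NamedPoolSketch {

    /** Names threads "<prefix>-thread-<n>"; this naming scheme is the sketch's own. */
    static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            // Daemon threads let this demo JVM exit even if the pool is never shut down.
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Core pool size equals the number of processors visible to the JVM. */
    static ScheduledExecutorService newCpuSizedScheduledPool(String prefix) {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newScheduledThreadPool(cores, namedThreadFactory(prefix));
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = newCpuSizedScheduledPool("taskmanager-future");
        // Run a delayed task and report which pool thread executed it.
        String threadName = executor
                .schedule(() -> Thread.currentThread().getName(), 10, TimeUnit.MILLISECONDS)
                .get();
        System.out.println(threadName);
        executor.shutdown();
    }
}
```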

After peeling back several layers of wrapping, we find that createTaskExecutor ultimately leads to the startTaskManager method, which creates a TaskExecutor object:

    public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        checkNotNull(configuration);
        checkNotNull(resourceID);
        checkNotNull(rpcService);
        checkNotNull(highAvailabilityServices);
        LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());
        // Address under which this TaskManager is reachable externally
        String externalAddress = rpcService.getAddress();
        // Resource spec of the task executor:
        // CPU cores, task heap, task off-heap, network and managed memory
        final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
        // Build the TaskManager services configuration
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                resourceID,
                externalAddress,
                localCommunicationOnly,
                taskExecutorResourceSpec);
        // Create the TaskManager's metric groups
        Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            externalAddress,
            resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
        // Create the thread pool for I/O tasks
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(
            taskManagerServicesConfiguration.getNumIoThreads(),
            new ExecutorThreadFactory("flink-taskexecutor-io"));
        // Create TaskManagerServices, a container for many other resources and services.
        // It involves a lot of services, so we analyze it separately below
        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            blobCacheService.getPermanentBlobService(),
            taskManagerMetricGroup.f1,
            ioExecutor,
            fatalErrorHandler);
        // Register the Flink memory metrics
        MetricUtils.instantiateFlinkMemoryMetricGroup(
            taskManagerMetricGroup.f1,
            taskManagerServices.getTaskSlotTable(),
            taskManagerServices::getManagedMemorySize);
        // Create the TaskManager configuration
        TaskManagerConfiguration taskManagerConfiguration =
            TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);
        // Address of the metric query service
        String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
        // Create the TaskExecutor
        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            externalResourceInfoProvider,
            heartbeatServices,
            taskManagerMetricGroup.f0,
            metricQueryServiceAddress,
            blobCacheService,
            fatalErrorHandler,
            new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
            createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
    }

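At this point a complete TaskExecutor has been created. Creating it involves initializing many related services; they are summarized as a mind map at the end of this article.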

The fromConfiguration method of TaskManagerServices

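TaskManagerServices is a container for a set of TaskManager services, such as memory management, the I/O manager and the shuffle environment. What each service does will be covered in later posts.
Here we focus on its fromConfiguration method: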

    /**
     * Creates and returns the task manager services.
     *
     * @param taskManagerServicesConfiguration task manager configuration
     * @param permanentBlobService permanentBlobService used by the services
     * @param taskManagerMetricGroup metric group of the task manager
     * @param ioExecutor executor for async IO operations
     * @param fatalErrorHandler to handle class loading OOMs
     * @return task manager components
     * @throws Exception
     */
    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            PermanentBlobService permanentBlobService,
            MetricGroup taskManagerMetricGroup,
            ExecutorService ioExecutor,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // pre-start checks
        // Check each temp dir (local_dirs in YARN mode, java.io.tmpdir in standalone mode):
        // create it if it does not exist, then verify that it is a directory and writable
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
        // Create the TaskEventDispatcher,
        // which lets consuming tasks send TaskEvents back to producing tasks
        final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        // Create the asynchronous I/O manager.
        // It creates reader and writer threads according to the number of configured temp dirs
        // and uses them to read and write data asynchronously
        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
        // Create the ShuffleEnvironment.
        // It provides a local shuffle environment that stores data in memory segments
        // and creates the producer side (ResultPartitionWriter) and consumer side (InputGate)
        final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
            taskManagerServicesConfiguration,
            taskEventDispatcher,
            taskManagerMetricGroup,
            ioExecutor);
        // Start the ShuffleEnvironment;
        // it returns the data port the TaskManager listens on (taskmanager.data.port)
        final int listeningDataPort = shuffleEnvironment.start();
        // Create and start the key-value state service
        final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
        kvStateService.start();
        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
            taskManagerServicesConfiguration.getResourceID(),
            taskManagerServicesConfiguration.getExternalAddress(),
            // we expose the task manager location with the listening port
            // iff the external data port is not explicitly defined
            taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
                taskManagerServicesConfiguration.getExternalDataPort() :
                listeningDataPort);
        // Create the broadcast variable manager
        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
        // Create the TaskSlotTable, which tracks which tasks are assigned to which slots
        final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
            taskManagerServicesConfiguration.getNumberOfSlots(),
            taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
            taskManagerServicesConfiguration.getPageSize(),
            ioExecutor);
        // Create the job table
        final JobTable jobTable = DefaultJobTable.create();
        // Create the job leader service. The job leader is the JobManager leading a job.
        // Whenever a JobManager gains or loses leadership, the JobLeaderListener (in TaskExecutor.java) is notified
        final JobLeaderService jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
        // Root directories for local state
        // (taskmanager.state.local.root-dirs)
        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
        // Build the local-state subdirectory path under each root (File objects only; nothing is created on disk here)
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
        // Create the manager for the task executor's local state stores
        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            ioExecutor);
        final boolean failOnJvmMetaspaceOomError =
            taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);
        final boolean checkClassLoaderLeak =
            taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
        // Create the library cache manager, which provides user-code class loaders backed by BLOBs
        final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
            permanentBlobService,
            BlobLibraryCacheManager.defaultClassLoaderFactory(
                taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
                taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
                failOnJvmMetaspaceOomError ? fatalErrorHandler : null,
                checkClassLoaderLeak));
        // Assemble TaskManagerServices
        return new TaskManagerServices(
            unresolvedTaskManagerLocation,
            taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
            ioManager,
            shuffleEnvironment,
            kvStateService,
            broadcastVariableManager,
            taskSlotTable,
            jobTable,
            jobLeaderService,
            taskStateManager,
            taskEventDispatcher,
            ioExecutor,
            libraryCacheManager);
    }
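The first step of fromConfiguration, checkTempDirs, does what its comment says: it creates missing temp dirs, then requires each to be a writable directory. Below is a JDK-only sketch of those checks. The class name is hypothetical. It throws IllegalStateException, and Flink's own method may use a different exception type.

```java
import java.io.File;

/** Hypothetical sketch of the temp-dir pre-start checks; not Flink's code. */
final class TempDirCheckSketch {

    static void checkTempDirs(String[] tmpDirs) {
        for (String dir : tmpDirs) {
            if (dir == null || dir.isEmpty()) {
                continue; // skip blank entries
            }
            File file = new File(dir);
            // Create the directory (and missing parents) if it does not exist yet.
            if (!file.exists() && !file.mkdirs()) {
                throw new IllegalStateException(
                        "Temp dir " + file.getAbsolutePath() + " does not exist and could not be created.");
            }
            if (!file.isDirectory()) {
                throw new IllegalStateException("Temp dir " + file.getAbsolutePath() + " is not a directory.");
            }
            if (!file.canWrite()) {
                throw new IllegalStateException("Temp dir " + file.getAbsolutePath() + " is not writable.");
            }
        }
    }

    public static void main(String[] args) {
        String dir = new File(System.getProperty("java.io.tmpdir"), "tm-tmp-" + System.nanoTime()).getPath();
        checkTempDirs(new String[] {dir});
        System.out.println(new File(dir).isDirectory()); // true
    }
}
```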

Starting the TaskManager

The code above only creates the related services; the logic that starts them lives in the start method.
The start method of TaskManagerRunner is as follows:

Appendix

Mind map of the important services in TaskExecutor

I plan to analyze what each of these services does in later posts.