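Startup script analysis
A TaskManager is started with taskmanager.sh start, so to find the Java entry class we first have to analyze this script.
The script is fairly long. We focus on the following fragment: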
    # ...

    # Set the ENTRYPOINT variable to taskexecutor
    ENTRYPOINT=taskexecutor

    # ...

    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
    else
        if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
            # Start a single TaskManager
            # This is the branch we care about: a background start without NUMA
            "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        else
            # Example output from `numactl --show` on an AWS c4.8xlarge:
            # policy: default
            # preferred node: current
            # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
            # cpubind: 0 1
            # nodebind: 0 1
            # membind: 0 1
            read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
            for NODE_ID in "${NODE_LIST[@]:1}"; do
                # Start a TaskManager for each NUMA node
                numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
            done
        fi
    fi
From the script above we can see that unless start-foreground (foreground mode) is used, the script that actually runs is flink-daemon.sh. Its usage and arguments are:
flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
To find the Java entry class, we continue with flink-daemon.sh, where we find the following fragment:
    # ...

    STARTSTOP=$1
    # As analyzed above, when invoked through taskmanager.sh, DAEMON is taskexecutor
    DAEMON=$2

    # ...

    case $DAEMON in
        (taskexecutor)
            CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;

        (zookeeper)
            CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;

        (historyserver)
            CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;

        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;

        (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
        ;;

        (*)
            echo "Unknown daemon '${DAEMON}'. $USAGE."
            exit 1
        ;;
    esac

    # ...
Since DAEMON is taskexecutor, the class that gets run is org.apache.flink.runtime.taskexecutor.TaskManagerRunner. TaskManagerRunner is therefore the entry class for starting a TaskManager.
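The case statement in flink-daemon.sh is essentially a lookup table from daemon name to entry class. A minimal Java sketch of the same mapping follows; the class DaemonEntryPoints and its resolve method are hypothetical names used only for illustration, while the daemon names and class names are taken from the script fragment above.

```java
import java.util.Map;

/** Hypothetical helper that mirrors the case statement of flink-daemon.sh as a lookup table. */
final class DaemonEntryPoints {

    // Daemon name -> fully qualified entry class, copied from the script fragment.
    private static final Map<String, String> CLASS_TO_RUN = Map.of(
            "taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
            "zookeeper", "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer",
            "historyserver", "org.apache.flink.runtime.webmonitor.history.HistoryServer",
            "standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint",
            "standalonejob", "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint");

    /** Returns the entry class for a daemon name, or fails like the (*) branch of the script. */
    static String resolve(String daemon) {
        String className = CLASS_TO_RUN.get(daemon);
        if (className == null) {
            throw new IllegalArgumentException("Unknown daemon '" + daemon + "'.");
        }
        return className;
    }
}
```

Calling DaemonEntryPoints.resolve("taskexecutor") yields the TaskManagerRunner class name, which is the path taken when the script is invoked through taskmanager.sh.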
TaskManagerRunner
TaskManagerRunner is the startup class of the TaskManager in both YARN mode and standalone mode.
Let's look at its main method:
    public static void main(String[] args) throws Exception {
        // startup checks and logging
        // Log environment information
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        // Register signal handlers
        // On Linux they respond to TERM, HUP and INT
        SignalHandler.register(LOG);
        // Install a shutdown hook that is called back before the JVM exits.
        // It guards against blocking shutdown hooks: if shutdown has not
        // finished after 5 seconds, the JVM is forcibly terminated
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        // Get and log the limit on the number of open file handles
        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

        if (maxOpenFileHandles != -1L) {
            LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
        } else {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        }

        // Start the TaskManager
        runTaskManagerSecurely(args);
    }
Following the call into runTaskManagerSecurely(String[] args), we find:
    public static void runTaskManagerSecurely(String[] args) {
        try {
            // Read flink-conf.yaml and the dynamic properties passed on the
            // command line, and use them as the configuration
            Configuration configuration = loadConfiguration(args);
            // Continue with the overloaded runTaskManagerSecurely
            runTaskManagerSecurely(configuration);
        } catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("TaskManager initialization failed.", strippedThrowable);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    }
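The comment on loadConfiguration says the configuration combines flink-conf.yaml with dynamic properties from the command line. The sketch below illustrates that overlay idea only; it is not Flink's implementation. ConfigOverlay and merge are hypothetical names, and the -Dkey=value argument form is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration: command-line dynamic properties override values from the config file. */
final class ConfigOverlay {

    /**
     * Starts from the file-based settings and overlays every argument of the
     * assumed form -Dkey=value. Arguments in any other form are ignored.
     */
    static Map<String, String> merge(Map<String, String> fileConfig, String[] args) {
        Map<String, String> merged = new LinkedHashMap<>(fileConfig);
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                int eq = arg.indexOf('=');
                // Require a non-empty key between "-D" and "="
                if (eq > 2) {
                    merged.put(arg.substring(2, eq), arg.substring(eq + 1));
                }
            }
        }
        return merged;
    }
}
```

With a file value of taskmanager.numberOfTaskSlots set to 1 and the argument -Dtaskmanager.numberOfTaskSlots=4, the merged map holds 4, because the command-line value is applied last.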
Next comes the overloaded runTaskManagerSecurely(Configuration configuration):
    public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
        // If configured, replace graceful exit with a hard halt
        replaceGracefulExitWithHaltIfConfigured(configuration);
        // Create the plugin manager from the configuration
        final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
        // Initialize the shared file system settings (FileSystemFactory), e.g. HDFS
        FileSystem.initialize(configuration, pluginManager);

        // Read the security configuration, covering Flink, JAAS, Hadoop and ZooKeeper
        SecurityUtils.install(new SecurityConfiguration(configuration));

        // Call runTaskManager inside the installed security context
        SecurityUtils.getInstalledContext().runSecured(() -> {
            runTaskManager(configuration, pluginManager);
            return null;
        });
    }
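The lambda passed to runSecured ends with return null because the security context takes a Callable, which must return a value even when the work itself returns nothing. A minimal sketch of that pattern using only the JDK: SketchSecurityContext, NoOpSketchContext and RunSecuredDemo are hypothetical, simplified stand-ins, not Flink's classes.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a security context that runs an action with credentials installed. */
interface SketchSecurityContext {
    <T> T runSecured(Callable<T> action) throws Exception;
}

/** Context that adds no security at all and simply invokes the action. */
final class NoOpSketchContext implements SketchSecurityContext {
    @Override
    public <T> T runSecured(Callable<T> action) throws Exception {
        // A real context would set up credentials before this call
        return action.call();
    }
}

final class RunSecuredDemo {
    static boolean started = false;

    // Placeholder for the real startup work
    static void runTaskManager() {
        started = true;
    }

    static void launch(SketchSecurityContext context) throws Exception {
        context.runSecured(() -> {
            runTaskManager();
            // Callable needs a return value, so a void action returns null
            return null;
        });
    }
}
```

The design keeps the startup logic unaware of which security mechanism is active: the caller only hands over a Callable, and the installed context decides how to run it.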
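Working from the loaded Flink configuration, runTaskManagerSecurely(Configuration) creates the plugin manager and initializes the file system and security settings. The TaskManager itself is started in runTaskManager, shown below: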
    public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
        // Create a TaskManagerRunner
        final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);

        // Call start
        taskManagerRunner.start();
    }
Let's first analyze the constructor:
    public TaskManagerRunner(
            Configuration configuration,
            PluginManager pluginManager,
            TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
        this.configuration = checkNotNull(configuration);

        // Get the Akka timeout
        timeout = AkkaUtils.getTimeoutAsTime(configuration);

        // Create the thread pool of the task manager
        // corePoolSize equals the number of CPU cores of the machine
        this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
            Hardware.getNumberCPUCores(),
            new ExecutorThreadFactory("taskmanager-future"));

        // Create the high availability services
        // Responsible for JobManager and ResourceManager leader election and for retrieving leader information
        highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // Create the RPC service, used to communicate with other Flink processes
        // It connects to an RpcEndpoint and returns an RpcGateway once connected
        rpcService = createRpcService(configuration, highAvailabilityServices);

        this.resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());

        // Create the heartbeat services
        HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

        // Create the metric registry
        // It keeps track of all metrics and links MetricGroup and MetricReporter
        metricRegistry = new MetricRegistryImpl(
            MetricRegistryConfiguration.fromConfiguration(configuration),
            ReporterSetup.fromConfiguration(configuration, pluginManager));

        // Start the metric query service
        // It returns the metrics registered in Flink as key-value pairs
        final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
        metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);

        // Create the BlobCache service
        blobCacheService = new BlobCacheService(
            configuration, highAvailabilityServices.createBlobStore(), null);

        // Create the external resource info provider
        final ExternalResourceInfoProvider externalResourceInfoProvider =
            ExternalResourceUtils.createStaticExternalResourceInfoProvider(
                ExternalResourceUtils.getExternalResourceAmountMap(configuration),
                ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));

        // Create the task executor service
        // Analyzed later
        taskExecutorService = taskExecutorServiceFactory.createTaskExecutor(
            this.configuration,
            this.resourceId,
            rpcService,
            highAvailabilityServices,
            heartbeatServices,
            metricRegistry,
            blobCacheService,
            false,
            externalResourceInfoProvider,
            this);

        this.terminationFuture = new CompletableFuture<>();
        this.shutdown = false;
        handleUnexpectedTaskExecutorServiceTermination();

        MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
    }
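The first service the constructor builds is a scheduled thread pool whose core size matches the CPU core count and whose threads carry the taskmanager-future prefix. A minimal JDK-only sketch of that setup follows. NamedThreadFactory and FutureExecutorSketch are hypothetical stand-ins for Flink's ExecutorThreadFactory and Hardware.getNumberCPUCores(), and the exact thread name format here is an assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread factory that names its threads with a fixed prefix and a counter. */
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
        // Daemon threads do not keep the JVM alive on their own
        thread.setDaemon(true);
        return thread;
    }
}

final class FutureExecutorSketch {

    /** Creates a scheduled pool with one core thread per available processor. */
    static ScheduledExecutorService create() {
        return Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors(),
                new NamedThreadFactory("taskmanager-future"));
    }

    /** Runs one task on the pool and reports the name of the thread that executed it. */
    static String nameOfWorkerThread() throws Exception {
        ScheduledExecutorService executor = create();
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Named threads make thread dumps and log lines easier to attribute to a component, which is likely why the pool is given an explicit prefix rather than the JDK default names.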
After following the layers of wrapping behind createTaskExecutor, we finally reach the startTaskManager method. It creates a TaskExecutor object, as shown below:
    public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        checkNotNull(configuration);
        checkNotNull(resourceID);
        checkNotNull(rpcService);
        checkNotNull(highAvailabilityServices);

        LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());

        // Get the externally reachable address
        String externalAddress = rpcService.getAddress();

        // Get the resource spec of the task executor
        // Includes CPU cores, task heap memory, task off-heap memory and managed memory
        final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

        // Get the TaskManager services configuration
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                resourceID,
                externalAddress,
                localCommunicationOnly,
                taskExecutorResourceSpec);

        // Create the MetricGroup of the task manager
        Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            externalAddress,
            resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

        // Create the thread pool used for IO tasks
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(
            taskManagerServicesConfiguration.getNumIoThreads(),
            new ExecutorThreadFactory("flink-taskexecutor-io"));

        // Create the TaskManager services, a container for many other resources and services
        // It involves a lot of services, so we analyze it separately later
        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            blobCacheService.getPermanentBlobService(),
            taskManagerMetricGroup.f1,
            ioExecutor,
            fatalErrorHandler);

        MetricUtils.instantiateFlinkMemoryMetricGroup(
            taskManagerMetricGroup.f1,
            taskManagerServices.getTaskSlotTable(),
            taskManagerServices::getManagedMemorySize);

        // Create the task manager configuration
        TaskManagerConfiguration taskManagerConfiguration =
            TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);

        // Get the address of the metric query service
        String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

        // Create the task executor
        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            externalResourceInfoProvider,
            heartbeatServices,
            taskManagerMetricGroup.f0,
            metricQueryServiceAddress,
            blobCacheService,
            fatalErrorHandler,
            new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
            createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
    }
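At this point a complete TaskExecutor has been created. The creation involves initializing many related services, which are summarized as a mind map at the end of this article.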
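The fromConfiguration method of TaskManagerServices
TaskManagerServices is a container for a set of TaskManager services, including the memory manager, the IO manager and the shuffle environment. I plan to cover what each service does in later posts.
Here we focus on its fromConfiguration method, shown below: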
    /**
     * Creates and returns the task manager services.
     *
     * @param taskManagerServicesConfiguration task manager configuration
     * @param permanentBlobService permanentBlobService used by the services
     * @param taskManagerMetricGroup metric group of the task manager
     * @param ioExecutor executor for async IO operations
     * @param fatalErrorHandler to handle class loading OOMs
     * @return task manager components
     * @throws Exception
     */
    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            PermanentBlobService permanentBlobService,
            MetricGroup taskManagerMetricGroup,
            ExecutorService ioExecutor,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        // pre-start checks
        // Check whether the temp dirs exist (LOCAL_DIRS in YARN mode, java.io.tmpdir in standalone mode); create them if they do not
        // Also check that each temp dir is a directory and is writable
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

        // Create the TaskEventDispatcher
        // The task event dispatcher lets consumer tasks send TaskEvents to producer tasks
        final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

        // Create the asynchronous IO manager
        // It creates one pair of reader/writer threads per configured tmp dir for asynchronous reads and writes
        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

        // Create the ShuffleEnvironment
        // It provides a local shuffle environment that uses memory segments for data storage
        // It can create the writer side (ResultPartitionWriter) and the consumer side (InputGate)
        final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
            taskManagerServicesConfiguration,
            taskEventDispatcher,
            taskManagerMetricGroup,
            ioExecutor);
        // Start the shuffle environment
        // Returns the data port of the TaskManager (taskmanager.data.port)
        final int listeningDataPort = shuffleEnvironment.start();

        // Create the key-value state service
        final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
        kvStateService.start();

        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
            taskManagerServicesConfiguration.getResourceID(),
            taskManagerServicesConfiguration.getExternalAddress(),
            // we expose the task manager location with the listening port
            // iff the external data port is not explicitly defined
            taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
                taskManagerServicesConfiguration.getExternalDataPort() :
                listeningDataPort);

        // Create the broadcast variable manager
        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

        // Create the TaskSlotTable, which tracks the assignment of tasks to slots
        final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
            taskManagerServicesConfiguration.getNumberOfSlots(),
            taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
            taskManagerServicesConfiguration.getPageSize(),
            ioExecutor);

        final JobTable jobTable = DefaultJobTable.create();

        // Create the job leader service. A job leader is the job manager that leads a job.
        // Whenever a job manager gains or loses leadership, the JobLeaderListener (located in TaskExecutor.java) is notified
        final JobLeaderService jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

        // Read the root paths for local state
        // taskmanager.state.local.root-dirs
        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

        // Build the File objects for the local state directories
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }

        // Create the task state manager
        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            ioExecutor);

        final boolean failOnJvmMetaspaceOomError =
            taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);
        final boolean checkClassLoaderLeak =
            taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
        final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
            permanentBlobService,
            BlobLibraryCacheManager.defaultClassLoaderFactory(
                taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
                taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
                failOnJvmMetaspaceOomError ? fatalErrorHandler : null,
                checkClassLoaderLeak));

        // Build the TaskManagerServices
        return new TaskManagerServices(
            unresolvedTaskManagerLocation,
            taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
            ioManager,
            shuffleEnvironment,
            kvStateService,
            broadcastVariableManager,
            taskSlotTable,
            jobTable,
            jobLeaderService,
            taskStateManager,
            taskEventDispatcher,
            ioExecutor,
            libraryCacheManager);
    }
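Three small pieces of logic in fromConfiguration are easy to isolate: the temp directory pre-check, the choice of the advertised data port, and the construction of the local state root directories. The sketch below restates them with JDK classes only. TaskManagerServicesSketch and its method names are hypothetical, the checks follow what the comments above describe rather than Flink's exact code, and the sub-directory name passed in the test is an assumed placeholder.

```java
import java.io.File;
import java.io.IOException;

/** Hypothetical, simplified restatement of three steps of fromConfiguration. */
final class TaskManagerServicesSketch {

    /** Creates missing temp dirs, then verifies each one is a writable directory. */
    static void checkTempDirs(String[] tmpDirs) throws IOException {
        for (String dir : tmpDirs) {
            if (dir == null || dir.isEmpty()) {
                continue;
            }
            File file = new File(dir);
            if (!file.exists() && !file.mkdirs()) {
                throw new IOException("Temporary file directory " + dir + " does not exist and could not be created.");
            }
            if (!file.isDirectory()) {
                throw new IOException("Temporary file directory " + dir + " is not a directory.");
            }
            if (!file.canWrite()) {
                throw new IOException("Temporary file directory " + dir + " is not writable.");
            }
        }
    }

    /** The external data port wins only when it is explicitly set to a positive value. */
    static int advertisedDataPort(int externalDataPort, int listeningDataPort) {
        return externalDataPort > 0 ? externalDataPort : listeningDataPort;
    }

    /** Builds File objects for the state roots; nothing is created on disk here. */
    static File[] localStateRoots(String[] rootDirs, String subDirectory) {
        File[] roots = new File[rootDirs.length];
        for (int i = 0; i < rootDirs.length; ++i) {
            roots[i] = new File(rootDirs[i], subDirectory);
        }
        return roots;
    }
}
```

For example, advertisedDataPort(0, 6121) falls back to the listening port 6121, while advertisedDataPort(30000, 6121) returns the explicitly configured 30000.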
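Starting the TaskManager
Everything above only covers how the services are created. The logic that starts them lives in the start method.
The start method of TaskManagerRunner is shown below: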
Appendix
Mind map of the important services in TaskExecutor
I plan to analyze what these services do in later posts.
