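Analyzing the startup script
A TaskManager is started with taskmanager.sh start, so to find the Java entry class we first need to read this script.
The script is fairly long; the part we care about is the following: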
# ...
# Set the ENTRYPOINT variable to taskexecutor
ENTRYPOINT=taskexecutor
# ...
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
# This is the branch we care about: background start without NUMA
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
# preferred node: current
# physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# cpubind: 0 1
# nodebind: 0 1
# membind: 0 1
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
done
fi
fi
From the script above we can see that unless start-foreground (foreground mode) is used, the script actually calls flink-daemon.sh. Its usage and accepted arguments are:
flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
To find the Java entry class, we move on to flink-daemon.sh, which contains the following fragment:
# ...
STARTSTOP=$1
# As shown above, when invoked via taskmanager.sh, DAEMON is taskexecutor
DAEMON=$2
# ...
case $DAEMON in
(taskexecutor)
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
;;
(zookeeper)
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
;;
(historyserver)
CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
;;
(standalonesession)
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
;;
(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;
(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
;;
esac
# ...
Since DAEMON is taskexecutor, the class that actually gets run is org.apache.flink.runtime.taskexecutor.TaskManagerRunner. TaskManagerRunner is therefore the entry class for starting a TaskManager.
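As a rough illustration of the dispatch that the case statement performs, the same mapping can be written as a lookup table. This is only a sketch: the class DaemonEntrypointSketch and its resolve method are hypothetical names invented here and are not part of Flink; the daemon names and class names are copied from the script fragment above.

```java
import java.util.Map;

// Hypothetical helper, not part of Flink: mirrors the case statement in flink-daemon.sh.
final class DaemonEntrypointSketch {

    // Daemon name -> Java class to run, copied from the script fragment.
    private static final Map<String, String> CLASS_TO_RUN = Map.of(
            "taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
            "zookeeper", "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer",
            "historyserver", "org.apache.flink.runtime.webmonitor.history.HistoryServer",
            "standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint",
            "standalonejob", "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint");

    // Returns the entry class for a daemon name, or fails like the (*) branch of the script.
    static String resolve(String daemon) {
        String className = CLASS_TO_RUN.get(daemon);
        if (className == null) {
            throw new IllegalArgumentException("Unknown daemon '" + daemon + "'.");
        }
        return className;
    }

    public static void main(String[] args) {
        // taskmanager.sh always passes "taskexecutor" as the daemon name.
        System.out.println(resolve("taskexecutor"));
    }
}
```

Any name outside the five known daemons is rejected, which corresponds to the script printing the usage message and exiting with status 1.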
TaskManagerRunner
TaskManagerRunner is the startup class of the TaskManager in both YARN mode and standalone mode.
Let's look at its main method:
public static void main(String[] args) throws Exception {
// startup checks and logging
// Log environment information
EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
// Register signal handlers
// On Linux these respond to TERM, HUP and INT
SignalHandler.register(LOG);
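// Install a shutdown hook, which is called back before the JVM exits
// It forcibly terminates the JVM if shutdown takes longer than 5 seconds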
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// Get and log the limit on the number of open file handles
long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
if (maxOpenFileHandles != -1L) {
LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
} else {
LOG.info("Cannot determine the maximum number of open file descriptors");
}
// Start the TaskManager
runTaskManagerSecurely(args);
}
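The JvmShutdownSafeguard call is easier to picture with a small example. The sketch below shows one way such a safeguard could work, assuming its purpose is to keep a hanging shutdown hook from blocking JVM exit: the hook starts a daemon thread that waits for a fixed delay and then halts the JVM. ShutdownSafeguardSketch, its install method, and the exit code are hypothetical names and values chosen for illustration; this is not Flink's implementation.

```java
// Hypothetical sketch, not Flink's JvmShutdownSafeguard.
final class ShutdownSafeguardSketch {

    // Illustrative exit status used when the JVM has to be halted forcibly.
    private static final int HALT_EXIT_CODE = 1;

    // Registers a shutdown hook that arms a "terminator" thread.
    // If the remaining shutdown work finishes within delayMillis, the JVM exits
    // normally and the daemon terminator is simply discarded; otherwise the
    // terminator calls Runtime.halt, which skips any hooks still running.
    static Thread install(long delayMillis) {
        Thread hook = new Thread(() -> {
            Thread terminator = new Thread(() -> {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    return; // interrupted: give up without halting
                }
                Runtime.getRuntime().halt(HALT_EXIT_CODE);
            }, "jvm-terminator");
            terminator.setDaemon(true);
            terminator.start();
        }, "shutdown-safeguard");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        install(5000L);
        System.out.println("Safeguard installed; exiting normally.");
    }
}
```

Because the terminator is a daemon thread, it never keeps a healthy JVM alive; it only matters when some other hook blocks for longer than the delay.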
Next, we follow runTaskManagerSecurely(String[] args):
public static void runTaskManagerSecurely(String[] args) {
try {
// Load flink-conf.yaml plus the dynamic properties passed on the command line as the configuration
Configuration configuration = loadConfiguration(args);
// Delegate to the runTaskManagerSecurely(Configuration) overload
runTaskManagerSecurely(configuration);
} catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("TaskManager initialization failed.", strippedThrowable);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
}
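The catch block unwraps UndeclaredThrowableException before logging, so the log shows the real cause rather than the wrapper. A minimal sketch of that kind of unwrapping is shown below; StripExceptionSketch and strip are hypothetical names, and the loop is an assumption about how such a helper can be written, not a copy of Flink's ExceptionUtils.stripException.

```java
import java.lang.reflect.UndeclaredThrowableException;

// Hypothetical helper, not Flink's ExceptionUtils.
final class StripExceptionSketch {

    // Peels off wrapper exceptions of the given type for as long as a cause is available.
    static Throwable strip(Throwable throwable, Class<? extends Throwable> wrapperType) {
        Throwable current = throwable;
        while (wrapperType.isInstance(current) && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable root = new IllegalStateException("port already in use");
        Throwable wrapped = new UndeclaredThrowableException(new UndeclaredThrowableException(root));
        // Both wrapper layers are removed, leaving the IllegalStateException.
        System.out.println(strip(wrapped, UndeclaredThrowableException.class));
    }
}
```

An exception that is not of the wrapper type, or a wrapper without a cause, is returned unchanged.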
Now we follow the runTaskManagerSecurely(Configuration configuration) overload:
public static void runTaskManagerSecurely(Configuration configuration) throws Exception {
// If so configured, replace graceful exit with a hard halt of the JVM
replaceGracefulExitWithHaltIfConfigured(configuration);
// Create the plugin manager based on the configuration
final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
// Initialize the shared file system configuration (FileSystemFactory), e.g. for HDFS
FileSystem.initialize(configuration, pluginManager);
// Read the security-related configuration, covering Flink, JAAS, Hadoop and ZooKeeper
SecurityUtils.install(new SecurityConfiguration(configuration));
// Call runTaskManager inside the installed security context
SecurityUtils.getInstalledContext().runSecured(() -> {
runTaskManager(configuration, pluginManager);
return null;
});
}
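The `return null;` inside the lambda can look odd at first. It is there because runSecured takes a callable that must return a value, while runTaskManager returns nothing. The sketch below illustrates the pattern with a stand-in interface; SecurityContextSketch, NoOpSecurityContextSketch and RunSecuredDemo are hypothetical types written for this example, and the exact signature is an assumption rather than a quote from Flink.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a security context; the signature is assumed for illustration.
interface SecurityContextSketch {
    <T> T runSecured(Callable<T> securedCallable) throws Exception;
}

// A context that adds no security at all and simply invokes the callable.
final class NoOpSecurityContextSketch implements SecurityContextSketch {
    @Override
    public <T> T runSecured(Callable<T> securedCallable) throws Exception {
        // A real context would perform the work as an authenticated user here.
        return securedCallable.call();
    }
}

final class RunSecuredDemo {

    // Plays the role of runTaskManager: it has side effects but no return value.
    static void startWork(StringBuilder log) {
        log.append("started");
    }

    public static void main(String[] args) throws Exception {
        SecurityContextSketch context = new NoOpSecurityContextSketch();
        StringBuilder log = new StringBuilder();
        // The lambda must return something to satisfy Callable, hence "return null".
        context.runSecured(() -> {
            startWork(log);
            return null;
        });
        System.out.println(log);
    }
}
```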
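Working from the Flink main configuration loaded in the previous step, this method creates the plugin manager and initializes the file systems and the security configuration. The TaskManager itself is started in runTaskManager, shown below: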
public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
// Create a TaskManagerRunner
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager,
TaskManagerRunner::createTaskExecutorService);
// Call start
taskManagerRunner.start();
}
Let's first look at the constructor:
public TaskManagerRunner(
Configuration configuration,
PluginManager pluginManager,
TaskExecutorServiceFactory taskExecutorServiceFactory) throws Exception {
this.configuration = checkNotNull(configuration);
// Get the Akka timeout
timeout = AkkaUtils.getTimeoutAsTime(configuration);
// Create the TaskManager's thread pool
// corePoolSize equals the number of CPU cores on the machine
this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
// Create the high-availability services
// They handle leader election for the JobManager and ResourceManager and retrieval of leader information
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
// Create the RPC service, used to communicate with other Flink processes
// It connects to an RpcEndpoint and returns an RpcGateway once the connection succeeds
rpcService = createRpcService(configuration, highAvailabilityServices);
this.resourceId = getTaskManagerResourceID(configuration, rpcService.getAddress(), rpcService.getPort());
// Create the heartbeat services
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// Create the metric registry
// It keeps track of all metrics and links MetricGroups to MetricReporters
metricRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(configuration),
ReporterSetup.fromConfiguration(configuration, pluginManager));
// Start the metrics query service
// It returns the metrics registered in Flink as key-value pairs
final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, rpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
// Create the BlobCache service
blobCacheService = new BlobCacheService(
configuration, highAvailabilityServices.createBlobStore(), null
);
// Create the external resource info provider
final ExternalResourceInfoProvider externalResourceInfoProvider =
ExternalResourceUtils.createStaticExternalResourceInfoProvider(
ExternalResourceUtils.getExternalResourceAmountMap(configuration),
ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));
// Create the TaskExecutor service through the factory
// (analyzed shortly)
taskExecutorService = taskExecutorServiceFactory.createTaskExecutor(
this.configuration,
this.resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
externalResourceInfoProvider,
this);
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
handleUnexpectedTaskExecutorServiceTermination();
MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}
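The executor created at the top of the constructor combines two ideas: a scheduled pool sized to the number of CPU cores, and a thread factory that gives its threads a recognizable name. The following sketch reproduces that combination with plain JDK classes. NamedPoolSketch and its methods are hypothetical, and the naming scheme and daemon flag are assumptions made for illustration; Flink's ExecutorThreadFactory and Hardware classes are not used here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch using only the JDK; not Flink's ExecutorThreadFactory.
final class NamedPoolSketch {

    // Creates daemon threads named "<prefix>-thread-<n>".
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    // Scheduled pool whose core size matches the number of available processors.
    static ScheduledExecutorService newCpuSizedPool(String prefix) {
        return Executors.newScheduledThreadPool(
                Runtime.getRuntime().availableProcessors(),
                namedDaemonFactory(prefix));
    }

    // Runs one task on a fresh pool and reports the name of the worker thread that ran it.
    static String workerThreadName(String prefix) {
        ScheduledExecutorService pool = newCpuSizedPool(prefix);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName("taskmanager-future"));
    }
}
```

Named threads pay off mainly in thread dumps and logs, where the prefix shows at a glance which component a thread belongs to.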
After peeling back the layers of wrapping behind createTaskExecutor, we finally arrive at the startTaskManager method. It creates a TaskExecutor object, as shown below:
public static TaskExecutor startTaskManager(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
FatalErrorHandler fatalErrorHandler) throws Exception {
checkNotNull(configuration);
checkNotNull(resourceID);
checkNotNull(rpcService);
checkNotNull(highAvailabilityServices);
LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());
// Get the externally reachable address
String externalAddress = rpcService.getAddress();
// Get the TaskExecutor resource spec
// It includes the CPU cores, task heap memory, task off-heap memory and managed memory
final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
// Get the TaskManager services configuration
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration.fromConfiguration(
configuration,
resourceID,
externalAddress,
localCommunicationOnly,
taskExecutorResourceSpec);
// Create the TaskManager's MetricGroup
Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
externalAddress,
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
// Create the thread pool for IO tasks
final ExecutorService ioExecutor = Executors.newFixedThreadPool(
taskManagerServicesConfiguration.getNumIoThreads(),
new ExecutorThreadFactory("flink-taskexecutor-io"));
// Create TaskManagerServices, a container for many other resources and services
// It involves a lot of services, so we analyze it separately below
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
blobCacheService.getPermanentBlobService(),
taskManagerMetricGroup.f1,
ioExecutor,
fatalErrorHandler);
MetricUtils.instantiateFlinkMemoryMetricGroup(
taskManagerMetricGroup.f1,
taskManagerServices.getTaskSlotTable(),
taskManagerServices::getManagedMemorySize);
// Create the TaskManager configuration
TaskManagerConfiguration taskManagerConfiguration =
TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);
// Get the address of the metrics query service
String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
// Create the TaskExecutor
return new TaskExecutor(
rpcService,
taskManagerConfiguration,
highAvailabilityServices,
taskManagerServices,
externalResourceInfoProvider,
heartbeatServices,
taskManagerMetricGroup.f0,
metricQueryServiceAddress,
blobCacheService,
fatalErrorHandler,
new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
}
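At this point a complete TaskExecutor has been created. Creating it involves initializing many related services, which are summarized as a mind map at the end of this post.
The fromConfiguration method of TaskManagerServices
TaskManagerServices is a container for a set of TaskManager services, including the memory manager, the IO manager, the shuffle environment and so on. The purpose of each service is planned for later posts.
Here we focus on its fromConfiguration method, shown below: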
/**
* Creates and returns the task manager services.
*
* @param taskManagerServicesConfiguration task manager configuration
* @param permanentBlobService permanentBlobService used by the services
* @param taskManagerMetricGroup metric group of the task manager
* @param ioExecutor executor for async IO operations
* @param fatalErrorHandler to handle class loading OOMs
* @return task manager components
* @throws Exception
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
PermanentBlobService permanentBlobService,
MetricGroup taskManagerMetricGroup,
ExecutorService ioExecutor,
FatalErrorHandler fatalErrorHandler) throws Exception {
// pre-start checks
// Check whether the temp dirs (local_dirs in YARN mode, java.io.tmpdir in standalone mode) exist, and create them if they do not
// Also check that each temp dir is a directory and is writable
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
// Create the TaskEventDispatcher
// The task event dispatcher is used by consumer tasks to send TaskEvents to producer tasks
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
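// Create the asynchronous IO manager
// It creates reader and writer threads according to the number of configured temp dirs, which read and write data asynchronously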
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
// Create the ShuffleEnvironment
// It provides a local shuffle environment that uses memory segments for data storage
// It can create the writing side (ResultPartitionWriter) and the consuming side (InputGate)
final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
taskManagerMetricGroup,
ioExecutor);
// Start the ShuffleEnvironment
// Returns the TaskManager's data port (taskmanager.data.port)
final int listeningDataPort = shuffleEnvironment.start();
// Create the key-value state service
final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
kvStateService.start();
final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
taskManagerServicesConfiguration.getResourceID(),
taskManagerServicesConfiguration.getExternalAddress(),
// we expose the task manager location with the listening port
// iff the external data port is not explicitly defined
taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
taskManagerServicesConfiguration.getExternalDataPort() :
listeningDataPort);
// Create the broadcast variable manager
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
// Create the TaskSlotTable, which tracks how tasks are assigned to slots
final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
taskManagerServicesConfiguration.getPageSize(),
ioExecutor);
final JobTable jobTable = DefaultJobTable.create();
// Create the job leader service. A job leader is the JobManager that leads a job.
// Whenever a JobManager gains or loses leadership, the JobLeaderListener located in TaskExecutor.java is notified
final JobLeaderService jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
// Read the root directories for local state
// taskmanager.state.local.root-dirs
final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
// Build a File object for each root directory (this alone does not create anything on disk)
for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}
// Create the task state manager
final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
stateRootDirectoryFiles,
ioExecutor);
final boolean failOnJvmMetaspaceOomError =
taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);
final boolean checkClassLoaderLeak =
taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
permanentBlobService,
BlobLibraryCacheManager.defaultClassLoaderFactory(
taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
failOnJvmMetaspaceOomError ? fatalErrorHandler : null,
checkClassLoaderLeak));
// Build the TaskManagerServices
return new TaskManagerServices(
unresolvedTaskManagerLocation,
taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
ioManager,
shuffleEnvironment,
kvStateService,
broadcastVariableManager,
taskSlotTable,
jobTable,
jobLeaderService,
taskStateManager,
taskEventDispatcher,
ioExecutor,
libraryCacheManager);
}
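One detail in the listing above is worth a closer look: the loop over the local state root directories only constructs java.io.File objects. In Java, constructing a File never touches the file system, so no directory exists yet after that loop. The sketch below demonstrates this with a made-up root path. LocalStateDirsSketch is a hypothetical class, and the sub-directory name "localState" is an assumed value used for illustration rather than a confirmed value of LOCAL_STATE_SUB_DIRECTORY_ROOT.

```java
import java.io.File;

// Hypothetical sketch of the path construction done for the local state root directories.
final class LocalStateDirsSketch {

    // Assumed sub-directory name, for illustration only.
    static final String SUB_DIRECTORY = "localState";

    // Maps each configured root to <root>/<SUB_DIRECTORY> without touching the disk.
    static File[] toStateDirectories(String[] rootDirectories) {
        File[] result = new File[rootDirectories.length];
        for (int i = 0; i < rootDirectories.length; ++i) {
            result[i] = new File(rootDirectories[i], SUB_DIRECTORY);
        }
        return result;
    }

    public static void main(String[] args) {
        // A root that does not exist on disk; the name is made up for this demo.
        String missingRoot = "no-such-root-" + System.nanoTime();
        for (File dir : toStateDirectories(new String[] {missingRoot})) {
            // The File object is a path only; whether it exists is a separate question.
            System.out.println(dir.getPath() + " exists: " + dir.exists());
        }
    }
}
```

Whoever receives these File objects has to create the directories when they are first needed, for example with File.mkdirs.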
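Starting the TaskManager
Everything above only covers how the related services are created; the logic that starts them lives in the start method.
The start method of TaskManagerRunner is shown below:
Appendix
Mind map of the important services in TaskExecutor
The roles of these services will be analyzed one by one in my later posts.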