Overview
The Namenode's role in Hadoop's HDFS architecture needs no introduction. This article analyzes the Namenode startup process and, along the way, sorts out the relationships between the main Namenode classes.
In the code, the Namenode entity maps mainly to three classes: the NameNode class, the NameNodeRpcServer class, and the FSNamesystem class.

- NameNode class: manages the Namenode configuration, the RPC interface, the HTTP interface, and so on
- NameNodeRpcServer class: receives and handles all RPC requests
- FSNamesystem class: implements all of the Namenode's logic
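To make the division of responsibilities concrete, here is a simplified, hypothetical sketch (not the actual Hadoop source; the fields and the mkdirs method are illustrative only) of how the three classes relate: NameNode owns the other two, and the RPC server forwards every request to the FSNamesystem.

```java
// Simplified sketch only; names mirror the idea, not the real Hadoop classes.
class FSNamesystem {
  boolean mkdirs(String path) { /* namespace + block management logic */ return true; }
}

class NameNodeRpcServer {
  private final FSNamesystem namesystem;
  NameNodeRpcServer(FSNamesystem ns) { this.namesystem = ns; }
  // Every RPC handler delegates to the namesystem
  boolean mkdirs(String path) { return namesystem.mkdirs(path); }
}

class NameNode {
  private final FSNamesystem namesystem = new FSNamesystem();                    // the logic
  private final NameNodeRpcServer rpcServer = new NameNodeRpcServer(namesystem); // the RPC endpoint
}
```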
Startup
The entry point for Namenode startup is easy to find: start from the launch shell script, for example ${HADOOP_HOME}/sbin/start-dfs.sh. The command it eventually executes looks like this:
```
/Library/java/JavaVirtualMachines/jdk1.8.0_161.jdk/Contents/Home/bin/java \
  -Dproc_namenode \
  -Djava.net.preferIPv4Stack=true \
  -Djava.security.krb5.realm= \
  -Djava.security.krb5.kdc= \
  -Djava.security.krb5.conf= \
  -Dhdfs.audit.logger=INFO,NullAppender \
  -Dhadoop.security.logger=INFO,RFAS \
  -Dyarn.log.dir=/tools/hadoop-3.2.1/logs \
  -Dyarn.log.file=hadoop-sysadmin-namenode-bogon.log \
  -Dyarn.home.dir=/tools/hadoop-3.2.1 \
  -Dyarn.root.logger=INFO,console \
  -Djava.library.path=/tools/hadoop-3.2.1/lib/native \
  -Dhadoop.log.dir=/tools/hadoop-3.2.1/logs \
  -Dhadoop.log.file=hadoop-sysadmin-namenode-bogon.log \
  -Dhadoop.home.dir=/tools/hadoop-3.2.1 \
  -Dhadoop.id.str=sysadmin \
  -Dhadoop.root.logger=INFO,RFA \
  -Dhadoop.policy.file=hadoop-policy.xml \
  org.apache.hadoop.hdfs.server.namenode.NameNode
```
The most important part is the Namenode startup class; as you can see, the JVM invokes this class directly to bring the Namenode up:
```
org.apache.hadoop.hdfs.server.namenode.NameNode
```
The corresponding main method entry point:
```java
public static void main(String[] args) throws Exception {
  // Validate the arguments; print usage and exit on -help
  if (DFSUtil.parseHelpArgument(args, NameNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  try {
    StringUtils.startupShutdownMessage(NameNode.class, args, LOG);
    // Call createNameNode to create the NameNode object
    NameNode namenode = createNameNode(args, null);
    if (namenode != null) {
      // Wait for the NameNode RPC service to stop
      namenode.join();
    }
  } catch (Throwable e) {
    LOG.error("Failed to start namenode.", e);
    terminate(1, e);
  }
}
```
The key line is
```java
NameNode namenode = createNameNode(args, null);
```
Creating the NameNode
The NameNode is created by the createNameNode method:
```java
public static NameNode createNameNode(String args[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(args));
  // Build the configuration
  if (conf == null)
    conf = new HdfsConfiguration();
  // Parse out some generic args into Configuration.
  GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
  args = hParser.getRemainingArgs();
  // Parse the rest, NN specific args.
  // Parse the command-line arguments
  StartupOption startOpt = parseArguments(args);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  boolean aborted = false;
  // Dispatch to the appropriate action based on the startup option
  switch (startOpt) {
    // Format the current Namenode by calling format()
    case FORMAT:
      aborted = format(conf, startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid javac warning
    case GENCLUSTERID:
      System.err.println("Generating new cluster id:");
      System.out.println(NNStorage.newClusterID());
      terminate(0);
      return null;
    // Roll back the last upgrade by calling doRollback()
    case ROLLBACK:
      aborted = doRollback(conf, true);
      terminate(aborted ? 1 : 0);
      return null; // avoid warning
    // Copy the latest namespace data from the Active Namenode to the
    // Standby Namenode by calling BootstrapStandby.run()
    case BOOTSTRAPSTANDBY:
      String[] toolArgs = Arrays.copyOfRange(args, 1, args.length);
      int rc = BootstrapStandby.run(toolArgs, conf);
      terminate(rc);
      return null; // avoid warning
    // Initialize the shared editlog storage and copy enough editlog data
    // from the Active Namenode so that the Standby node can start up;
    // implemented by the static method initializeSharedEdits()
    case INITIALIZESHAREDEDITS:
      aborted = initializeSharedEdits(conf,
          startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid warning
    // See the CHECKPOINT case below
    case BACKUP:
    // Start a checkpoint node, also by directly constructing and returning a BackupNode
    case CHECKPOINT:
      NamenodeRole role = startOpt.toNodeRole();
      DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
      return new BackupNode(conf, role);
    // Recover corrupted metadata and the file system by calling doRecovery()
    case RECOVER:
      NameNode.doRecovery(startOpt, conf);
      return null;
    // Verify that the configured directories exist, then print the metadata
    // versions of the fsimage file and the file system
    case METADATAVERSION:
      printMetadataVersion(conf);
      terminate(0);
      return null; // avoid javac warning
    // Upgrade the Namenode, then shut it down once the upgrade completes
    case UPGRADEONLY:
      DefaultMetricsSystem.initialize("NameNode");
      new NameNode(conf);
      terminate(0);
      return null;
    // By default, simply construct and return a NameNode object
    default:
      // Initialize the metrics system
      DefaultMetricsSystem.initialize("NameNode");
      return new NameNode(conf);
  }
}
```
The important part here is that a different task is executed depending on the arguments passed in. The task types are summarized below (a small usage sketch follows the table):
| Startup option | Action |
|---|---|
| FORMAT | Format the current Namenode by calling format() |
| GENCLUSTERID | Generate a new cluster id |
| ROLLBACK | Roll back the last upgrade by calling doRollback() |
| BOOTSTRAPSTANDBY | Copy the latest namespace data from the Active Namenode to the Standby Namenode by calling BootstrapStandby.run() |
| INITIALIZESHAREDEDITS | Initialize the shared editlog storage and copy enough editlog data from the Active Namenode so that the Standby node can start; implemented by the static method initializeSharedEdits() |
| BACKUP | Start a backup node; handled together with CHECKPOINT below (both construct a BackupNode) |
| CHECKPOINT | Start a checkpoint node, also by directly constructing and returning a BackupNode |
| RECOVER | Recover corrupted metadata and the file system by calling doRecovery() |
| METADATAVERSION | Verify that the configured directories exist and print the metadata versions of the fsimage file and the file system |
| UPGRADEONLY | Upgrade the Namenode, then shut it down once the upgrade completes |
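Each enum constant corresponds to a command-line flag (for example, FORMAT corresponds to `-format`, as in `hdfs namenode -format`), and parseArguments() maps the flag back to the constant handled in the switch above. A small sketch, assuming the public HdfsServerConstants.StartupOption enum and its getName() accessor, that prints these mappings:

```java
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;

// Prints each startup option together with its command-line flag,
// e.g. "FORMAT -> -format". Assumes the Hadoop HDFS jars are on the classpath.
public class PrintStartupOptions {
  public static void main(String[] args) {
    for (StartupOption opt : StartupOption.values()) {
      System.out.println(opt + " -> " + opt.getName());
    }
  }
}
```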
The constructor
The NameNode object is built through its constructor. NameNode(Configuration conf) is the entry point and delegates to NameNode(Configuration conf, NamenodeRole role):
```java
protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  super(conf);
  this.tracer = new Tracer.Builder("NameNode").
      conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
      build();
  this.tracerConfigurationManager =
      new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
  this.role = role;
  String nsId = getNameServiceId(conf);
  // Resolve the namenode ID within the nameservice (used for HA configurations)
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  // fs.defaultFS, e.g. localhost:8020
  clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(conf, nsId);

  if (clientNamenodeAddress != null) {
    LOG.info("Clients should use {} to access"
        + " this namenode/service.", clientNamenodeAddress);
  }
  // Whether HA is enabled
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  // Non-HA ==> ACTIVE_STATE
  state = createHAState(getStartupOption(conf));
  // This is used only by tests at the moment.
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // Perform the initialization
    initialize(getConf());
    try {
      haContext.writeLock();
      // After initialization the Namenode enters the Standby state (in HA mode).
      // This starts the checkpointer thread inside StandbyCheckpointer,
      // which periodically merges and processes image files.
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    // On exception, stop the Namenode service immediately
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    // Stop the Namenode service immediately
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}
```
The most important part is the call to the initialization method, initialize(getConf()).
Initialization: initialize
initialize(getConf()) is the Namenode's initialization method; all of the Namenode's important services are started from here:
```java
/**
 * Initialize name-node.
 *
 * @param conf the configuration
 */
protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals);
    }
  }

  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);

  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);

  // Construct and start a JvmPauseMonitor
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  // Start the HTTP server
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }

  // Initialize the FSNamesystem.
  // The NameNode delegates all file system management to the FSNamesystem
  // object; it calls FSNamesystem.loadFromDisk() to create it.
  //
  // FSNamesystem.loadFromDisk() first invokes the constructor to build the
  // FSNamesystem object, then loads the fsimage and editlog files into the
  // namespace.
  loadNamesystem(conf);
  startAliasMapServerIfNecessary(conf);

  // Create the RPC server
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress =
        NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // Start the httpServer and the rpcServer
  startCommonServices(conf);
  // Start a timer that periodically writes NameNode metrics to the log file.
  // This behavior can be disabled via configuration.
  startMetricsLogger(conf);
}
```
Building the HTTP service: startHttpServer
This simply constructs a NameNodeHttpServer object and starts it:
```java
private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}
```
```java
/**
 * @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
 * for information related to the different configuration options and
 * Http Policy is decided.
 */
void start() throws IOException {
  HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
  // Get the server host
  final String infoHost = bindAddress.getHostName();

  // Get the bind address
  final InetSocketAddress httpAddr = bindAddress;

  // Get the HTTPS service address, e.g. 0.0.0.0:9871
  final String httpsAddrString = conf.getTrimmed(
      DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
      DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
  // Build the InetSocketAddress for the HTTPS service
  InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);

  if (httpsAddr != null) {
    // If dfs.namenode.https-bind-host is set, it overrides the address
    // created above
    final String bindHost =
        conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY);
    if (bindHost != null && !bindHost.isEmpty()) {
      httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort());
    }
  }

  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);

  final boolean xFrameEnabled = conf.getBoolean(
      DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED,
      DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT);

  final String xFrameOptionValue = conf.getTrimmed(
      DFSConfigKeys.DFS_XFRAME_OPTION_VALUE,
      DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT);

  builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue);

  // Build the HTTP server
  httpServer = builder.build();

  if (policy.isHttpsEnabled()) {
    // assume same ssl port for all datanodes
    InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(
        conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY,
            infoHost + ":" + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT));
    httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY,
        datanodeSslPort.getPort());
  }

  initWebHdfs(conf, bindAddress.getHostName(), httpServer,
      NamenodeWebHdfsMethods.class.getPackage().getName());

  httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
  httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer, conf);
  httpServer.start();

  int connIdx = 0;
  if (policy.isHttpEnabled()) {
    httpAddress = httpServer.getConnectorAddress(connIdx++);
    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
        NetUtils.getHostPortString(httpAddress));
  }

  if (policy.isHttpsEnabled()) {
    httpsAddress = httpServer.getConnectorAddress(connIdx);
    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
        NetUtils.getHostPortString(httpsAddress));
  }
}
```
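The HTTP endpoint that start() binds to comes from dfs.namenode.http-address (default 0.0.0.0:9870 in Hadoop 3) and, for HTTPS, dfs.namenode.https-address (default 0.0.0.0:9871). A minimal sketch, assuming only the standard Configuration and DFSConfigKeys APIs and whatever hdfs-site.xml is on the classpath, that prints which addresses a given configuration resolves to:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Resolves the same HTTP/HTTPS keys that NameNodeHttpServer.start() reads.
public class PrintHttpAddresses {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    String http = conf.getTrimmed(
        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
    String https = conf.getTrimmed(
        DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
        DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
    System.out.println("HTTP  address: " + http);
    System.out.println("HTTPS address: " + https);
  }
}
```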
Building the FSNamesystem
The FSNamesystem is loaded from disk:
```java
protected void loadNamesystem(Configuration conf) throws IOException {
  // Load the FSNamesystem from disk
  this.namesystem = FSNamesystem.loadFromDisk(conf);
}
```
Here, FSNamesystem.loadFromDisk(conf) is called to load the file system information:
```java
/**
 * Instantiates an FSNamesystem loaded from the image and edits
 * directories specified in the passed Configuration.
 *
 * @param conf the Configuration which specifies the storage directories
 *             from which to load
 * @return an FSNamesystem which contains the loaded namespace
 * @throws IOException if loading fails
 */
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

  checkConfiguration(conf);

  // Build the FSImage
  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),      // dfs.namenode.name.dir
      FSNamesystem.getNamespaceEditsDirs(conf));

  // The FSNamesystem constructor is long, but its logic is simple: it mainly
  // reads parameters from the configuration and then constructs FSDirectory,
  // BlockManager, SnapshotManager, CacheManager, SafeModeInfo and other objects.
  // Note that the constructor does not load the fsimage and editlog files from
  // disk; that happens here in loadFromDisk() after the FSNamesystem object has
  // been created successfully.
  // If FSNamesystem initialization fails, FSNamesystem.close() is called to shut
  // down all services the FSNamesystem has started.
  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  long loadStart = monotonicNow();
  try {
    // Load the fsimage and editlog files
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = monotonicNow() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  return namesystem;
}
```
The first step is to validate the configuration with checkConfiguration(conf), which reads the relevant directory settings from the configuration file:
```java
private static void checkConfiguration(Configuration conf)
    throws IOException {
  final Collection<URI> namespaceDirs =
      FSNamesystem.getNamespaceDirs(conf);
  final Collection<URI> editsDirs =
      FSNamesystem.getNamespaceEditsDirs(conf);
  final Collection<URI> requiredEditsDirs =
      FSNamesystem.getRequiredNamespaceEditsDirs(conf);
  final Collection<URI> sharedEditsDirs =
      FSNamesystem.getSharedEditsDirs(conf);
  for (URI u : requiredEditsDirs) {
    if (u.toString().compareTo(
        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_DEFAULT) == 0) {
      continue;
    }
    // ... (remainder of the method omitted)
```
The FSImage is built by the following code:
```java
// Build the FSImage
FSImage fsImage = new FSImage(conf,
    FSNamesystem.getNamespaceDirs(conf),      // dfs.namenode.name.dir
    FSNamesystem.getNamespaceEditsDirs(conf));
```
The FSImage constructor:
```java
protected FSImage(Configuration conf,
                  Collection<URI> imageDirs,
                  List<URI> editsDirs)
    throws IOException {
  this.conf = conf;

  // Build the NNStorage, which manages the StorageDirectories used by the namenode
  storage = new NNStorage(conf, imageDirs, editsDirs);

  // dfs.namenode.name.dir.restore, default: false.
  // When set to true, the namenode attempts to restore any previously failed
  // dfs.namenode.name.dir; recovery of failed directories is attempted during
  // checkpoints.
  if (conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
                      DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
    storage.setRestoreFailedStorage(true);
  }

  // Build the FSEditLog
  this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
}
```
Here you can see the configured storage directories for the image and for the edits.
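If you want to check which directories a given configuration resolves to, the same public helpers that FSImage receives here can be called directly. A small sketch, assuming the Hadoop HDFS libraries and an hdfs-site.xml on the classpath:

```java
import java.net.URI;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

// Prints the fsimage (dfs.namenode.name.dir) and edits (dfs.namenode.edits.dir)
// directories that FSImage would be constructed with.
public class PrintStorageDirs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    Collection<URI> imageDirs = FSNamesystem.getNamespaceDirs(conf);
    List<URI> editsDirs = FSNamesystem.getNamespaceEditsDirs(conf);
    System.out.println("image dirs: " + imageDirs);
    System.out.println("edits dirs: " + editsDirs);
  }
}
```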
Back in the FSImage constructor, the next step is building the FSEditLog:
```java
this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
```
Back in the loadFromDisk method: at this point the FSImage has been built, and the next steps are constructing the FSNamesystem and loading the fsimage and editlog files:
```java
// Build the FSImage
FSImage fsImage = new FSImage(conf,
    FSNamesystem.getNamespaceDirs(conf),      // dfs.namenode.name.dir
    FSNamesystem.getNamespaceEditsDirs(conf));

// The FSNamesystem constructor is long but simple: it mainly reads parameters
// from the configuration and constructs FSDirectory, BlockManager,
// SnapshotManager, CacheManager, SafeModeInfo and other objects.
// Note that the constructor does not load the fsimage and editlog files from
// disk; that is done in loadFromDisk() after the FSNamesystem object has been
// created successfully.
// If FSNamesystem initialization fails, FSNamesystem.close() is called to shut
// down all services the FSNamesystem has started.
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
  namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}

long loadStart = monotonicNow();
try {
  // Load the fsimage and editlog files
  namesystem.loadFSImage(startOpt);
```
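Conceptually, loadFSImage() rebuilds the in-memory namespace from a full checkpoint (the fsimage) and then replays the operations recorded after that checkpoint (the editlog) on top of it. A self-contained toy illustration of that checkpoint-plus-replay idea; the map and the string-array "edits" below are made up for the example and are not Hadoop code:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: a checkpointed "namespace" plus a list of logged operations replayed on top.
public class ReplayDemo {
  public static void main(String[] args) {
    // "fsimage": the namespace as of the last checkpoint
    Map<String, Long> namespace = new HashMap<>();
    namespace.put("/a", 1L);
    namespace.put("/b", 2L);

    // "editlog": operations recorded after the checkpoint
    List<String[]> edits = Arrays.asList(
        new String[] {"ADD", "/c", "3"},
        new String[] {"DELETE", "/a"});

    // Replay the edits to reach the latest namespace state
    for (String[] op : edits) {
      if ("ADD".equals(op[0])) {
        namespace.put(op[1], Long.parseLong(op[2]));
      } else if ("DELETE".equals(op[0])) {
        namespace.remove(op[1]);
      }
    }
    System.out.println(namespace); // contains /b=2 and /c=3
  }
}
```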
Building the NameNodeRpcServer: createRpcServer
```java
/**
 * Create the RPC server implementation. Used as an extension point for the
 * BackupNode.
 */
protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}
```
This creates the RPC service that client programs call; it commonly listens on port 8020.
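Every call made through the HDFS client API ends up as an RPC on this server. A minimal sketch, assuming a NameNode reachable at hdfs://localhost:8020 and the Hadoop client libraries on the classpath:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Lists the HDFS root directory; the listStatus() call is served by
// NameNodeRpcServer on the NameNode side.
public class ListRoot {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(new URI("hdfs://localhost:8020"), conf)) {
      for (FileStatus status : fs.listStatus(new Path("/"))) {
        System.out.println(status.getPath());
      }
    }
  }
}
```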
After the RPC server has been created, the startCommonServices method is called to start the services:
```java
// Start the httpServer and the rpcServer
startCommonServices(conf);
```
Entering the HA state (active in non-HA mode)
```java
try {
  haContext.writeLock();
  // After initialization the Namenode enters the Standby state (in HA mode).
  // This starts the checkpointer thread inside StandbyCheckpointer, which
  // periodically merges and processes image files.
  state.prepareToEnterState(haContext);
  state.enterState(haContext);
} finally {
  haContext.writeUnlock();
}
```
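HAState follows the classic state pattern: the Active and Standby states each decide which services to run when entered. A highly simplified, hypothetical sketch of that pattern (the real classes live in org.apache.hadoop.hdfs.server.namenode and org.apache.hadoop.hdfs.server.namenode.ha and carry many more responsibilities):

```java
// Illustrative only; the names mirror the idea, not the exact Hadoop API.
interface HAContext {
  void startActiveServices() throws Exception;   // e.g. begin writing to the edit log
  void startStandbyServices() throws Exception;  // e.g. tail edits, run the StandbyCheckpointer
}

abstract class HAState {
  abstract void enterState(HAContext context) throws Exception;
}

class ActiveState extends HAState {
  @Override
  void enterState(HAContext context) throws Exception {
    context.startActiveServices();
  }
}

class StandbyState extends HAState {
  @Override
  void enterState(HAContext context) throws Exception {
    context.startStandbyServices();
  }
}
```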
