概述
Namenode 在 hadoop 的 hdfs 体系中的作用是毋庸置疑的。本文主要分析 namenode 的启动过程,同时梳理一下Namenode的逻辑关系
Namenode 实体在代码实现中主要对应于三个类,即 NameNode 类、NameNodeRpcServer 类以及FSNamesystem 类
NameNode类:负责管理 Namenode 配置、RPC 接口以及 HTTP 接口等
NameNodeRpcServer 类:接收和处理所有的RPC请求
FSNamesystem 类:实现Namenode的所有逻辑
启动
Namenode 启动的入口这个很容易找,可以从启动命令的 shell 脚本开始找,比如从${HADOOP_HOME}/sbin/start-dfs.sh 脚本开始找起。最终的执行命令如下
/Library/java/JavaVirtualMachines/jdk1.8.0_161.jdk/Contents/Home/bin/java
-Dproc_namenode
-Djava.net.preferIPv4Stack=true
-Djava.security.krb5.realm=
-Djava.security.krb5.kdc=
-Djava.security.krb5.conf=
-Dhdfs.audit.logger=INFO,NullAppender
-Dhadoop.security.logger=INFO,RFAS
-Dyarn.log.dir=/tools/hadoop-3.2.1/logs
-Dyarn.log.file=hadoop-sysadmin-namenode-bogon.log
-Dyarn.home.dir=/tools/hadoop-3.2.1
-Dyarn.root.logger=INFO,console
-Djava.library.path=/tools/hadoop-3.2.1/lib/native
-Dhadoop.log.dir=/tools/hadoop-3.2.1/logs
-Dhadoop.log.file=hadoop-sysadmin-namenode-bogon.log
-Dhadoop.home.dir=/tools/hadoop-3.2.1
-Dhadoop.id.str=sysadmin
-Dhadoop.root.logger=INFO,RFA
-Dhadoop.policy.file=hadoop-policy.xml
org.apache.hadoop.hdfs.server.namenode.NameNode
最重要的就是 Namenode 的启动类,我们可以看到直接调用的是这个类,然后启动就可以了
org.apache.hadoop.hdfs.server.namenode.NameNode
对应的 main 方法入口
public static void main(String[] args) throws Exception {
//参数校验
if (DFSUtil.parseHelpArgument(args, Cli.USAGE, System.out, true)) {
System.exit(0);
}
try {
StringUtils.startupShutdownMessage(Namenode.class, args, Cli);
//调用createNameNode创建NameNode对象
NameNode namenode = createNameNode(args, null);
if(namenode != null){
//等待NameNode RPC服务结束
namenode.join();
}
} catch (Throwable e) {
Cli.printError("Failed to run " + Cli.COMMAND, e);
System.exit(-2);
}
}
}
重点在于
NameNode namenode = createNameNode(args, null);
创建NameNode
NameNode 是通过 createNameNode 方法进行创建的
public static NameNode createNameNode(String args[], Configuration conf)
throws IOException {
LOG.info("createNameNode " + Arrays.asList(args));
// 构建配置文件
if (conf == null)
conf = new HdfsConfiguration();
// Parse out some generic args into Configuration.
GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
args = hParser.getRemainingArgs();
// Parse the rest, NN specific args.
// 解析命令行的参数
StartupOption startOpt = parseArguments(args);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
boolean aborted = false;
// 根据启动选项调用对应的方法执行操作
switch (startOpt) {
// 格式化当前Namenode,调用format()方法执行格式化操作
case FORMAT:
aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
case GENCLUSTERID:
System.err.println("Generating new cluster id:");
System.out.println(NNStorage.newClusterID());
terminate(0);
return null;
// 回滚上一次升级, 调用doRollback()方法执行回滚操作
case ROLLBACK:
aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
// 拷贝Active Namenode的最新命名空间数据到StandbyNamenode,
// 调用BootstrapStandby.run()方法执行操作
case BOOTSTRAPSTANDBY:
String[] toolArgs = Arrays.copyOfRange(args, 1, args.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
// 初始化editlog的共享存储空间,并从Active
// Namenode中拷贝足够的editlog数据,使得Standby节点能够顺利启动
// 这里调用了静态方法initializeSharedEdits()执行操作
case INITIALIZESHAREDEDITS:
aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
// 参看下面的CHECKPOINT代码 .
case BACKUP:
// 启动checkpoint节点,也是直接构造BackupNode对象并返回
case CHECKPOINT:
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
// 恢复损坏的元数据以及文件系统,这里调用了doRecovery()方法执行操作
case RECOVER:
NameNode.doRecovery(startOpt, conf);
return null;
// 确认配置文件夹存在,并且打印fsimage文件和文件系统的元数据版本
case METADATAVERSION:
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
// 升级Namenode,升级完成后关闭Namenode
case UPGRADEONLY:
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
// 在默认情况下直接构造NameNode对象并返回
default:
// 初始化 度量服务
DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf);
}
}
这里最重要的是根据输入的参数来执行不同的任务,任务类型如下:
FORMAT | 格式化当前Namenode,调用format()方法执行格式化操作 |
---|---|
GENCLUSTERID | 生成集群id |
ROLLBACK | 回滚上一次升级, 调用doRollback()方法执行回滚操作 |
BOOTSTRAPSTANDBY | 拷贝Active Namenode的最新命名空间数据到StandbyNamenode,调用BootstrapStandby.run()方法执行操作 |
INITIALIZESHAREDEDITS | 初始化editlog的共享存储空间,并从ActiveNamenode中拷贝足够的editlog数据,使得Standby节点能够顺利启动。这里调用了静态方法initializeSharedEdits()执行操作 |
BACKUP | |
CHECKPOINT | 启动checkpoint节点,也是直接构造BackupNode对象并返回 |
RECOVER | 恢复损坏的元数据以及文件系统,这里调用了doRecovery()方法执行操作 |
METADATAVERSION | 确认配置文件夹存在,并且打印fsimage文件和文件系统的元数据版本 |
UPGRADEONLY | 升级Namenode,升级完成后关闭Namenode |
构造方法
NameNode 的构建是通过其构造方法 NameNode(Configuration conf) 作为入口进行创建的
protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
super(conf);
this.tracer = new Tracer.Builder("NameNode").
conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
build();
this.tracerConfigurationManager =
new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
this.role = role;
String nsId = getNameServiceId(conf);
// 根据配置确认是否开启了HA
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
// fs.defaultFS localhost:8020
clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(
conf, nsId);
if (clientNamenodeAddress != null) {
LOG.info("Clients should use {} to access"
+ " this namenode/service.", clientNamenodeAddress);
}
// 是否启用ha
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
// 非HA ==> ACTIVE_STATE
state = createHAState(getStartupOption(conf));
// This is used only by tests at the moment.
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
initializeGenericKeys(conf, nsId, namenodeId);
// 执行初始化操作
initialize(getConf());
try {
haContext.writeLock();
// 初始化完成后,Namenode进入Standby状态
// 在这里会开启StandbyCheckpointer里面的
// checkpointer 线程,定时合并&处理images文件
state.prepareToEnterState(haContext);
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
// 出现异常,直接停止Namenode服务
this.stopAtException(e);
throw e;
} catch (HadoopIllegalArgumentException e) {
// 直接停止Namenode服务
this.stopAtException(e);
throw e;
}
this.started.set(true);
}
里面最重要的就是调用了初始化的方法 initialize(getConf())
初始化 initialize
initialize(getConf()) 是 namenode 的初始化方法。Namenode的一些重要服务的启动都是通过他来完成的
/**
* Initialize name-node.
*
* @param conf the configuration
*/
protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals);
}
}
UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf);
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
//构造JvmPauseMonitor对象,并启动
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
//启动HTTP服务
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// 初始化FSNamesystem
// NameNode将对文件系统的管理都委托给了FSNamesystem对象,
// NameNode会调用FSNamesystem.loadFromDisk()创建FSNamesystem对象。
//
// FSNamesystem.loadFromDisk()首先调用构造方法构造FSNamesystem对象,
// 然后将fsimage以及editlog文件加载到命名空间中。
loadNamesystem(conf);
startAliasMapServerIfNecessary(conf);
//创建RPC服务
rpcServer = createRpcServer(conf);
initReconfigurableBackoffKey();
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(getNameNodeAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
//启动httpServer以及 rpcServer
startCommonServices(conf);
//启动计时器定期将NameNode度量写入日志文件。此行为可由配置禁用。
startMetricsLogger(conf);
}
构建http服务:startHttpServer
其实就是构建一个 NameNodeHttpServer 对象,然后启动就可以了
private void startHttpServer(final Configuration conf) throws IOException {
httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
httpServer.start();
httpServer.setStartupProgress(startupProgress);
}
/**
* @see DFSUtil#getHttpPolicy(org.apache.hadoop.conf.Configuration)
* for information related to the different configuration options and
* Http Policy is decided.
*/
void start() throws IOException {
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
// 获取服务器host
final String infoHost = bindAddress.getHostName();
// 获取绑定地址
final InetSocketAddress httpAddr = bindAddress;
// 获取服务地址 0.0.0.0:9871
final String httpsAddrString = conf.getTrimmed(
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
// 构建网络InetSocketAddress服务:
InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
if (httpsAddr != null) {
// 绑定地址 如果dfs.namenode.https-bind-host已绑定了地址的话,将会覆盖掉之前创建的
final String bindHost = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY);
if (bindHost != null && !bindHost.isEmpty()) {
httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort());
}
}
HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
httpAddr, httpsAddr, "hdfs",
DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
final boolean xFrameEnabled = conf.getBoolean(
DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED,
DFSConfigKeys.DFS_XFRAME_OPTION_ENABLED_DEFAULT);
final String xFrameOptionValue = conf.getTrimmed(
DFSConfigKeys.DFS_XFRAME_OPTION_VALUE,
DFSConfigKeys.DFS_XFRAME_OPTION_VALUE_DEFAULT);
builder.configureXFrame(xFrameEnabled).setXFrameOption(xFrameOptionValue);
// 构建http服务
httpServer = builder.build();
if (policy.isHttpsEnabled()) {
// assume same ssl port for all datanodes
InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.getTrimmed(
DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":"
+ DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT));
httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY,
datanodeSslPort.getPort());
}
initWebHdfs(conf, bindAddress.getHostName(), httpServer,
NamenodeWebHdfsMethods.class.getPackage().getName());
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
setupServlets(httpServer, conf);
httpServer.start();
int connIdx = 0;
if (policy.isHttpEnabled()) {
httpAddress = httpServer.getConnectorAddress(connIdx++);
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
NetUtils.getHostPortString(httpAddress));
}
if (policy.isHttpsEnabled()) {
httpsAddress = httpServer.getConnectorAddress(connIdx);
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
NetUtils.getHostPortString(httpsAddress));
}
}
构建FSNamesystem
从磁盘中加载 FSNamesystem
protected void loadNamesystem(Configuration conf) throws IOException {
// 从磁盘中加载 FSNamesystem
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
在这里,是调用 FSNamesystem.loadFromDisk(conf) 加载文件系统信息
/**
* 从配置中加载image和edits目录,实例化FSNamesystem
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
*
* @param conf the Configuration which specifies the storage directories
* from which to load
* @return an FSNamesystem which contains the loaded namespace
* @throws IOException if loading fails
*/
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
checkConfiguration(conf);
// 构建FSImage
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf), //dfs.namenode.name.dir
FSNamesystem.getNamespaceEditsDirs(conf));
// FSNamesystem的构造方法比较长,但是逻辑很简单,主要是从配置文件中获取参数,
// 然后构造FSDirectory、BlockManager、SnapshotManager、CacheManagerSafeModeInfo等对象
// 需要注意的是,FSNamesystem的构造方法并不从磁盘上加载fsimage以及editlog文件,
// 这些操作是在创建FSNamesystem对象成功后,在loadFromDisk()中执行的
// 如果FSNamesystem初始化失败,则会调用FSNamesystem.close()方法
// 关闭FSNamesystem启动的所有服务
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = monotonicNow();
try {
// 加载fsimage以及editlog文件
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
fsImage.close();
throw ioe;
}
long timeTakenToLoadFSImage = monotonicNow() - loadStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
if (nnMetrics != null) {
nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
}
namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
return namesystem;
}
首先是检查配置文件中的配置:checkConfiguration(conf)
根据配置文件中的配置信息如下
private static void checkConfiguration(Configuration conf)
throws IOException {
final Collection<URI> namespaceDirs =
FSNamesystem.getNamespaceDirs(conf);
final Collection<URI> editsDirs =
FSNamesystem.getNamespaceEditsDirs(conf);
final Collection<URI> requiredEditsDirs =
FSNamesystem.getRequiredNamespaceEditsDirs(conf);
final Collection<URI> sharedEditsDirs =
FSNamesystem.getSharedEditsDirs(conf);
for (URI u : requiredEditsDirs) {
if (u.toString().compareTo(
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_DEFAULT) == 0) {
continue;
}
FSImage 由下面的代码构建
// 构建 FSImage
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf), //dfs.namenode.name.dir
FSNamesystem.getNamespaceEditsDirs(conf));
FSImage 的初始化方法
protected FSImage(Configuration conf, Collection<URI> imageDirs, List<URI> editsDirs)
throws IOException{
this.conf = conf;
// 构建NNStorage,负责管理namenode使用的StorageDirectories
storage = new NNStorage(conf, imageDirs, editsDirs);
// dfs.namenode.name.dir.restore default: false
// 设置为true可使namenode尝试恢复以前失败的 dfs.namenode.name.dir
// 启用后,将在检查点期间尝试恢复任何失败的目录
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)){
storage.setRestoreFailedStorage(true);
}
// 构建FSEditLog
this.editlog = FSEditLog.newInstance(conf, storage, editsDirs);
}
在这里,可以看到设置的 image 的存储目录和 edits 的存储目录
上述代码之后,就是构建 FSEditLog
this.editlog = FSEditLog.newInstance(conf, storage, editsDirs);
再回到 loadFromDisk 方法,到这里已经构建完 FSImage,接下来就是构建 FSNamesystem,加载 FSImage 以及editlog 文件
// 构建FSImage
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf), //dfs.namenode.name.dir
FSNamesystem.getNamespaceEditsDirs(conf));
// FSNamesystem的构造方法比较长,但是逻辑很简单,主要是从配置文件中获取参数,
// 然后构造FSDirectory、BlockManager、SnapshotManager、CacheManagerSafeModeInfo等对象
// 需要注意的是,FSNamesystem的构造方法并不从磁盘上加载fsimage以及editlog文件,
// 这些操作是在创建FSNamesystem对象成功后,在loadFromDisk()中执行的
// 如果FSNamesystem初始化失败,则会调用FSNamesystem.close()方法
// 关闭FSNamesystem启动的所有服务
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = monotonicNow();
try {
// 加载fsimage以及editlog文件
namesystem.loadFSImage(startOpt);
构建NameNodeRpcServer服务:startHttpServer
/**
* Create the RPC server implementation. Used as an extension point for the
* BackupNode.
*/
protected NameNodeRpcServer createRpcServer(Configuration conf)
throws IOException {
return new NameNodeRpcServer(conf, this);
}
创建 RPC 服务,这个供程序调用,常用的是 8020 端口
创建完 RPC 服务器,调用 startCommonServices 方法进行启动服务
//启动httpServer以及 rpcServer
startCommonServices(conf);
修改Namenode的状态为active
try {
haContext.writeLock();
// 初始化完成后,Namenode进入Standby状态
// 在这里会开启StandbyCheckpointer里面的
// checkpointer线程,定时合并&处理images文件
state.prepareToEnterState(haContext);
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}