Namenode 会定期将文件系统的命名空间(文件目录树、文件/目录元信息)保存到 fsimage 文件中,以防止Namenode 掉电或者进程崩溃。但如果 Namenode 实时地将内存中的元数据同步到 fsimage 文件中,将会非常消耗资源且造成 Namenode 运行缓慢。所以 Namenode 会先将命名空间的修改操作保存在 editlog 文件中, 然后定期合并 fsimage 和 editlog 文件

FSImage类主要实现了以下功能:

  • 保存命名空间:将当前时刻 Namenode 内存中的命名空间保存到 fsimage 文件中
  • 加载 fsimage 文件:将磁盘上 fsimage 文件中保存的命名空间加载到 Namenode 内存中,这个操作是保存命名空间操作的逆操作
  • 加载 editlog 文件:Namenode 加载了 fsimage 文件后,内存中只包含了命名空间在保存 fsimage 文件时的信息,Namenode 还需要加载后续对命名空间的修改操作,即 editlog 文件中记录的内容。所以 FSImage 类还提供了加载 editlog 文件到 Namenode 内存中的功能

构建和初始化

FSImage 是在 FSNamesystem 中的 loadFromDisk 方法进行初始化的

  1. static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
  2. checkConfiguration(conf);
  3. // 构建FSImage
  4. FSImage fsImage = new FSImage(conf,
  5. FSNamesystem.getNamespaceDirs(conf),
  6. FSNamesystem.getNamespaceEditsDirs(conf));
  7. FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  8. StartupOption startOpt = NameNode.getStartupOption(conf);
  9. if (startOpt == StartupOption.RECOVER) {
  10. namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  11. }
  12. long loadStart = monotonicNow();
  13. try {
  14. namesystem.loadFSImage(startOpt);
  15. } catch (IOException ioe) {
  16. LOG.warn("Encountered exception loading fsimage", ioe);
  17. fsImage.close();
  18. throw ioe;
  19. }
  20. long timeTakenToLoadFSImage = monotonicNow() - loadStart;
  21. LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  22. NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  23. if (nnMetrics != null) {
  24. nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  25. }
  26. namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  27. return namesystem;
  28. }
  1. protected FSImage(Configuration conf,
  2. Collection<URI> imageDirs,
  3. List<URI> editsDirs)
  4. throws IOException {
  5. this.conf = conf;
  6. //构建NNStorage ==> NNStorage负责管理NameNode使用的 StorageDirectories。
  7. storage = new NNStorage(conf, imageDirs, editsDirs);
  8. // dfs.namenode.name.dir.restore 默认: false
  9. // 设置为true可使NameNode尝试恢复以前失败的dfs.NameNode.name.dir。
  10. // 启用后,将在检查点期间尝试恢复任何失败的目录。
  11. if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
  12. DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
  13. storage.setRestoreFailedStorage(true);
  14. }
  15. // 构建 FSEditLog
  16. this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
  17. archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
  18. }

保存命名空间

FSImage 类最重要的功能之一就是将当前时刻 Namenode 的命名空间保存到 fsimage 文件中

SaveFSImageInAllDirs

Namenode 可以定义多个存储路径来保存 fsimage 文件,对于每一个存储路径,saveFSImageInAllDirs() 方法都会启动一个线程负责在这个路径上保存 fsimage 文件。同时,为了防止保存过程中出现错误,命名空间信息首先会被保存在一个 fsimage.ckpt 文件中,当保存操作全部完成之后,才会将 fsimage.ckpt 重命名为 fsimage 文件。 之后 saveFSImageInAllDirs() 方法会清理 Namenode 元数据存储文件夹中过期的 editlog 文件和 fsimage 文件

  1. /**
  2. *
  3. * @param source
  4. * @param nnf
  5. * @param txid
  6. * @param canceler
  7. * @throws IOException
  8. */
  9. private synchronized void saveFSImageInAllDirs(FSNamesystem source,
  10. NameNodeFile nnf, long txid, Canceler canceler) throws IOException {
  11. StartupProgress prog = NameNode.getStartupProgress();
  12. prog.beginPhase(Phase.SAVING_CHECKPOINT);
  13. if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
  14. throw new IOException("No image directories available!");
  15. }
  16. if (canceler == null) {
  17. canceler = new Canceler();
  18. }
  19. // 构造保存命名空间操作的上下文
  20. SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid, canceler);
  21. try {
  22. // 在每一个保存路径上启动一个线程,该线程使用FSImageSaver类保存fsimage文件
  23. List<Thread> saveThreads = new ArrayList<Thread>();
  24. // save images into current
  25. for (Iterator<StorageDirectory> it = storage.dirIterator(NameNodeDirType.IMAGE);
  26. it.hasNext();) {
  27. StorageDirectory sd = it.next();
  28. // 命名空间具体的保存操作是由 FSImageSaver 这个类来承担的,
  29. // FSImageSaver是FSImage中的内部类,也是一个线程类,
  30. // 它的run()方法调用了saveFSImage()方法来保存fsimage文件。
  31. FSImageSaver saver = new FSImageSaver(ctx, sd, nnf);
  32. Thread saveThread = new Thread(saver, saver.toString());
  33. saveThreads.add(saveThread);
  34. saveThread.start();
  35. }
  36. // 等待所有线程执行完毕
  37. waitForThreads(saveThreads);
  38. saveThreads.clear();
  39. storage.reportErrorsOnDirectories(ctx.getErrorSDs());
  40. // 保存文件失败则抛出异常
  41. if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
  42. throw new IOException(
  43. "Failed to save in any storage directories while saving namespace.");
  44. }
  45. if (canceler.isCancelled()) {
  46. deleteCancelledCheckpoint(txid);
  47. ctx.checkCancelled(); // throws
  48. assert false : "should have thrown above!";
  49. }
  50. // 将fsimage.ckpt 改名为 fsimage
  51. renameCheckpoint(txid, NameNodeFile.IMAGE_NEW, nnf, false);
  52. // Since we now have a new checkpoint, we can clean up some
  53. // old edit logs and checkpoints.
  54. // Do not purge anything if we just wrote a corrupted FsImage.
  55. if (!exitAfterSave.get()) {
  56. // 我们已经完成了fsimage的保存,那么可以将存储上的一部分editlog和fsimage删除
  57. // 如果没有成功,则失败.
  58. purgeOldStorage(nnf);
  59. archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_NEW);
  60. }
  61. } finally {
  62. // Notify any threads waiting on the checkpoint to be canceled
  63. // that it is complete.
  64. // 通知所有等待的线程
  65. ctx.markComplete();
  66. ctx = null;
  67. }
  68. prog.endPhase(Phase.SAVING_CHECKPOINT);
  69. }

FSImageSaver

命名空间具体的保存操作是由 FSImageSaver 这个类来承担的,FSImageSaver 是 FSImage 中的内部类,也是一个线程类,它的 run() 方法调用了 saveFSImage() 方法来保存 fsimage 文件

  1. private class FSImageSaver implements Runnable {
  2. private final SaveNamespaceContext context;
  3. private final StorageDirectory sd;
  4. private final NameNodeFile nnf;
  5. public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd,
  6. NameNodeFile nnf) {
  7. this.context = context;
  8. this.sd = sd;
  9. this.nnf = nnf;
  10. }
  11. @Override
  12. public void run() {
  13. // Deletes checkpoint file in every storage directory when shutdown.
  14. Runnable cancelCheckpointFinalizer = () -> {
  15. try {
  16. deleteCancelledCheckpoint(context.getTxId());
  17. LOG.info("FSImageSaver clean checkpoint: txid={} when meet " +
  18. "shutdown.", context.getTxId());
  19. } catch (IOException e) {
  20. LOG.error("FSImageSaver cancel checkpoint threw an exception:", e);
  21. }
  22. };
  23. ShutdownHookManager.get().addShutdownHook(cancelCheckpointFinalizer,
  24. SHUTDOWN_HOOK_PRIORITY);
  25. try {
  26. // 保存fsimage文件
  27. System.out.println("context : "+ context);
  28. System.out.println("sd : "+ sd);
  29. System.out.println("nnf : "+ nnf);
  30. saveFSImage(context, sd, nnf);
  31. } catch (SaveNamespaceCancelledException snce) {
  32. LOG.info("Cancelled image saving for " + sd.getRoot() +
  33. ": " + snce.getMessage());
  34. // don't report an error on the storage dir!
  35. } catch (Throwable t) {
  36. LOG.error("Unable to save image for " + sd.getRoot(), t);
  37. context.reportErrorOnStorageDirectory(sd);
  38. try {
  39. deleteCancelledCheckpoint(context.getTxId());
  40. LOG.info("FSImageSaver clean checkpoint: txid={} when meet " +
  41. "Throwable.", context.getTxId());
  42. } catch (IOException e) {
  43. LOG.error("FSImageSaver cancel checkpoint threw an exception:", e);
  44. }
  45. }
  46. }
  47. @Override
  48. public String toString() {
  49. return "FSImageSaver for " + sd.getRoot() +
  50. " of type " + sd.getStorageDirType();
  51. }
  52. }

SaveFSImage(context, sd, nnf)

SaveFSImage() 方法会使用一个 FSImageFormat.Saver 对象来完成保存操作,FSImageFormat.Saver 类会以fsimage 文件定义的格式保存 Namenode 的命名空间信息,需要注意命名空空间信息会先写入 fsimage.ckpt 文件中。saveFSImage() 方法还会生成 fsimage 文件的 md5 校验文件,以确保 fsimage 文件的正确性

  1. /**
  2. * 将 FS image 的内容保存到文件中。
  3. * Save the contents of the FS image to the file.
  4. */
  5. void saveFSImage(SaveNamespaceContext context, StorageDirectory sd,
  6. NameNodeFile dstType) throws IOException {
  7. // 获取当前命名空间中记录的最新事务的txid
  8. long txid = context.getTxId();
  9. // fsimage文件
  10. File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
  11. File dstFile = NNStorage.getStorageFile(sd, dstType, txid);
  12. // FSImageFormatProtobuf.Saver类负责保存fsimage
  13. FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
  14. // 压缩类
  15. FSImageCompression compression = FSImageCompression.createCompression(conf);
  16. // 调用Saver类保存fsimage文件
  17. long numErrors = saver.save(newFile, compression);
  18. if (numErrors > 0) {
  19. // The image is likely corrupted.
  20. LOG.error("Detected " + numErrors + " errors while saving FsImage " +
  21. dstFile);
  22. exitAfterSave.set(true);
  23. }
  24. // 保存MD5校验值
  25. MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
  26. storage.setMostRecentCheckpointInfo(txid, Time.now());
  27. }

saveFSImage() 方法构造了一个 FSImageFormatProtobuf.Saver 对象来保存命名空间,FSImageFormatProtobuf是一个工具类,它提供了以 protobuf 格式读取和写入 fsimage 文件的方法

  1. /**
  2. * save()方法会打开fsimage文件的输出流并且获得文件通道,
  3. * 然后调用saveInternal()方法将命名空间保存到fsimage文件中。
  4. *
  5. * @return number of non-fatal errors detected while writing the image.
  6. * @throws IOException on fatal error.
  7. */
  8. long save(File file, FSImageCompression compression) throws IOException {
  9. FileOutputStream fout = new FileOutputStream(file);
  10. fileChannel = fout.getChannel();
  11. try {
  12. LOG.info("Saving image file {} using {}", file, compression);
  13. long startTime = monotonicNow();
  14. // 保存到fsimage文件
  15. long numErrors = saveInternal( fout, compression, file.getAbsolutePath());
  16. LOG.info("Image file {} of size {} bytes saved in {} seconds {}.", file,
  17. file.length(), (monotonicNow() - startTime) / 1000,
  18. (numErrors > 0 ? (" with" + numErrors + " errors") : ""));
  19. return numErrors;
  20. } finally {
  21. fout.close();
  22. }
  23. }

FSImageFormatProtobuf.Saver

使用 protobuf 定义的 fsimage 文件的格式,它包括了4个部分的信息

  • MAGIC:fsimage 的文件头,是“HDFSIMG1”这个字符串的二进制形式,MAGIC 头标识了当前 fsimage 文件是使用 protobuf 格式序列化的。 FSImage 类在读取 fsimage 文件时,会先判断 fsimage 文件是否包含了MAGIC头,如果包含了则使用 protobuf 格式反序列化 fsimage 文件

  • SECTIONS:fsimage 文件会将同一类型的 Namenode 元信息保存在一个 section 中,例如将文件系统元信息保存在 NameSystemSection 中,将文件系统目录树中的所有 INode 信息保存在 INodeSection 中,将快照信息保存在 SnapshotSection 中等。fsimage 文件的第二个部分就是 Namenode 各类元信息对应的所有section,每类 section 中都包含了对应 Namenode 元信息的属性

  • FileSummary:FileSummary 记录了 fsimage 文件的元信息,以及 fsimage 文件保存的所有 section 的信息。 FileSummary 中的 ondiskVersion 字段记录了 fsimage 文件的版本号,layoutVersion 字段记录了当前HDFS 的文件系统布局版本号,codec 字段记录了 fsimage 文件的压缩编码,sections 字段则记录了 fsimage文件中各个 section 字段的元信息,每个 fsimage 文件中记录的 section 在 FileSummary 中都有一个与之对应的 section 字段。FileSummary 的 section 字段记录了对应的 fsimage 中 section 的名称、在 fsimage 文件中的长度,以及这个 section 在 fsimage 中的起始位置。FSImage 类在读取 fsimage 文件时,会先从 fsimage中读取出 FileSummary 部分,然后利用 FileSummary 记录的元信息指导 fsimage 文件的反序列化操作

  • FileSummaryLength:FileSummaryLength 记录了 FileSummary 在 fsimage 文件中所占的长度, FSImage 类在读取 fsimage 文件时,会首先读取 FileSummaryLength 获取 FileSummary 部分的长度, 然后根据这个长度从 fsimage 中反序列化出 FileSummary

FSImageFormatProtobuf.Saver 类就是以 protobuf 格式将 Namenode 的命名空间保存至 fsimage 文件的工具类。这个类的入口方法是 save() 方法。save() 方法会打开 fsimage 文件的输出流并且获得文件通道,然后调用saveInternal() 方法将命名空间保存到 fsimage 文件中

SaveInternal

saveInternal() 方法首先构造底层 fsimage 文件的输出流,构造 fsimage 文件的描述类 FileSummary,然后在FileSummary 中记录 ondiskVersion、layoutVersion、codec等信息

接下来 saveInternal() 方法依次向 fsimage 文件中写入命名空间信息、inode信息、快照信息、安全信息、缓存信息、StringTable信息等。上述信息都是以 section 为单位写入

saveInternal() 方法以 section 为单位写入元数据信息时,还会在 FileSummary 中记录这个 section 的长度,以及section 在 fsimage 文件中的起始位置等信息。当完成了所有 section 的写入后,FileSummary 对象也就构造完毕了。saveInternal() 最后会将 FileSummary 对象写入 fsimage 文件中

  1. private long saveInternal(FileOutputStream fout, FSImageCompression compression,
  2. String filePath) throws IOException {
  3. StartupProgress prog = NameNode.getStartupProgress();
  4. // 构造输出流,一边写入数据,一边写入校验值
  5. MessageDigest digester = MD5Hash.getDigester();
  6. int layoutVersion = context.getSourceNamesystem().getEffectiveLayoutVersion();
  7. underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(fout),
  8. digester);
  9. underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
  10. fileChannel = fout.getChannel();
  11. // FileSummary为fsimage文件的描述部分,也是protobuf定义的
  12. FileSummary.Builder b = FileSummary.newBuilder()
  13. .setOndiskVersion(FSImageUtil.FILE_VERSION)
  14. .setLayoutVersion(
  15. context.getSourceNamesystem().getEffectiveLayoutVersion());
  16. // 获取压缩格式,并装饰输出流
  17. codec = compression.getImageCodec();
  18. if (codec != null) {
  19. b.setCodec(codec.getClass().getCanonicalName());
  20. sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
  21. } else {
  22. sectionOutputStream = underlyingOutputStream;
  23. }
  24. //保存命名空间信息
  25. saveNameSystemSection(b);
  26. // Check for cancellation right after serializing the name system section.
  27. // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace
  28. // depends on this behavior.
  29. // 检查是否取消了保存操作
  30. context.checkCancelled();
  31. Step step;
  32. // Erasure coding policies should be saved before inodes
  33. if (NameNodeLayoutVersion.supports(
  34. NameNodeLayoutVersion.Feature.ERASURE_CODING, layoutVersion)) {
  35. step = new Step(StepType.ERASURE_CODING_POLICIES, filePath);
  36. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  37. //保存 ErasureCoding 信息
  38. saveErasureCodingSection(b);
  39. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  40. }
  41. //保存命名空间中的inode信息
  42. step = new Step(StepType.INODES, filePath);
  43. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  44. // Count number of non-fatal errors when saving inodes and snapshots.
  45. // 保存命名空间中的inode信息
  46. long numErrors = saveInodes(b);
  47. // 保存快照信息
  48. numErrors += saveSnapshots(b);
  49. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  50. // 保存安全信息
  51. step = new Step(StepType.DELEGATION_TOKENS, filePath);
  52. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  53. saveSecretManagerSection(b);
  54. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  55. // 保存缓存信息
  56. step = new Step(StepType.CACHE_POOLS, filePath);
  57. prog.beginStep(Phase.SAVING_CHECKPOINT, step);
  58. saveCacheManagerSection(b);
  59. prog.endStep(Phase.SAVING_CHECKPOINT, step);
  60. // 保存StringTable
  61. saveStringTableSection(b);
  62. // We use the underlyingOutputStream to write the header. Therefore flush
  63. // the buffered stream (which is potentially compressed) first.
  64. // flush输出流
  65. flushSectionOutputStream();
  66. FileSummary summary = b.build();
  67. //将FileSummary写入文件
  68. saveFileSummary(underlyingOutputStream, summary);
  69. //关闭底层输出流
  70. underlyingOutputStream.close();
  71. savedDigest = new MD5Hash(digester.digest());
  72. return numErrors;
  73. }

写入文件头

fsimage 的文件头,是“HDFSIMG1”这个字符串的二进制形式。MAGIC 头标识了当前 fsimage 文件是使用protobuf 格式序列化的

  1. private long saveInternal(FileOutputStream fout, FSImageCompression compression,
  2. String filePath) throws IOException {
  3. StartupProgress prog = NameNode.getStartupProgress();
  4. // 构造输出流,一边写入数据,一边写入校验值
  5. MessageDigest digester = MD5Hash.getDigester();
  6. int layoutVersion = context.getSourceNamesystem().getEffectiveLayoutVersion();
  7. // 构造underlyingOutputStream
  8. underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(fout),
  9. digester);
  10. // 写入头信息
  11. underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
  12. // 打开文件通道
  13. fileChannel = fout.getChannel();

构建FileSummary对象

  1. // FileSummary为fsimage文件的描述部分,也是protobuf定义的
  2. FileSummary.Builder b = FileSummary.newBuilder()
  3. .setOndiskVersion(FSImageUtil.FILE_VERSION)
  4. .setLayoutVersion(
  5. context.getSourceNamesystem().getEffectiveLayoutVersion());

处理数据的压缩方式

  1. // 获取压缩格式,并装饰输出流
  2. codec = compression.getImageCodec();
  3. if (codec != null) {
  4. b.setCodec(codec.getClass().getCanonicalName());
  5. sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
  6. } else {
  7. sectionOutputStream = underlyingOutputStream;
  8. }

Save

SaveInternal() 方法调用了多个 save*() 方法来记录不同 section 的元数据信息,这些方法除了在 fsimage 文件中写入对应种类的元数据信息外,还会在 FileSummary 中记录 section 的大小,以及在fsimage中的起始位置

以 saveINodes() 方法举例,该方法构造了一个 FSImageFormatPBINode.Saver 对象,并调用这个对象对应的方法保存文件系统目录树中的 INode 信息、INodeDirectory 信息,以及处于构建状态的文件信息

  1. private long saveInodes(FileSummary.Builder summary) throws IOException {
  2. FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,summary);
  3. // 保存INode信息是由FSImageFormatPBINode.Saver.serializeINodeSection()方法实现的
  4. saver.serializeINodeSection(sectionOutputStream);
  5. // 保存info目录信息
  6. saver.serializeINodeDirectorySection(sectionOutputStream);
  7. // 租约管理
  8. saver.serializeFilesUCSection(sectionOutputStream);
  9. return saver.getNumImageErrors();
  10. }

serializeINodeSection() 方法会首先构造一个 INodeSection 对象,记录文件系统目录树中保存的最后一个 inode的 inodeid,以及命名空间中所有 inode 的个数。之后迭代处理将所有 inode 信息写入 fsimage 文件中,最后将INodeSection 的属性信息记录在 FileSummary 中

  1. void serializeINodeSection(OutputStream out) throws IOException {
  2. INodeMap inodesMap = fsn.dir.getINodeMap();
  3. // 构造一个INodeSection,保存最后一个inode的inodeid,以及这个命名空间中所有inode的个数
  4. INodeSection.Builder b = INodeSection.newBuilder()
  5. .setLastInodeId(fsn.dir.getLastInodeId()).setNumInodes(inodesMap.size());
  6. INodeSection s = b.build();
  7. // 序列化至输出流
  8. s.writeDelimitedTo(out);
  9. int i = 0;
  10. // 迭代处理inodeMap中所有的inode,调用save()方法将inode信息保存到fsimage中
  11. Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
  12. while (iter.hasNext()) {
  13. INodeWithAdditionalFields n = iter.next();
  14. // 将所有inode信息写入fsimage文件中
  15. save(out, n);
  16. ++i;
  17. if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
  18. context.checkCancelled();
  19. }
  20. }
  21. // 调用commitSection()方法在FileSummary中写入inode section
  22. parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
  23. }

save() 方法首先将当前 INode 对象分为目录、文件以及符号链接等几类,然后调用各个类型对应的 save() 重载方法。重载方法的实现也非常简单,就是构造不同的 protobuf Builder 类,然后设置相应字段的值,并将序列化之后的对象写入 fsimage 文件的输出流

以INodeFile为例,首先构造 protobuf Builder 类,INodeSection.INodeFile.Builder,然后设置blocks,也就是当前文件有哪些数据块,如果当前的 INodeFile 处于构建状态,则设置对应的构建信息。最后将序列化后的 inode 信息写入输出流中

  1. private void save(OutputStream out, INode n) throws IOException {
  2. if (n.isDirectory()) {
  3. save(out, n.asDirectory());
  4. } else if (n.isFile()) {
  5. save(out, n.asFile());
  6. } else if (n.isSymlink()) {
  7. save(out, n.asSymlink());
  8. }
  9. }
  10. private void save(OutputStream out, INodeDirectory n) throws IOException {
  11. INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
  12. parent.getSaverContext());
  13. INodeSection.INode r = buildINodeCommon(n)
  14. .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
  15. r.writeDelimitedTo(out);
  16. }
  17. // 首先构造protobuf Builder类——INodeSection.INodeFile.Builder,
  18. // 然后设置blocks——也就是当前文件有哪些数据块,
  19. // 如果当前的INodeFile处于构建状态, 则设置对应的构建信息。
  20. //
  21. // 最后将序列化后的inode信息写入输出流中。
  22. private void save(OutputStream out, INodeFile n) throws IOException {
  23. INodeSection.INodeFile.Builder b = buildINodeFile(n,
  24. parent.getSaverContext());
  25. BlockInfo[] blocks = n.getBlocks();
  26. if (blocks != null) {
  27. for (Block block : n.getBlocks()) {
  28. b.addBlocks(PBHelperClient.convert(block));
  29. }
  30. }
  31. FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
  32. if (uc != null) {
  33. INodeSection.FileUnderConstructionFeature f =
  34. INodeSection.FileUnderConstructionFeature
  35. .newBuilder().setClientName(uc.getClientName())
  36. .setClientMachine(uc.getClientMachine()).build();
  37. b.setFileUC(f);
  38. }
  39. INodeSection.INode r = buildINodeCommon(n)
  40. .setType(INodeSection.INode.Type.FILE).setFile(b).build();
  41. r.writeDelimitedTo(out);
  42. }
  43. private void save(OutputStream out, INodeSymlink n) throws IOException {
  44. INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
  45. .newBuilder()
  46. .setPermission(buildPermissionStatus(n))
  47. .setTarget(ByteString.copyFrom(n.getSymlink()))
  48. .setModificationTime(n.getModificationTime())
  49. .setAccessTime(n.getAccessTime());
  50. INodeSection.INode r = buildINodeCommon(n)
  51. .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
  52. r.writeDelimitedTo(out);
  53. }


将FileSummary写入文件

  1. private static void saveFileSummary(OutputStream out, FileSummary summary)
  2. throws IOException {
  3. summary.writeDelimitedTo(out);
  4. int length = getOndiskTrunkSize(summary);
  5. byte[] lengthBytes = new byte[4];
  6. ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
  7. out.write(lengthBytes);
  8. }

FSImage.loadFSImage

当 Namenode 启动时,首先会将 fsimage 文件中记录的命名空间加载到 Namenode 内存中,然后再一条一条地将 editlog 文件中记录的更新操作加载并合并到命名空间中
20200927004349193.png
Namenode 会等待各个 Datanode 向自己汇报数据块信息来组装 blockMap,从而离开安全模式。Namenode 每次启动时都会调用 FSImage.loadFSImage() 方法执行加载 fsimage 和 editlog 文件的操作

这里需要关注两个地方,一个是加载 FSImage 文件 fsImage.recoverTransitionRead(startOpt, this, recovery); 另外一个是根据条件判断是否要合并edits文件——间隔周期1小时或者 edits 数量操作100w

loadFSImage方法,其实就是加载最后一个fsimage文件。加载分两步:

  • 获取加载器:FSImageFormat.LoaderDelegator
  • 加载文件:loader.load(curFile, requireSameLayoutVersion)

    1. private void loadFSImage(File curFile, MD5Hash expectedMd5,
    2. FSNamesystem target, MetaRecoveryContext recovery,
    3. boolean requireSameLayoutVersion) throws IOException {
    4. // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
    5. // information. Make sure the ID is properly set.
    6. target.setBlockPoolId(this.getBlockPoolID());
    7. // 获取加载器 FSImageFormat.LoaderDelegator
    8. FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
    9. // 加载文件
    10. loader.load(curFile, requireSameLayoutVersion);
    11. // Check that the image digest we loaded matches up with what we expected
    12. MD5Hash readImageMd5 = loader.getLoadedImageMd5();
    13. if (expectedMd5 != null &&
    14. !expectedMd5.equals(readImageMd5)) {
    15. throw new IOException("Image file " + curFile +
    16. " is corrupt with MD5 checksum of " + readImageMd5 +
    17. " but expecting " + expectedMd5);
    18. }
    19. long txId = loader.getLoadedImageTxId();
    20. LOG.info("Loaded image for txid " + txId + " from " + curFile);
    21. lastAppliedTxId = txId;
    22. storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
    23. }
    1. public void load(File file, boolean requireSameLayoutVersion)
    2. throws IOException {
    3. Preconditions.checkState(impl == null, "Image already loaded!");
    4. FileInputStream is = null;
    5. try {
    6. is = new FileInputStream(file);
    7. byte[] magic = new byte[FSImageUtil.MAGIC_HEADER.length];
    8. IOUtils.readFully(is, magic, 0, magic.length);
    9. // fsimage文件中包括 magicHeader,使用的是protobuf序列化方式
    10. if (Arrays.equals(magic, FSImageUtil.MAGIC_HEADER)) {
    11. // 构造FSImageFormatProtobuf.Loader加载fsimage文件
    12. FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
    13. conf, fsn, requireSameLayoutVersion);
    14. impl = loader;
    15. loader.load(file);
    16. } else {
    17. // 否则构造FSImageFormat.Loader加载fsimage文件
    18. Loader loader = new Loader(conf, fsn);
    19. impl = loader;
    20. loader.load(file);
    21. }
    22. } finally {
    23. IOUtils.cleanupWithLogger(LOG, is);
    24. }
    25. }

    这里面是构造 FSImageFormatProtobuf.Loader 对象,使用它的 load 方法加载 fsimage 文件。在 load 方法中最终调用了 loadInternal(raFile, fin); 方法。这个方法是加载 fsimage 文件的相对最底层的方法了

在加载 fsimage 操作中,最终会调用 FSImageFormatProtobuf.Loader 作为 fsimage 文件的加载类FSImageFormatProtobuf.Loader.loadInternal() 方法执行了加载 fsimage 文件的操作,loadInternal() 方法会打开fsimage 文件通道,然后读取 fsimage 文件中的 FileSummary 对象,FileSummary 对象中记录了 fsimage 中保存的所有section的信息。loadInternal() 会对 FileSummary 对象中保存的 section 排序, 然后遍历每个section并调用对应的方法从 fsimage 文件中加载这个 section
20200927005049903.png

  1. // loadInternal()方法会打开fsimage文件通道,
  2. // 然后读取fsimage文件中的FileSummary对象,
  3. // FileSummary对象中记录了fsimage中保存的所有section的信息。
  4. // loadInternal()会对FileSummary对象中保存的section排序,
  5. // 然后遍历每个section并调用对应的方法从fsimage文件中加载这个section。
  6. private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
  7. throws IOException {
  8. if (!FSImageUtil.checkFileFormat(raFile)) {
  9. throw new IOException("Unrecognized file format");
  10. }
  11. // 从fsimage文件末尾加载FileSummary,也就是fsimage文件内容的描述
  12. FileSummary summary = FSImageUtil.loadSummary(raFile);
  13. if (requireSameLayoutVersion && summary.getLayoutVersion() !=
  14. HdfsServerConstants.NAMENODE_LAYOUT_VERSION) {
  15. throw new IOException("Image version " + summary.getLayoutVersion() +
  16. " is not equal to the software version " +
  17. HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
  18. }
  19. // 获取通道
  20. FileChannel channel = fin.getChannel();
  21. // 构造FSImageFormatPBINode.Loader和FSImageFormatPBSnapshot.
  22. // Loader加载INode以及Snapshot
  23. FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
  24. fsn, this);
  25. FSImageFormatPBSnapshot.Loader snapshotLoader = new
  26. FSImageFormatPBSnapshot.Loader(fsn, this);
  27. // 对fsimage文件描述中记录的sections进行排序
  28. ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
  29. .getSectionsList());
  30. Collections.sort(sections, new Comparator<FileSummary.Section>() {
  31. @Override
  32. public int compare(FileSummary.Section s1, FileSummary.Section s2) {
  33. SectionName n1 = SectionName.fromString(s1.getName());
  34. SectionName n2 = SectionName.fromString(s2.getName());
  35. if (n1 == null) {
  36. return n2 == null ? 0 : -1;
  37. } else if (n2 == null) {
  38. return -1;
  39. } else {
  40. return n1.ordinal() - n2.ordinal();
  41. }
  42. }
  43. });
  44. StartupProgress prog = NameNode.getStartupProgress();
  45. /**
  46. * beginStep() and the endStep() calls do not match the boundary of the
  47. * sections. This is because that the current implementation only allows
  48. * a particular step to be started for once.
  49. */
  50. Step currentStep = null;
  51. // 遍历每个section,并调用对应的方法加载这个 section
  52. for (FileSummary.Section s : sections) {
  53. // 在通道中定位这个 section 的起始位置
  54. channel.position(s.getOffset());
  55. InputStream in = new BufferedInputStream(new LimitInputStream(fin,
  56. s.getLength()));
  57. in = FSImageUtil.wrapInputStreamForCompression(conf,
  58. summary.getCodec(), in);
  59. String n = s.getName();
  60. // 调用对应的方法加载不同的section
  61. switch (SectionName.fromString(n)) {
  62. case NS_INFO:
  63. loadNameSystemSection(in);
  64. break;
  65. case STRING_TABLE:
  66. loadStringTableSection(in);
  67. break;
  68. case INODE: {
  69. currentStep = new Step(StepType.INODES);
  70. prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
  71. inodeLoader.loadINodeSection(in, prog, currentStep);
  72. }
  73. break;
  74. case INODE_REFERENCE:
  75. snapshotLoader.loadINodeReferenceSection(in);
  76. break;
  77. case INODE_DIR:
  78. inodeLoader.loadINodeDirectorySection(in);
  79. break;
  80. case FILES_UNDERCONSTRUCTION:
  81. inodeLoader.loadFilesUnderConstructionSection(in);
  82. break;
  83. case SNAPSHOT:
  84. snapshotLoader.loadSnapshotSection(in);
  85. break;
  86. case SNAPSHOT_DIFF:
  87. snapshotLoader.loadSnapshotDiffSection(in);
  88. break;
  89. case SECRET_MANAGER: {
  90. prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
  91. Step step = new Step(StepType.DELEGATION_TOKENS);
  92. prog.beginStep(Phase.LOADING_FSIMAGE, step);
  93. loadSecretManagerSection(in, prog, step);
  94. prog.endStep(Phase.LOADING_FSIMAGE, step);
  95. }
  96. break;
  97. case CACHE_MANAGER: {
  98. Step step = new Step(StepType.CACHE_POOLS);
  99. prog.beginStep(Phase.LOADING_FSIMAGE, step);
  100. loadCacheManagerSection(in, prog, step);
  101. prog.endStep(Phase.LOADING_FSIMAGE, step);
  102. }
  103. break;
  104. case ERASURE_CODING:
  105. Step step = new Step(StepType.ERASURE_CODING_POLICIES);
  106. prog.beginStep(Phase.LOADING_FSIMAGE, step);
  107. loadErasureCodingSection(in);
  108. prog.endStep(Phase.LOADING_FSIMAGE, step);
  109. break;
  110. default:
  111. LOG.warn("Unrecognized section {}", n);
  112. break;
  113. }
  114. }
  115. }

对于不同类型的 section,loadInternal() 方法会调用不同的方法加载这个 section,例如对于 INodeSection 会调用 InodeLoader.loadINodeSection() 方法加载。load*() 方法的实现都比较简单,就是按照 protobuf 格式加载不同的 section。慢慢恢复/构建 FSNamesystem 对象中的内容

  1. void loadINodeSection(InputStream in, StartupProgress prog,
  2. Step currentStep) throws IOException {
  3. INodeSection s = INodeSection.parseDelimitedFrom(in);
  4. fsn.dir.resetLastInodeId(s.getLastInodeId());
  5. long numInodes = s.getNumInodes();
  6. LOG.info("Loading " + numInodes + " INodes.");
  7. prog.setTotal(Phase.LOADING_FSIMAGE, currentStep, numInodes);
  8. Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, currentStep);
  9. for (int i = 0; i < numInodes; ++i) {
  10. INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
  11. if (p.getId() == INodeId.ROOT_INODE_ID) {
  12. // 加载root
  13. loadRootINode(p);
  14. } else {
  15. // 加载子节点
  16. INode n = loadINode(p);
  17. dir.addToInodeMap(n);
  18. }
  19. counter.increment();
  20. }
  21. }
  1. // 构建目录体系
  2. void loadINodeDirectorySection(InputStream in) throws IOException {
  3. final List<INodeReference> refList = parent.getLoaderContext()
  4. .getRefList();
  5. while (true) {
  6. INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
  7. .parseDelimitedFrom(in);
  8. // note that in is a LimitedInputStream
  9. if (e == null) {
  10. break;
  11. }
  12. INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
  13. for (long id : e.getChildrenList()) {
  14. INode child = dir.getInode(id);
  15. addToParent(p, child);
  16. }
  17. for (int refId : e.getRefChildrenList()) {
  18. INodeReference ref = refList.get(refId);
  19. addToParent(p, ref);
  20. }
  21. }
  22. }

加载Edits文件

FSImage.loadEdits() 方法会构造一个 FSEditLogLoader 对象。然后遍历 Namenode 所有存储路径上保存的 editlog 文件的输入流并调用 FSEditLogLoader.loadFSEdits() 方法加载指定路径上的 editlog 文件

  1. public long loadEdits(Iterable<EditLogInputStream> editStreams,
  2. FSNamesystem target, long maxTxnsToRead,
  3. StartupOption startOpt, MetaRecoveryContext recovery)
  4. throws IOException {
  5. LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
  6. StartupProgress prog = NameNode.getStartupProgress();
  7. prog.beginPhase(Phase.LOADING_EDITS);
  8. //记录命名空间中加载的最新的事务id
  9. long prevLastAppliedTxId = lastAppliedTxId;
  10. long remainingReadTxns = maxTxnsToRead;
  11. try {
  12. //构造FSEditLogLoader对象用于加栽editlog文件
  13. FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
  14. //遍历所有存储路径上editlog文件对应的输入流
  15. // Load latest edits
  16. for (EditLogInputStream editIn : editStreams) {
  17. LogAction logAction = loadEditLogHelper.record();
  18. if (logAction.shouldLog()) {
  19. String logSuppressed = "";
  20. if (logAction.getCount() > 1) {
  21. logSuppressed = "; suppressed logging for " +
  22. (logAction.getCount() - 1) + " edit reads";
  23. }
  24. LOG.info("Reading " + editIn + " expecting start txid #" +
  25. (lastAppliedTxId + 1) + logSuppressed);
  26. }
  27. try {
  28. //调用FSEditLogLoader.loadFSEdits()从某个存储路径上的editlog文件加载修改操作
  29. remainingReadTxns -= loader.loadFSEdits(editIn, lastAppliedTxId + 1,
  30. remainingReadTxns, startOpt, recovery);
  31. } finally {
  32. // lastAppliedTxId记录从editlog加载的最新的事务id
  33. // Update lastAppliedTxId even in case of error, since some ops may
  34. // have been successfully applied before the error.
  35. lastAppliedTxId = loader.getLastAppliedTxId();
  36. }
  37. // If we are in recovery mode, we may have skipped over some txids.
  38. if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
  39. && recovery != null) {
  40. lastAppliedTxId = editIn.getLastTxId();
  41. }
  42. if (remainingReadTxns <= 0) {
  43. break;
  44. }
  45. }
  46. } finally {
  47. //关闭所有editlog文件的输入流
  48. FSEditLog.closeAllStreams(editStreams);
  49. }
  50. prog.endPhase(Phase.LOADING_EDITS);
  51. return lastAppliedTxId - prevLastAppliedTxId;
  52. }