FSEditLog.jpeg
在 Namenode 中,命名空间,也就是文件系统中的目录树、 文件元数据等信息,是被全部缓存在内存中的, 一旦 Namenode 重启或者宕机,内存中的所有数据将会全部丢失,所以必须要有一种机制能够将整个命名空间持久化保存, 并且能在 Namenode 重启时重建命名空间

目前 Namenode 的实现是将命名空间信息记录在一个叫作 fsimage(命名空间镜像)的二进制文件中, fsimage将文件系统目录树中的每个文件或者目录的信息保存为一条记录, 每条记录中包括了该文件( 或目录)的名称、大小、用户、用户组、修改时间、创建时间等信息

Namenode 重启时,会读取这个 fsimage 文件来重构命名空间。但是 fsimage 始终是磁盘上的一个文件,不可能时时刻刻都跟 Namenode 内存中的数据结构保持同步,并且 fsimage 文件一般都很大,GB级别的很常见, 如果所有的更新操作都实时地写 fsimage 文件,则会导致Namenode运行得十分缓慢, 所以HDFS每过一段时间才更新一次fsimage文件

HDFS 将这些操作记录在editlog(编辑日志)文件中,editlog 是一个日志文件,HDFS 客户端执行的所有操作首先会被记录到editlog文件中。HDFS 会定期地将 editlog 文件与 fsimage 文件进行合并,以保持 fsimage 跟Namenode 内存中记录的命名空间完全同步

在 HDFS 源码中,使用 FSEditLog 类来管理 editlog 文件。和 fsimage 文件不同,editlog 文件会随着 Namenode的运行实时更新,所以 FSEditLog 类的实现依赖于底层的输入流和输出流,同时 FSEditLog 类还需要对外提供大量的 log 方法用于记录命名空间的修改操作
20200910223643964.png

TransactionId机制

image.png
TransactionId 与客户端每次发起的 RPC 操作相关,当客户端发起一次 RPC 请求对 Namenode 的命名空间修改后,Namenode 就会在 editlog 中发起一个新的 transaction 用于记录这次操作,每个 transaction 会用一个唯一的 transactionId 标识

  • edits_start transaction id——end transaction id:edits 文件就是 editlog 文件,edits 文件中存放的是客户端执行的所有更新命名空间的操作。每个 edits 文件都包含了文件名中start trancsaction id - end transaction id之间的所有事务

    • 比如 edits_0000000000000000001-0000000000000000006,这个文件记录了 transaction id 在1和6之间的所有事务
  • edits_inprogress_start transaction id:正在进行处理的 editlog。所有从 start transaction id 开始的新的修改操作都会记录在这个文件中,直到HDFS重置这个日志文件。重置操作会将 inprogress 文件关闭,并将inprogress 文件改名为正常的 editlog 文件(如上一项所示),同时还会打开一个新的 inprogress 文件, 记录正在进行的事务

    • 例如 edits_inprogress_0000000000000000478 文件,这个文件记录了所有 transaction id 大于478的新开始的事务,我们将这个事务区间称为一个日志段落(segment)
    • Namenode 元数据文件夹中存在这个文件有两种可能:要么是 Active Namenode 正在写入数据, 要么是前一个 Namenode 没有正确地关闭
  • fsimage_end transaction id:fsimage 文件是 Hadoop 文件系统元数据的一个永久性的检查点,包含Hadoop文件系统中 end transaction id 前的完整的 HDFS 命名空间元数据镜像,也就是 HDFS 所有目录和文件对应的INode 的序列化信息

    • fsimage_0000000000000000473就是fsimage_0000000000000000472与edits_0000000000000000473-0000000000000000473合并后的镜像文件,保存了 transaction id 小于473的 HDFS 命名空间的元数据。 每个 fsimage 文件还有一个对应的md5文件,用来确保 fsimage 文件的正确性,以防止磁盘异常发生
  • seen_txid:这个文件中保存了上一个检查点(checkpoint),即合并 edits 和 fsimage 文件,以及编辑日志重置(editlog roll)(持久化当前的 inprogress 文件并且创建一个新的 inprogress 文件)时最新的事务id (transaction id)。要特别注意的是,这个事务id并不是 Namenode 内存中最新的事务id, 因为 seen_txid只在检查点操作以及编辑日志重置操作时更新。这个文件的作用在于 Namenode 启动时, 可以利用这个文件判断是否有edits文件丢失

    • Namenode 使用不同的目录保存 fsimage 以及 edits 文件,如果保存 edits 的目录内容丢失, Namenode 将会使用上一个检查点保存的 fsimage 启动,那么上一个检查点之后的所有事务都会丢失。 为了防止发生这种状况,Namenode 启动时会检查 seen_txid 并确保内存中加载的事务 id 至少超过seen_txid;否则Namenode将终止启动操作 ```java /**
    • TransactionId与客户端每次发起的RPC操作相关,
    • 当客户端发起一次RPC请求对Namenode的命名空间修改后,
    • Namenode就会在editlog中发起一个新的transaction用于记录这次操作,
    • 每个transaction会用一个唯一的transactionId标识。 / private static class TransactionId { public long txid;

      TransactionId(long value) { this.txid = value; } } ```

构造方法

FSEditLog 是通过 newInstance 方法进行构造的,可以根据配置 dfs.namenode.edits.asynclogging 生成不同的FSEditLog 实例,默认是 FSEditLogAsync

  1. static FSEditLog newInstance(Configuration conf, NNStorage storage,
  2. List<URI> editsDirs) {
  3. boolean asyncEditLogging = conf.getBoolean(
  4. DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
  5. DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
  6. LOG.info("Edit logging is async:" + asyncEditLogging);
  7. return asyncEditLogging
  8. ? new FSEditLogAsync(conf, storage, editsDirs)
  9. : new FSEditLog(conf, storage, editsDirs);
  10. }

FSEditLog状态机

FSEditLog 类被设计成一个状态机,用内部类 FSEditLog.State 描述

FSEditLog有以下5个状态:

  • UNINITIALIZED:editlog的初始状态
  • BETWEEN_LOG_SEGMENTS:editlog的前一个segment已经关闭,新的还没开始
  • IN_SEGMENT:editlog处于可写状态
  • OPEN_FOR_READING:editlog处于可读状态
  • CLOSED:editlog处于关闭状态
  1. private enum State {
  2. // editlog的初始状态。
  3. UNINITIALIZED,
  4. // editlog的前一个segment已经关闭,新的还没开始。
  5. BETWEEN_LOG_SEGMENTS,
  6. // editlog处于可写状态。
  7. IN_SEGMENT,
  8. // editlog处于可读状态。
  9. OPEN_FOR_READING,
  10. // editlog处于关闭状态。
  11. CLOSED;
  12. }

对于非 HA 机制的情况:

  • FSEditLog 应该开始于 UNINITIALIZED 或者 CLOSED 状态(因为在构造 FSEditLog 对象时,FSEditLog 的成员变量 state 默认为 State.UNINITIALIZED)
  • FSEditLog 初始化完成之后进入BETWEEN_LOG_SEGMENTS 状态,表示前一个 segment 已经关闭,新的还没开始,日志已经做好准备了
  • 当打开日志服务时,改变 FSEditLog 状态为 IN_SEGMENT 状态,表示可以写 editlog 文件了

对于 HA 机制的情况:

  • FSEditLog同样应该开始于 UNINITIALIZED 或者 CLOSED 状态,但是在完成初始化后 FSEditLog 并不进入BETWEEN_LOG_SEGMENTS状态, 而是进入 OPEN_FOR_READING 状态,因为目前 Namenode 启动时都是以 Standby 模式启动的,然后通过 DFSHAAdmin 发送命令把其中一个 Standby NameNode 转换成 Active Namenode

InitJournalsForWrite()

IniJournalsForWrite() 方法是 FSEditLog 的 public 方法,调用这个方法会将 FSEditLog 从 UNINITIALIZED 状态转换为 BETWEEN_LOG_SEGMENTS 状态

  1. public synchronized void initJournalsForWrite() {
  2. Preconditions.checkState(state == State.UNINITIALIZED ||
  3. state == State.CLOSED, "Unexpected state: %s", state);
  4. // 调用initJournals()方法
  5. // initJournals()方法会根据传入的 dirs 变量
  6. // (保存的是 editlog 文件的存储位置,都是URI)
  7. // 初始化journalSet字段 (JournalManager对象的集合)。
  8. // 初始化之后,FSEditLog就可以调用journalSet对象的方法向多个日志存储位置写editlog文件了。
  9. initJournals(this.editsDirs);
  10. //状态转换为BETWEEN_LOG_SEGMENTS
  11. state = State.BETWEEN_LOG_SEGMENTS;
  12. }

JournalManager 类是负责在特定存储目录上持久化 editlog 文件的类,它的 format() 方法负责格式化底层存储,startLogSegment() 方法负责从指定事务 id 开始记录一个操作的段落,finalizeLogSegment() 方法负责完成指定事务id区间的写操作

这里之所以抽象这个接口,是因为 Namenode 可能将 editlog 文件持久化到不同类型的存储上,也就需要不同类型的 JournalManager 来管理,所以需要定义一个抽象的接口。JoumalManager 有多个子类,普通的文件系统由 FileJournalManager 类管理,共享 NFS 由 BackupJournalManager 类管理、Bookkeeper由 BookkeeperJournalManager 类管理、Quorum集群则由 QuorumJournalManager 类管理

  1. /**
  2. * dirs editsDirs
  3. * @param dirs
  4. */
  5. private synchronized void initJournals(List<URI> dirs) {
  6. // dfs.namenode.edits.dir.minimum 默认值: 1
  7. int minimumRedundantJournals = conf.getInt(
  8. DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
  9. DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
  10. synchronized(journalSetLock) {
  11. // 初始化journalSet集合,存放存储路径对应的所有JournalManager对象
  12. journalSet = new JournalSet(minimumRedundantJournals);
  13. // 根据传入的URI获取对应的JournalManager对象
  14. for (URI u : dirs) {
  15. boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
  16. .contains(u);
  17. if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
  18. StorageDirectory sd = storage.getStorageDirectory(u);
  19. if (sd != null) {
  20. // 本地URI,则加入FileJournalManager即可
  21. journalSet.add(new FileJournalManager(conf, sd, storage),
  22. required, sharedEditsDirs.contains(u));
  23. }
  24. } else {
  25. // 否则根椐URI创建对应的JournalManager对象,并放入journalSet中保存
  26. journalSet.add(createJournal(u), required,
  27. sharedEditsDirs.contains(u));
  28. }
  29. }
  30. }
  31. if (journalSet.isEmpty()) {
  32. LOG.error("No edits directories configured!");
  33. }
  34. }


InitSharedJournalsForRead()

InitSharedJournalsForRead() 方法是 FSEditLog 的 public 方法,用在 HA 情况下。调用这个方法会将 FSEditLog 从 UNINITIALIZED 状态转换为 OPEN_FOR_READING 状态

与 initJournalsForWrite() 方法相同,initSharedJournalsForRead() 方法也调用了 initJournals() 方法执行初始化操作,只不过 editlog 文件的存储位置不同,在 HA 的情况下,editlog 文件的存储目录为共享存储目录,这个共享存储目录由 Active Namenode 和 StandbyNamenode 共享读取

  1. public synchronized void initSharedJournalsForRead() {
  2. if (state == State.OPEN_FOR_READING) {
  3. LOG.warn("Initializing shared journals for READ, already open for READ",
  4. new Exception());
  5. return;
  6. }
  7. Preconditions.checkState(state == State.UNINITIALIZED ||
  8. state == State.CLOSED);
  9. // 对于HA的情况,editlog的日志存储目录为共享的目录sharedEditsDirs
  10. initJournals(this.sharedEditsDirs);
  11. state = State.OPEN_FOR_READING;
  12. }

OpenForWrite()

OpenForWrite() 方法用于初始化 editlog 文件的输出流,并且打开第一个日志段落(log segment)。 在非 HA 机制下,调用这个方法会完成 BETWEEN_LOG_SEGMENTS 状态到 IN_SEGMENT 状态的转换

  1. synchronized void openForWrite(int layoutVersion) throws IOException {
  2. Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
  3. "Bad state: %s", state);
  4. // 返回最后一个写入log的transactionId+1,作为本次操作的transactionId ,
  5. // 假设当前的transactionId为31
  6. long segmentTxId = getLastWrittenTxId() + 1;
  7. // Safety check: we should never start a segment if there are
  8. // newer txids readable.
  9. List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
  10. // 传入了参数segmentTxId,
  11. // 这个参数会作为这次操作的transactionId,
  12. // 值为editlog已经记录的最新的transactionId加1(这里是 31+1=32)。
  13. //
  14. // selectInputStreams()方法会判断有没有一个以segmentTxId(32)开始的日志,
  15. // 如果没有则表示当前transactionId的值选择正确,可以打开新的editlog文件记录
  16. // 以segmentTxId开始的日志段落。
  17. // 如果方法找到了包含这个transactionId的editlog文件,
  18. // 则表示出现了两个日志 transactionId交叉的情况,抛出异常。
  19. journalSet.selectInputStreams(streams, segmentTxId, true, false);
  20. // 这里判断,有没有包含这个新的segmentTxId的editlog文件,如果有则抛出异常
  21. if (!streams.isEmpty()) {
  22. String error = String.format("Cannot start writing at txid %s " +
  23. "when there is a stream available for read: %s",
  24. segmentTxId, streams.get(0));
  25. IOUtils.cleanupWithLogger(LOG,
  26. streams.toArray(new EditLogInputStream[0]));
  27. throw new IllegalStateException(error);
  28. }
  29. //写入日志
  30. startLogSegmentAndWriteHeaderTxn(segmentTxId, layoutVersion);
  31. assert state == State.IN_SEGMENT : "Bad state: " + state;
  32. }

在所有 editlog 文件的存储路径上构造输出流,并将这些输出流保存在 FSEditLog 的字段 journalSet.journals 中

  1. /**
  2. *
  3. * Start writing to the log segment with the given txid.
  4. * Transitions from BETWEEN_LOG_SEGMENTS state to IN_LOG_SEGMENT state.
  5. */
  6. private void startLogSegment(final long segmentTxId, int layoutVersion)
  7. throws IOException {
  8. assert Thread.holdsLock(this);
  9. LOG.info("Starting log segment at " + segmentTxId);
  10. Preconditions.checkArgument(segmentTxId > 0,
  11. "Bad txid: %s", segmentTxId);
  12. Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
  13. "Bad state: %s", state);
  14. Preconditions.checkState(segmentTxId > curSegmentTxId,
  15. "Cannot start writing to log segment " + segmentTxId +
  16. " when previous log segment started at " + curSegmentTxId);
  17. Preconditions.checkArgument(segmentTxId == txid + 1,
  18. "Cannot start log segment at txid %s when next expected " +
  19. "txid is %s", segmentTxId, txid + 1);
  20. numTransactions = 0;
  21. totalTimeTransactions = 0;
  22. numTransactionsBatchedInSync.set(0L);
  23. // TODO no need to link this back to storage anymore!
  24. // See HDFS-2174.
  25. storage.attemptRestoreRemovedStorage();
  26. try {
  27. //初始化editLogStream
  28. editLogStream = journalSet.startLogSegment(segmentTxId, layoutVersion);
  29. } catch (IOException ex) {
  30. throw new IOException("Unable to start log segment " +
  31. segmentTxId + ": too few journals successfully started.", ex);
  32. }
  33. //当前正在写入txid设置为segmentTxId
  34. curSegmentTxId = segmentTxId;
  35. state = State.IN_SEGMENT;
  36. }

EndCurrentLogSegment()

EndCurrentLogSegment() 会将当前正在写入的日志段落关闭,它调用了 journalSet.finalizeLogSegment() 方法将curSegmentTxid -> lastTxId 之间的操作持久化到磁盘上

持久化是将程序数据在持久状态和瞬时状态间转换的机制。通俗的讲,就是瞬时数据(比如内存中的数据,是不能永久保存的)持久化为持久数据(比如持久化至数据库中,能够长久保存)

这个方法会将 FSEditLog 状态机更改为 BETWEEN_LOG_SEGMENTS 状态

  1. /**
  2. *
  3. * Finalize the current log segment.
  4. * Transitions from IN_SEGMENT state to BETWEEN_LOG_SEGMENTS state.
  5. */
  6. public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
  7. LOG.info("Ending log segment " + curSegmentTxId +
  8. ", " + getLastWrittenTxId());
  9. Preconditions.checkState(isSegmentOpen(),
  10. "Bad state: %s", state);
  11. if (writeEndTxn) {
  12. logEdit(LogSegmentOp.getInstance(cache.get(),
  13. FSEditLogOpCodes.OP_END_LOG_SEGMENT));
  14. }
  15. // always sync to ensure all edits are flushed.
  16. logSyncAll();
  17. printStatistics(true);
  18. final long lastTxId = getLastWrittenTxId();
  19. // 获取当前写入的最后一个id
  20. final long lastSyncedTxId = getSyncTxId();
  21. Preconditions.checkArgument(lastTxId == lastSyncedTxId,
  22. "LastWrittenTxId %s is expected to be the same as lastSyncedTxId %s",
  23. lastTxId, lastSyncedTxId);
  24. try {
  25. // 调用journalSet.finalizeLogSegment将curSegmentTxid -> lastTxId之间的操作
  26. // 写入磁盘(例如editlog文件edits_0032-0034)
  27. journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
  28. editLogStream = null;
  29. } catch (IOException e) {
  30. // All journals have failed, it will be handled in logSync.
  31. }
  32. // 更改状态机的状态
  33. state = State.BETWEEN_LOG_SEGMENTS;
  34. }

journalSet.finalizeLogSegment() 方法也会调用 mapJournalsAndReportErrors() 方法将 finalizeLogSegment() 调用前转到 journals 集合中保存的所有的 JournalManager 对象上。比如 FileJournalManager, FileJoumalManager.finalizeLogSegment() 方法会将 edit_inprogress 文件改名为 edit 文件,新生成的 edit 文件覆盖了 curSegmentTxid -> lastTxId 之间的所有事务

  1. @Override
  2. synchronized public void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException {
  3. // 原有的inprogress文件
  4. File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
  5. // 构造新的edit文件
  6. File dstFile = NNStorage.getFinalizedEditsFile( sd, firstTxId, lastTxId);
  7. LOG.info("Finalizing edits file " + inprogressFile + " -> " + dstFile);
  8. Preconditions.checkState(!dstFile.exists(),
  9. "Can't finalize edits file " + inprogressFile + " since finalized file " +
  10. "already exists");
  11. try {
  12. //执行重命名操作
  13. NativeIO.renameTo(inprogressFile, dstFile);
  14. } catch (IOException e) {
  15. errorReporter.reportErrorOnFile(dstFile);
  16. throw new IllegalStateException("Unable to finalize edits file " + inprogressFile, e);
  17. }
  18. if (inprogressFile.equals(currentInProgress)) {
  19. currentInProgress = null;
  20. }
  21. }

close()

close() 方法用于关闭 editlog 文件的存储,完成了 IN_SEGMENT 到 CLOSED 状态的改变。close()会首先等待 sync操作完成,然后调用 endCurrentLogSegment() 方法,将当前正在进行写操作的日志段落结束。之后 close() 方法会关闭 journalSet 对象,并将 FSEditLog 状态机转变为 CLOSED 状态

  1. synchronized void close() {
  2. if (state == State.CLOSED) {
  3. LOG.debug("Closing log when already closed");
  4. return;
  5. }
  6. try {
  7. if (state == State.IN_SEGMENT) {
  8. assert editLogStream != null;
  9. //如果有sync操作, 则等待sync操作完成
  10. waitForSyncToFinish();
  11. //结束当前logSegment
  12. endCurrentLogSegment(true);
  13. }
  14. } finally {
  15. //关闭journalSet
  16. if (journalSet != null && !journalSet.isEmpty()) {
  17. try {
  18. synchronized(journalSetLock) {
  19. journalSet.close();
  20. }
  21. } catch (IOException ioe) {
  22. LOG.warn("Error closing journalSet", ioe);
  23. }
  24. }
  25. //将状态机更改为CLOSED状态
  26. state = State.CLOSED;
  27. }
  28. }

EditLogOutputStream

FSEditLog 类会调用 FSEditLog.editLogStream 字段的 write() 方法在 editlog 文件中记录一个操作,数据会先被写入到 editlog 文件输出流的缓存中,然后 FSEditLog 类会调用 editLogStream.flush() 方法将缓存中的数据同步到磁盘上

FSEditLog 的 editLogStream 字段是 EditLogOutputStream 类型的,EditLogOutputStream类是一个抽象类,它定义了向持久化存储上写 editlog 文件的相关接口

EditLogOutputStream定义了多个子类来向不同存储系统上的 editlog 文件中写入数据
image.png

JournalSetOutputStream

JournalSetOutputStream 类是 EditLogOutputStream 的子类,在 JournalSetOutputStream 对象上调用的所有EditLogOutputStream 接口方法都会被前转到 FSEditLog.journalSet 字段中保存的 editlog 文件在所有存储位置上的输出流对象(通过调用 mapJournalsAndReportErrors() 方法实现)

FSEditLog 的 editLogStream 字段就是 JournalSetOutputStream 类型的(是在 startLogSegment() 方法中赋值的),通过调用 JournalSetOutputStream 对象提供的方法,FSEditLog 可以将 Namenode 多个存储位置上的editlog 文件输出流对外封装成一个输出流,大大方便了调用

JournalSetOutputStream 类是通过 mapJournalsAndReportErrors() 方法,将 EditLogOutputStream 接口上的write() 调用前转到了 FSEditLog 中保存的所有存储路径上 editlog 文件对应的 EditLogOutputStream 输出流对象上的。这个方法会遍历 FSEditLog.journalSet.journals 集合,然后将 write() 请求前转到 journals 集合中保存的所有 JournalAndStream 对象上

journalSet 的 journals 字段是一个 JournalAndStream 对象的集合,JournalAndStream 对象封装了一个JournalManager 对象,以及在这个 JournalManager 上打开的 editlog 文件的 EditLogOutputStream 对象

journalSet.journals 字段是在 FSEditLog.startLogSegment() 方法中赋值的 ,这个方法调用了journalSet.startLogSegment() 方法在所有 editlog 文件的存储路径上构造输出流,并将这些输出流保存在FSEditLog 的 journalSet.journals 字段中
image.png

  1. /**
  2. * Apply the given operation across all of the journal managers, disabling
  3. * any for which the closure throws an IOException.
  4. * @param closure {@link JournalClosure} object encapsulating the operation.
  5. * @param status message used for logging errors (e.g. "opening journal")
  6. * @throws IOException If the operation fails on all the journals.
  7. */
  8. private void mapJournalsAndReportErrors(
  9. JournalClosure closure, String status) throws IOException{
  10. List<JournalAndStream> badJAS = Lists.newLinkedList();
  11. //遍历journals字段中保存的所有JournalAndStream对象
  12. for (JournalAndStream jas : journals) {
  13. try {
  14. //在闭包对象上调用apply()方法前转请求
  15. closure.apply(jas);
  16. } catch (Throwable t) {
  17. if (jas.isRequired()) {
  18. final String msg = "Error: " + status + " failed for required journal ("
  19. + jas + ")";
  20. LOG.error(msg, t);
  21. abortAllJournals();
  22. terminate(1, msg);
  23. } else {
  24. LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
  25. badJAS.add(jas);
  26. }
  27. }
  28. }
  29. disableAndReportErrorOnJournals(badJAS);
  30. if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
  31. minimumRedundantJournals)) {
  32. String message = status + " failed for too many journals";
  33. LOG.error("Error: " + message);
  34. throw new IOException(message);
  35. }
  36. }

mapJournalsAndReportErrors() 方法在调用时传入了一个闭包对象 closure,这个对象是在JournalSetOutputStream 实现的 EditLogOutputStream 接口方法上定义的。以 JournalSetOutputStream.write() 方法为例,write() 方法定义了写操作的闭包对象,这个闭包对象会提取出 JournalAndStream 对象中封装的EditLogOutputStream 对象,然后调用这个对象上的 write() 方法来完成写数据的功能。通过这种闭包机制, JournalSetOutputStream 完成了将 EditLogOutputStream 接口上的 write() 调用前转到 JournalAndStream 保存的EditLogOutputStream 对象上的操作

写入方法

  1. @Override
  2. public void write(final FSEditLogOp op)
  3. throws IOException {
  4. mapJournalsAndReportErrors(new JournalClosure() {
  5. public void apply(JournalAndStream jas) throws IOException {
  6. if (jas.isActive()) {
  7. // 提取出JournalAndStream对象中封装的EditLogOutputStream对象,
  8. // 并在EditLogOutputStream对象上调用write()方法
  9. jas.getCurrentStream().write(op);
  10. }
  11. }
  12. }, "write op");
  13. }

EditLogFileOutputStream

EditLogFileOutputStream 是向本地文件系统中保存的 editlog 文件写数据的输出流,向 EditLogFileOutputStream写数据时,数据首先被写入到输出流的缓冲区中,当显式地调用 flush() 操作后,数据才会从缓冲区同步到editlog文件中

构造方法

  1. /**
  2. * Creates output buffers and file object.
  3. *
  4. * @param conf
  5. * Configuration object
  6. * @param name
  7. * File name to store edit log
  8. * @param size
  9. * Size of flush buffer
  10. * @throws IOException
  11. */
  12. public EditLogFileOutputStream(Configuration conf, File name, int size)
  13. throws IOException {
  14. super();
  15. shouldSyncWritesAndSkipFsync = conf.getBoolean(
  16. DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH,
  17. DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT);
  18. file = name;
  19. doubleBuf = new EditsDoubleBuffer(size);
  20. RandomAccessFile rp;
  21. if (shouldSyncWritesAndSkipFsync) {
  22. rp = new RandomAccessFile(name, "rws");
  23. } else {
  24. rp = new RandomAccessFile(name, "rw");
  25. }
  26. fp = new FileOutputStream(rp.getFD()); // open for append
  27. fc = rp.getChannel();
  28. fc.position(fc.size());
  29. }

常量

  1. public static final int MIN_PREALLOCATION_LENGTH = 1024 * 1024;
  2. // 输出流对应的editlog文件
  3. private File file;
  4. // editlog文件对应的输出流
  5. private FileOutputStream fp; // file stream for storing edit logs
  6. // editlog文件对应的输出流通道。
  7. private FileChannel fc; // channel of the file stream for sync
  8. // 一个具有两块缓存的缓冲区, 数据必须先写入缓存,然后再由缓存同步到磁盘上
  9. private EditsDoubleBuffer doubleBuf;
  10. // 用来扩充editlog文件大小的数据块
  11. // 当要进行同步操作时,如果editlog文件不够大,则使用fill来扩充editlog
  12. // 文件最小1M
  13. static final ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
  14. private boolean shouldSyncWritesAndSkipFsync = false;
  15. private static boolean shouldSkipFsyncForTests = false;
  16. // EditLogFileOutputStream有一个static的代码段
  17. // 将fill字段用FSEditLogOpCodes.OP_INVALID 字节填满
  18. static {
  19. fill.position(0);
  20. for (int i = 0; i < fill.capacity(); i++) {
  21. fill.put(FSEditLogOpCodes.OP_INVALID.getOpCode());
  22. }
  23. }

在创建 edits_inprogress_0000000000000000485 文件的时候,首先会用 -1 填充 1M 大小的文件空间,然后将写入的指针归0。当有数据的时候进行写入,写入的时候,会覆盖之前预制填充的数据。但不管怎么样,如果数据大小不满1M的话,那么edits文件的大小最小为1M

每次重启 namenode 的时候都会将之前的 editsinprogress 文件关闭,并重命名为 edits** 文件, 创建一个新的 edits_inprogress_0000000000000000485 文件

write()、 setReadyToFlush()

  1. // 直接调用doubleBuf中的对应方法,向输出流写入一个操作
  2. @Override
  3. public void write(FSEditLogOp op) throws IOException {
  4. // 向doubleBuf写入FSEditLogOp对象
  5. doubleBuf.writeOp(op, getCurrentLogVersion());
  6. }
  7. /**
  8. *
  9. * 为同步数据做准备
  10. * 调用doubleBuf.setReadyToFlush()交换两个缓冲区
  11. *
  12. * All data that has been written to the stream so far will be flushed. New
  13. * data can be still written to the stream while flushing is performed.
  14. */
  15. @Override
  16. public void setReadyToFlush() throws IOException {
  17. doubleBuf.setReadyToFlush();
  18. }

flushAndSync() 方法则用于将输出流中缓存的数据同步到磁盘上的 editlog 文件中

  1. /**
  2. *
  3. * 将准备好的缓冲区刷新到持久性存储
  4. * 由于会刷新和同步readyBuffer,因此currentBuffer不会累积新的日志记录,因此不会刷新
  5. *
  6. * Flush ready buffer to persistent store. currentBuffer is not flushed as it
  7. * accumulates new log records while readyBuffer will be flushed and synced.
  8. */
  9. @Override
  10. public void flushAndSync(boolean durable) throws IOException {
  11. // fp: editlog文件对应的输出流
  12. if (fp == null) {
  13. throw new IOException("Trying to use aborted output stream");
  14. }
  15. if (doubleBuf.isFlushed()) {
  16. LOG.info("Nothing to flush");
  17. return;
  18. }
  19. // preallocate()方法用于在 editLog 文件大小不够时,填充editlog文件
  20. preallocate(); // preallocate file if necessary
  21. //将缓存中的数据同步到editlog文件中
  22. doubleBuf.flushTo(fp);
  23. if (durable && !shouldSkipFsyncForTests && !shouldSyncWritesAndSkipFsync) {
  24. fc.force(false); // metadata updates not needed
  25. }
  26. }

EditsDoubleBuffer类

EditsDoubleBuffer 中包括两块缓存,数据会先被写入到 EditsDoubleBuffer 的一块缓存中,而 EditsDoubleBuffer的另一块缓存可能正在进行磁盘的同步操作(就是将缓存中的文件写入磁盘的操作)

EditsDoubleBuffer 这样的设计会保证输出流进行磁盘同步操作的同时,并不影响数据写入的功能

  1. // 正在写入的缓冲区
  2. private TxnBuffer bufCurrent; // current buffer for writing
  3. // 准备好同步的缓冲区
  4. private TxnBuffer bufReady; // buffer ready for flushing
  5. // 缓冲区的大小 默认 512K
  6. private final int initBufferSize;

输出流要进行同步操作时,首先要调用 EditsDoubleBuffer.setReadyToFlush() 方法交换两个缓冲区,将正在写入的缓存改变为同步缓存,然后才可以进行同步操作

  1. // 将正在写入的缓存改变为同步缓存, 然后才可以进行同步操作。
  2. public void setReadyToFlush() {
  3. assert isFlushed() : "previous data not flushed yet";
  4. //交换两个缓冲区
  5. TxnBuffer tmp = bufReady;
  6. bufReady = bufCurrent;
  7. bufCurrent = tmp;
  8. }

完成了 setReadyToFlush() 调用之后,输出流就可以调用 flushTo() 方法将同步缓存中的数据写入到文件中

  1. /**
  2. * Writes the content of the "ready" buffer to the given output stream,
  3. * and resets it. Does not swap any buffers.
  4. *
  5. */
  6. public void flushTo(OutputStream out) throws IOException {
  7. // 将同步缓存中的数据写入文件
  8. bufReady.writeTo(out); // write data to file
  9. // 将同步缓存中保存的数据清空
  10. bufReady.reset(); // erase all data in the buffer
  11. }

EditLogFileInputStream

EditLogFileInputStream 类抽象了从持久化存储上读 editlog 文件的相关接口
image.png
EditLogFileInputStream 定义了本地文件系统的 editlog 文件的输入流。它定义的方法都很简单,都是返回了EditLogFileInputStream 初始化以后的相应字段,或者调用了 FSEditLogOp.Reader 对象的 readOp() 方法从editlog 文件中解析出一个 FSEditLogOp 对象

构造方法

  1. private EditLogFileInputStream(LogSource log,
  2. long firstTxId, long lastTxId,
  3. boolean isInProgress) {
  4. this.log = log;
  5. this.firstTxId = firstTxId;
  6. this.lastTxId = lastTxId;
  7. this.isInProgress = isInProgress;
  8. // 最大值 50 * 1024 * 1024 ==> 50M ???????
  9. this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
  10. }

属性

  1. private final LogSource log;
  2. private final long firstTxId;
  3. private final long lastTxId;
  4. private final boolean isInProgress;
  5. private int maxOpSize;
  6. static private enum State {
  7. UNINIT,
  8. OPEN,
  9. CLOSED
  10. }
  11. private State state = State.UNINIT;
  12. private InputStream fStream = null;
  13. private int logVersion = 0;
  14. private FSEditLogOp.Reader reader = null;
  15. private FSEditLogLoader.PositionTrackingInputStream tracker = null;
  16. private DataInputStream dataIn = null;
  17. static final Logger LOG = LoggerFactory.getLogger(EditLogInputStream.class);

init

  1. private void init(boolean verifyLayoutVersion)
  2. throws LogHeaderCorruptException, IOException {
  3. Preconditions.checkState(state == State.UNINIT);
  4. BufferedInputStream bin = null;
  5. try {
  6. fStream = log.getInputStream();
  7. bin = new BufferedInputStream(fStream);
  8. tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
  9. dataIn = new DataInputStream(tracker);
  10. try {
  11. logVersion = readLogVersion(dataIn, verifyLayoutVersion);
  12. } catch (EOFException eofe) {
  13. throw new LogHeaderCorruptException("No header found in log");
  14. }
  15. if (logVersion == -1) {
  16. // The edits in progress file is pre-allocated with 1MB of "-1" bytes
  17. // when it is created, then the header is written. If the header is
  18. // -1, it indicates the an exception occurred pre-allocating the file
  19. // and the header was never written. Therefore this is effectively a
  20. // corrupt and empty log.
  21. throw new LogHeaderCorruptException("No header present in log (value " +
  22. "is -1), probably due to disk space issues when it was created. " +
  23. "The log has no transactions and will be sidelined.");
  24. }
  25. // We assume future layout will also support ADD_LAYOUT_FLAGS
  26. if (NameNodeLayoutVersion.supports(
  27. LayoutVersion.Feature.ADD_LAYOUT_FLAGS, logVersion) ||
  28. logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
  29. try {
  30. LayoutFlags.read(dataIn);
  31. } catch (EOFException eofe) {
  32. throw new LogHeaderCorruptException("EOF while reading layout " +
  33. "flags from log");
  34. }
  35. }
  36. reader = FSEditLogOp.Reader.create(dataIn, tracker, logVersion);
  37. reader.setMaxOpSize(maxOpSize);
  38. state = State.OPEN;
  39. } finally {
  40. if (reader == null) {
  41. IOUtils.cleanupWithLogger(LOG, dataIn, tracker, bin, fStream);
  42. state = State.CLOSED;
  43. }
  44. }
  45. }

ReadOp

  1. /**
  2. * Read an operation from the stream
  3. * @return an operation from the stream or null if at end of stream
  4. * @throws IOException if there is an error reading from the stream
  5. */
  6. public FSEditLogOp readOp() throws IOException {
  7. FSEditLogOp ret;
  8. if (cachedOp != null) {
  9. ret = cachedOp;
  10. cachedOp = null;
  11. return ret;
  12. }
  13. return nextOp();
  14. }

NextOpImpl

  1. // skipBrokenEdits 是否跳过阻塞的edits .
  2. private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
  3. FSEditLogOp op = null;
  4. switch (state) {
  5. case UNINIT:
  6. try {
  7. // 执行初始化操作
  8. init(true);
  9. } catch (Throwable e) {
  10. LOG.error("caught exception initializing " + this, e);
  11. if (skipBrokenEdits) {
  12. return null;
  13. }
  14. Throwables.propagateIfPossible(e, IOException.class);
  15. }
  16. Preconditions.checkState(state != State.UNINIT);
  17. return nextOpImpl(skipBrokenEdits);
  18. case OPEN:
  19. // 读取 FSEditLogOp 类型操作
  20. op = reader.readOp(skipBrokenEdits);
  21. if ((op != null) && (op.hasTransactionId())) {
  22. long txId = op.getTransactionId();
  23. if ((txId >= lastTxId) &&
  24. (lastTxId != HdfsServerConstants.INVALID_TXID)) {
  25. //
  26. // Sometimes, the NameNode crashes while it's writing to the
  27. // edit log. In that case, you can end up with an unfinalized edit log
  28. // which has some garbage at the end.
  29. // JournalManager#recoverUnfinalizedSegments will finalize these
  30. // unfinished edit logs, giving them a defined final transaction
  31. // ID. Then they will be renamed, so that any subsequent
  32. // readers will have this information.
  33. //
  34. // Since there may be garbage at the end of these "cleaned up"
  35. // logs, we want to be sure to skip it here if we've read everything
  36. // we were supposed to read out of the stream.
  37. // So we force an EOF on all subsequent reads.
  38. //
  39. long skipAmt = log.length() - tracker.getPos();
  40. if (skipAmt > 0) {
  41. if (LOG.isDebugEnabled()) {
  42. LOG.debug("skipping " + skipAmt + " bytes at the end " +
  43. "of edit log '" + getName() + "': reached txid " + txId +
  44. " out of " + lastTxId);
  45. }
  46. tracker.clearLimit();
  47. IOUtils.skipFully(tracker, skipAmt);
  48. }
  49. }
  50. }
  51. break;
  52. case CLOSED:
  53. break; // return null
  54. }
  55. return op;
  56. }

FSEditLog

FSEditLog 类最重要的作用就是在 editlog 文件中记录 Namenode 命名空间的更改,FSEditLog 类对外提供了若干方法用于执行这个操作

LogDelete()

logDelete() 方法用于在 editlog 文件中记录删除 HDFS 文件的操作

logDelete() 方法首先会构造一个 DeleteOp 对象,这个 DeleteOp 类是 FSEditLogOp 类的子类,用于记录删除操作的相关信息,包括了 ClientProtocol.delete() 调用中所有参数携带的信息。构造 DeleteOp 对象后, logDelete()方法会调用 logRpcIds() 方法在 DeleteOp 对象中添加 RPC 调用相关信息,之后 logDelete() 方法会调用 logEdit()方法在 editlog 文件中记录这次删除操作

  1. /**
  2. * Add delete file record to edit log
  3. */
  4. void logDelete(String src, long timestamp, boolean toLogRpcIds) {
  5. // 构造DeleteOp对象
  6. DeleteOp op = DeleteOp.getInstance(cache.get())
  7. .setPath(src)
  8. .setTimestamp(timestamp);
  9. // 记录RPC调用相关信息
  10. logRpcIds(op, toLogRpcIds);
  11. // 调用logEdit()方法记录删除操作
  12. logEdit(op);
  13. }

logEdit()

基本上所有的 log*( )方法在底层都调用了 logEdit() 方法来执行记录操作,这里会传入一个 FSEditLogOp 对象来标识当前需要被记录的操作类型以及操作的信息

  1. /**
  2. * Write an operation to the edit log.
  3. * <p/>
  4. * Additionally, this will sync the edit log if required by the underlying
  5. * edit stream's automatic sync policy (e.g. when the buffer is full, or
  6. * if a time interval has elapsed).
  7. */
  8. void logEdit(final FSEditLogOp op) {
  9. boolean needsSync = false;
  10. synchronized (this) {
  11. assert isOpenForWrite() :
  12. "bad state: " + state;
  13. // 如果自动同步开启,则等待同、 步完成
  14. waitIfAutoSyncScheduled();
  15. // check if it is time to schedule an automatic sync
  16. needsSync = doEditTransaction(op);
  17. if (needsSync) {
  18. isAutoSyncScheduled = true;
  19. }
  20. }
  21. // Sync the log if an automatic sync is required.
  22. if (needsSync) {
  23. logSync();
  24. }
  25. }
  1. // 同步操作, 即使是多个线程, 依旧会进行同步操作. txid 不会错乱
  2. // 保证了多个线程调用FSEditLog.log*()方法向editlog文件中写数据时,editlog文件记录的内容不会相互影响。
  3. // 也保证了这几个并发线程保存操作对应的transactionId;通过调用beginTransaction()方法获得
  4. synchronized boolean doEditTransaction(final FSEditLogOp op) {
  5. //开启一个新的transaction , 更新 txid
  6. long start = beginTransaction();
  7. op.setTransactionId(txid);
  8. try {
  9. // 使用editLogStream写入Op操作
  10. editLogStream.write(op);
  11. } catch (IOException ex) {
  12. // All journals failed, it is handled in logSync.
  13. } finally {
  14. op.reset();
  15. }
  16. //结束当前的transaction
  17. endTransaction(start);
  18. //检查是否需要强制同步
  19. return shouldForceSync();
  20. }

logEdit() 方法会调用 beginTransaction() 方法在 editlog 文件中开启一个新的 transaction,然后使用 editlog 输入流写入要被记录的操作,接下来调用 endTransaction() 方法关闭这个transaction,最后调用 logSync() 方法将写入的信息同步到磁盘上

logEdit() 方法调用 beginTransaction()、editLogStream.write() 以及 endTransaction() 三个方法时使用了synchronized 关键字进行同步操作,这样就保证了多个线程调用 FSEditLog.log*() 方法向 editlog 文件中写数据时,editlog 文件记录的内容不会相互影响。 同时, 也保证了这几个并发线程保存操作对应的 transactionId(通过调用 beginTransaction() 方法获得)是唯一并递增的

logEdit() 方法中调用 logSync() 方法执行刷新操作的语句并不在 synchronized 代码段中。这是因为调用 logSync()方法必然会触发写editlog文件的磁盘操作,这是一个非常耗时的操作,如果放入同步模块中会造成其他调用FSEditLog.log() 线程的等待时间过长。所以,HDFS 设计者将需要进行同步操作的 synchronized 代码段放入logSync() 方法中,也就让输出日志记录和刷新缓冲区数据到磁盘这两个操作分离了。同时,利用EditLogOutputStream 的两个缓冲区,使得日志记录和刷新缓冲区数据这两个操作可以并发执行,*大大地提高了Namenode的吞吐量

BeginTransaction()

logEdit() 方法会调用 beginTransaction() 方法开启一个新的 transaction,也就是将 FSEditLog.txid 字段增加 1 并作为当前操作的 transactionId

FSEditLog.txid 字段维护了一个全局递增的 transactionId,这样也就保证了 FSEditLog 为所有操作分配的transactionId 是唯一且递增的。调用 beginTransaction() 方法之后会将新申请的 transactionId 放入 ThreadLocal的变量 my TransactionId 中, myTransactionId 保存了当前线程记录操作对应的 transactionId,方便了以后线程做sync同步操作

对于 FSEditLog 类,可能同时有多个线程并发地调用 log() 方法执行日志记录操作,所以 FSEditLog 类使用了一个 ThreadLocal 变量 myTransactionId 为每个调用log()操作的线程保存独立的 txid,这个 txid 为当前线程记录操作对应的transactionId

  1. private long beginTransaction() {
  2. assert Thread.holdsLock(this);
  3. // get a new transactionId
  4. // 全局的transactionId ++
  5. txid++;
  6. //
  7. // 使用ThreadLocal变量保存当前线程持有的transactionId
  8. // record the transactionId when new data was written to the edits log
  9. //
  10. TransactionId id = myTransactionId.get();
  11. id.txid = txid;
  12. return monotonicNow();
  13. }

EndTransaction()

logEdit() 方法会调用 endTransaction() 方法结束一个 transaction,这个方法就是更改一些统计数据

  1. private void endTransaction(long start) {
  2. assert Thread.holdsLock(this);
  3. // update statistics
  4. long end = monotonicNow();
  5. numTransactions++;
  6. totalTimeTransactions += (end-start);
  7. if (metrics != null) // Metrics is non-null only when used inside name node
  8. metrics.addTransaction(end-start);
  9. }

LogSync()

logEdit() 方法通过调用 beginTransaction() 方法成功地获取一个 transactionId 之后,就会通过输出流向 editlog 文件写数据以记录当前的操作,但是写入的这些数据并没有直接保存在 editlog 文件中, 而是暂存在输出流的缓冲区中。所以当 logEdit() 方法将一个完整的操作写入输出流后,需要调用 logSync() 方法同步当前线程对editlog文件所做的修改

editlog同步策略:

  • 所有的操作项同步的写入缓存时,每个操作会被赋予一个唯一的 transactionId
  • 当一个线程要将它的操作同步到 editlog 文件中时,logSync() 方法会使用 ThreadLocal 变量 myTransactionId获取该线程需要同步的 transactionId,然后对比这个 transactionId 和已经同步到 editlog 文件中的transactionId。如果当前线程的 transactionId 大于 editlog 文件中的 transactionId,则表明 editlog 文件中记录的数据不是最新的,同时如果当前没有别的线程执行同步操作,则开始同步操作将输出流缓存中的数据写入 editlog 文件中。需要注意的是,由于 editlog 输出流使用了双 buffer 的结构,所以在进行 sync 操作的同时,并不影响 editlog 输出流的使用
  • 在 logSync() 方法中使用 isSyncRunning 变量标识当前是否有线程正在进行同步操作, 这里注意isSyncRunning 是一个 volatile 的 boolean 类型变量

logSync()方法分为以下三个部分,并分开进行加锁操作,这样的设计提高了并发的程度:

  • 判断当前操作是否已经同步到了 editlog 文件中,如果还没有同步,则将 editlog 的双 buffer 调换位置,为同步操作做准备,同时将 isSyncRunning 标志位设置为 true,这部分代码需要进行 synchronized 加锁操作
  • 调用 logStream.flush() 方法将缓存的数据持久化到存储上,这部分代码不需要进行加锁操作,因为在上一段同步代码中已经将双buffer调换了位置,不会有线程向用于刷新数据的缓冲区中写入数据,所以调用 flush() 操作并不需要加锁
  • 重置 isSyncRunning 标志位,并且通知等待的线程,这部分代码需要进行 synchronized 加锁操作

    1. protected void logSync(long mytxid) {
    2. long syncStart = 0;
    3. boolean sync = false;
    4. long editsBatchedInSync = 0;
    5. try {
    6. EditLogOutputStream logStream = null;
    7. synchronized (this) {
    8. try {
    9. // 第一部分,打印统计信息
    10. printStatistics(false);
    11. // 当前txid大于editlog中已经同步的txid,
    12. // 并且有线程正在同步, 则等待.
    13. // if somebody is already syncing, then wait
    14. while (mytxid > synctxid && isSyncRunning) {
    15. try {
    16. wait(1000);
    17. } catch (InterruptedException ie) {
    18. }
    19. }
    20. //
    21. // 如果txid小于editlog中已经同步的txid,则表明当前操作已经被同步到存储上,
    22. // 不需要再次同步
    23. //
    24. if (mytxid <= synctxid) {
    25. return;
    26. }
    27. // 开始同步操作,将isSyncRunning标志位设置为true
    28. // now, this thread will do the sync. track if other edits were
    29. // included in the sync - ie. batched. if this is the only edit
    30. // synced then the batched count is 0
    31. editsBatchedInSync = txid - synctxid - 1;
    32. syncStart = txid;
    33. isSyncRunning = true;
    34. sync = true;
    35. // swap buffers
    36. try {
    37. if (journalSet.isEmpty()) {
    38. throw new IOException("No journals available to flush");
    39. }
    40. // 通过调用 setReadyToFlush() 方法将两个缓冲区互换,为同步做准备
    41. editLogStream.setReadyToFlush();
    42. } catch (IOException e) {
    43. final String msg =
    44. "Could not sync enough journals to persistent storage " +
    45. "due to " + e.getMessage() + ". " +
    46. "Unsynced transactions: " + (txid - synctxid);
    47. LOG.error(msg, new Exception());
    48. synchronized(journalSetLock) {
    49. IOUtils.cleanupWithLogger(LOG, journalSet);
    50. }
    51. terminate(1, msg);
    52. }
    53. } finally {
    54. // 防止其他log edit 写入阻塞, 引起的RuntimeException
    55. // Prevent RuntimeException from blocking other log edit write
    56. doneWithAutoSyncScheduling();
    57. }
    58. //editLogStream may become null,
    59. //so store a local variable for flush.
    60. logStream = editLogStream;
    61. }
    62. // 第二部分,调用flush()方法,将缓存中的数据同步到editlog文件中
    63. // do the sync
    64. long start = monotonicNow();
    65. try {
    66. if (logStream != null) {
    67. logStream.flush();
    68. }
    69. } catch (IOException ex) {
    70. synchronized (this) {
    71. final String msg =
    72. "Could not sync enough journals to persistent storage. "
    73. + "Unsynced transactions: " + (txid - synctxid);
    74. LOG.error(msg, new Exception());
    75. synchronized(journalSetLock) {
    76. IOUtils.cleanupWithLogger(LOG, journalSet);
    77. }
    78. terminate(1, msg);
    79. }
    80. }
    81. long elapsed = monotonicNow() - start;
    82. if (metrics != null) { // Metrics non-null only when used inside name node
    83. metrics.addSync(elapsed);
    84. metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
    85. numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
    86. }
    87. } finally {
    88. // Prevent RuntimeException from blocking other log edit sync
    89. //第三部分, 恢复标志位
    90. synchronized (this) {
    91. if (sync) {
    92. // 已同步txid赋值为开始sync操作的txid
    93. synctxid = syncStart;
    94. for (JournalManager jm : journalSet.getJournalManagers()) {
    95. /**
    96. * {@link FileJournalManager#lastReadableTxId} is only meaningful
    97. * for file-based journals. Therefore the interface is not added to
    98. * other types of {@link JournalManager}.
    99. */
    100. if (jm instanceof FileJournalManager) {
    101. ((FileJournalManager)jm).setLastReadableTxId(syncStart);
    102. }
    103. }
    104. isSyncRunning = false;
    105. }
    106. this.notifyAll();
    107. }
    108. }
    109. }

    由于 logEdit() 方法中输出日志记录和调用 logSync() 刷新缓冲区数据到磁盘这两个操作是独立加锁的,同时EditLogOutputStream 提供了两个缓冲区可以同时进行日志记录和刷新缓冲区操作,它们都使用 FSEditLog 对象作为锁对象,所以 logEdit() 方法中使用 synchronized 关键字同步的日志记录操作和 logSync() 方法中使用synchronized 关键字同步的刷新缓冲区数据到磁盘的操作是可以并发同步进行的。这种设计大大地提高了多个线程记录 editlog 操作的并发性,且通过 transactionId 机制保证了 editlog 日志记录的正确性