分布式存储系统NameNode代码实现

首先,做edit log场景的时候,执行一个命令,hadoop fs -mkdir /usr/warehosue,会做两件事情

  • 在内存里的文件目录树中加入对应的目录节点;
  • 在磁盘里写入一条edits log,记录本次元数据的修改

    1.rpc调用的服务接口

    hdfs client去创建目录的话,会给hdfs NameNode发送一个rpc接口调用的请求,调用人家的mkdir()接口,在那个接口里就会完成上述的两件事情。

    1. public class NameNodeRpcServer {
    2. /**
    3. * 管理元数据的组件
    4. */
    5. private FSNamesystem namesystem;
    6. public NameNodeRpcServer(FSNamesystem namesystem) {
    7. this.namesystem = namesystem;
    8. }
    9. /**
    10. * 创建目录
    11. * @param path 路径
    12. * @return 是否创建成功
    13. * @throws Exception
    14. */
    15. public Boolean mkdir(String path) throws Exception {
    16. return this.namesystem.mkdir(path);
    17. }
    18. /**
    19. * 启动rpc server
    20. */
    21. public void start() {
    22. System.out.println("开始监听指定的rpc server端口号");
    23. }
    24. }

    2.文件目录树管理&edit log的写入

    请求达到mkdir的入口方法时,第一件是在内存文件目录树中,加入进去对应的一个目录节点,第二件事情是在edits log写入磁盘文件,因此需要目录树组件和edit log日志组件 ```java /**

    • 负责管理内存目录树 */ public class FSDirectory {

      /**

      • 创建目录
      • @param path */ public void mkdir(String path) {

      } }

/**

  • 负责管理edits log日志的核心组件 */ public class FSEditLog {

    /**

    • 记录edit log
    • @param log */ public void logEdit(String log) {

      } } ```

      3.管理元数据的核心组件

      FSNamesystem,其实是作为NameNode里元数据操作的核心入口,负责管理所有的元数据的操作,但是在里面的话呢,他可能会调用其他的组件完成相关的事情 ```java /**

  • 负责管理元数据的核心组件 / public class FSNamesystem { /*

    • 内存目录树的组件 */ private FSDirectory directory;

      /**

    • 负责管理EditLog写入磁盘的组件 */ private FSEditLog editLog;

      public FSNamesystem() { this.directory = new FSDirectory(); this.editLog = new FSEditLog(); }

      /**

    • 创建目录
    • @param path
    • @return
    • @throws Exception */ public Boolean mkdir (String path) throws Exception { this.directory.mkdir(path); this.editLog.logEdit(“创建了一个目录:” + path); return true; } } ```

      目录树组件实现

      请求目录:/usr/warehouse/hive
      目录树就是用于管理数据文件存储的位置,因此该组件需要提供对目录树维护的一切功能 ```java /**
  • 负责管理内存目录树 */ public class FSDirectory {

    private INodeDirectory dirTree;

    public FSDirectory() {

    1. //初始化一个目录
    2. this.dirTree = new INodeDirectory("/");

    }

    /**

    • 创建目录
    • @param path */ public void mkdir(String path) { //命令传入一个路径path = /usr/warehouse/hive //1.判断一下这个路径有没有存在,没有就得根据path去一步步在根节点上挂载出剩下的目录 synchronized (dirTree) {

       String[] paths = path.split("/");
       INodeDirectory parent = null;
       for (String splitedPath : paths) {
           if (splitedPath.trim().equals("")) {
               continue;
           }
           //优化:如果parent节点是null意味着传入进来的path都没有挂载在目录树上,
           // 如果parent不是null,就可以直接用parent这个节点继续往下遍历,不用再从根目录去遍历
           INodeDirectory dir = findDirectory(
                   parent == null ? dirTree : parent,
                   splitedPath);
           if (dir != null) {
               parent = dir;
               continue;
           }
      
           INodeDirectory childrenDir = new INodeDirectory(splitedPath);
           parent.addChild(childrenDir);
       }
      

      } }

      /* 对文件目录树递归查找目录是否存在

    • @param dir
    • @param path
    • @return */ private INodeDirectory findDirectory(INodeDirectory dir, String path) { if (dir.getChildern().size() == 0) {

       return null;
      

      } INodeDirectory resultDir = null;

      for (INode child : dirTree.getChildern()) {

       INodeDirectory childDir = (INodeDirectory) child;
       if (child instanceof INodeDirectory) {
           if (childDir.getPath().equals(path)){
               return childDir;
           }
            resultDir = findDirectory(childDir, path);
           if (resultDir != null) {
               return resultDir;
           }
       }
      

      }

      return null; }

      /**

    • 代表的是文件目录树中的一个节点 */ private interface INode {

      }

      /**

    • 代表文件目录树中的第一个目录 */ public static class INodeDirectory implements INode {

      private String path; private List childern;

      public INodeDirectory(String path) {

       this.path = path;
       this.childern = new LinkedList<INode>();
      

      }

      public void addChild(INode iNode) {

       this.childern.add(iNode);
      

      }

      public String getPath() {

       return path;
      

      }

      public void setPath(String path) {

       this.path = path;
      

      }

      public List getChildern() {

       return childern;
      

      }

      public void setChildern(List childern) {

       this.childern = childern;
      

      } }

      /**

    • 代表文件目录树中的一个文件 */ public static class INodeFile implements INode { private String name;

      public String getName() {

       return name;
      

      }

      public void setName(String name) {

       this.name = name;
      

      } }

}

<a name="kseHF"></a>
# Edit log日志组件实现
<a name="JEmRu"></a>
## 全局txid的生成
通过synchronized加锁保证并发问题解决
```java
public class FSEditLog {
    /**
     * txid序号
     */
    private long txidSeq = 0L;

    /**
     * 记录edit log
     * @param content
     */
    void logEdit(String content) {
        //需要加锁
        synchronized (this) {
            //获取全局唯一递增的txid,代表了edits log的序号
            txidSeq++;
            long txid = txidSeq;
        }
    }
}

双缓冲机制实现

双缓冲区就是为了提高并发时记录日志刷日志进磁盘的效率

/**
     * 内存双缓冲
     */
    private class DoubleBuffer {
        //用于线程写入edits log的缓冲区
        private List<EditLog> currentBuffer = new LinkedList<EditLog>();
        //专门将数据同步的buffer缓冲区
        private List<EditLog> syncBuffer = new LinkedList<EditLog>();

        /**
         * 将edits log写到内存缓冲中
         * @param log
         */
        public void write(EditLog log) {
            currentBuffer.add(log);
        }

        /**
         * 交换两块缓冲区,为了同步内存数据
         */
        public void setReadyToSync() {
            List<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * 将syncBuffer数据刷入磁盘中
         */
        public void flush() {
            for (EditLog log : syncBuffer) {
                System.out.println("将edit log写入磁盘中");
                //用文件输出流将数据写入磁盘中
            }
            //刷新完成后将缓冲区清空掉
            syncBuffer.clear();
        }
    }

Edit log分段加锁

edit log 主要做两件事

  • 将数据刷入内存缓冲
    • 为保证写入的高效,写入缓冲区和刷盘是要分开加锁的。
  • 将缓冲区数据刷入磁盘

    • 在内存数据处理这里加锁,与磁盘操作分开。在加锁的代码块中就对多线程操作数据做了控制这个内存操作是很快的,加锁完直接释放锁。

      /**
      * 同步到磁盘中的最大的txid
      */
      private volatile Long syncMaxTxid = 0L;
      
      /**
      * 每个线程自己的Txid副本
      */
      private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();
      
      /**
      * 记录edit log
      * @param content
      */
      void logEdit(String content) {
         //需要加锁
         synchronized (this) {
             //获取全局唯一递增的txid,代表了edits log的序号
             txidSeq++;
             long txid = txidSeq;
             //将txid搞一个线程自己的副本
             localTxid.set(txid);
             //构造一个edits log对象
             EditLog log = new EditLog(txid, content);
             //将log数据刷入缓冲区中
             editLogBuffer.write(log);
         }
      
         logSync();
      }
      
      /**
      * 将内存缓冲中的数据刷入磁盘文件中
      * 在这里允许某一个线程一次性将内存缓冲中的数据刷入磁盘
      */
      private void logSync() {
         //再次尝试加锁,这里加锁后,上面的logEdit的synchronized代码块也不会进入就会卡住
         synchronized (this) {
             //当前正好有人刷内存进磁盘就要用标记位控制住
             if (isSyncRunning) {
                 long txid = localTxid.get();
                 //如果当前线程的txid小于正再刷入磁盘的最大txid数据,
                 // 就证明当前txid的数据也是正在刷入磁盘的
                 //例如:syncBuffer中的txid=1、2、3、4、5,最大的txid就是5,
                 //如果当前线程的txid=3,线程就不需要等待着去刷盘了,
                 //已经有线程正在把这个数据刷磁盘了,直接返回就可以
                 if (txid < syncMaxTxid) {
                     return;
                 }
                 //当txid=6,就需要做一些等待,随后进行刷盘。
             }
             //这里需要交换缓冲区数据
             editLogBuffer.setReadyToSync();
             //保存当前要同步到磁盘中去的最大的txid
             syncMaxTxid = editLogBuffer.getMaxTxid();
             //设置标志位,告诉其他线程现在有人正在把数据刷入磁盘
             isSyncRunning = true;
         }
         //同步内存缓冲的数据进入磁盘文件,这个过程就很慢
         editLogBuffer.flush();
      }