三、ZooKeeper 数据结构与监听机制

ZooKeeper的数据模型 Znode

  • 在ZooKeeper中,数据信息被保存在一个个数据节点上,这些节点被称为znode。ZNode 是Zookeeper 中最小数据单位,在 ZNode 下面又可以再挂 ZNode,这样一层层下去就形成了一个层次化命名空间 ZNode 树,我们称为 ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。见下图示例:

image.png

  • 在 Zookeeper 中,每一个数据节点都是一个 ZNode,上图根目录下有两个节点,分别是:app1 和app2,其中 app1 下面又有三个子节点,所有ZNode按层次化进行组织,形成这么一颗树,ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示,开发人员可以向这个节点写入数据,也可以在这个节点下面创建子节点

1、ZNode 的类型

  • Zookeeper 节点类型可以分为三大类:

    • 持久性节点(Persistent)
    • 临时性节点(Ephemeral)
    • 顺序性节点(Sequential)
  • 在开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、临时节点、临时顺序节点。不同类型的节点则会有不同的生命周期

    • 持久节点:是Zookeeper中最常见的一种节点类型,所谓持久节点,就是指节点被创建后会一直存在服务器,直到删除操作主动清除
    • 持久顺序节点:就是有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。
    • 临时节点:就是会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点!!!
    • 临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀
  • 事务ID

    • 首先,先了解,事务是对物理和抽象的应用状态上的操作集合。往往在现在的概念中,狭义上的事务通常指的是数据库事务,一般包含了一系列对数据库有序的读写操作,这些数据库事务具有所谓的ACID特性,即原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
    • 在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也称之为事务操作或更新操作,一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用 ZXID 来表示,通常是一个 64 位的数字。每一个 ZXID 对应一次更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序
    • zk中的事务指的是对zk服务器状态改变的操作(create、update data、更新字节点);zk对这些事务操作都会编号,这个编号是自增长的,被称为ZXID

      2、ZNode的状态信息

      ```shell

      首先打开zk客户端来连接zk集群

      sh bin/zkCli.sh

获取当前根目录的zookeeper节点上存有的数据,命令:get /zookeeper

[zk: localhost:2181(CONNECTED) 1] get /zookeeper

得到以下结果:(第一行空着,是因为第一行存的是数据信息,此时没有数据,因此空行,之后的信息都属于节点的状态信息)

cZxid = 0x0 ctime = Wed Dec 31 19:00:00 EST 1969 mZxid = 0x0 mtime = Wed Dec 31 19:00:00 EST 1969 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1

查看子节点

ls /zookeeper

  1. 整个 ZNode 节点内容包括两部分:节点数据内容和节点状态信息。数据内容是空,其他的属于状态信息。那么这些状态信息都有什么含义呢?
  2. - cZxid 就是 Create ZXID,表示节点被创建时的事务ID
  3. - ctime 就是 Create Time,表示节点创建时间。
  4. - mZxid 就是 Modified ZXID,表示节点最后一次被修改时的事务ID
  5. - mtime 就是 Modified Time,表示节点最后一次被修改的时间。
  6. - pZxid 表示该节点的子节点列表最后一次被修改时的事务 ID。只有子节点列表变更才会更新 pZxid,子节点内容变更不会更新。
  7. - cversion 表示子节点的版本号。
  8. - dataVersion 表示内容版本号。
  9. - aclVersion 标识acl版本
  10. - ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0
  11. - dataLength 表示数据长度。
  12. - numChildren 表示直系子节点数。
  13. <a name="RtBLs"></a>
  14. ## 3、Watcher 机制
  15. - Zookeeper使用Watcher机制实现分布式数据的**发布/订阅**功能
  16. - 一个典型的发布/订阅模型系统定义了一种 **一对多** 的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。
  17. - ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么Zk就会向指定客户端发送一个事件通知来实现分布式的通知功能。
  18. - 整个Watcher注册与通知过程如图所示
  19. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607669183592-1e9c3288-c67c-4675-b6a4-89b54f4b9fae.png#align=left&display=inline&height=200&margin=%5Bobject%20Object%5D&name=image.png&originHeight=314&originWidth=582&size=36306&status=done&style=shadow&width=370)<br />Zookeeper的Watcher机制主要包括 **客户端线程**、**客户端WatcherManager**、**Zookeeper服务器**** **三部分。具体工作流程为:
  20. - 客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中
  21. - Zookeeper服务器触发Watcher事件后,会向客户端发送通知
  22. - 客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑
  23. ---
  24. <a name="UJ8PG"></a>
  25. # 五、ZooKeeper内部原理
  26. <a name="5UkJk"></a>
  27. ## 1、leader选举
  28. **选举机制**
  29. - **半数机制**:集群中半数以上机器存活,集群可用,所以Zookeeper适合安装 **奇数** 台服务器
  30. - Zookeeper虽然在配置文件中并没有指定MasterSlave,但是Zookeeper工作时,是有一个节点为Leader,其它为FollowerLeader是通过内部的选举机制产生的
  31. **集群首次启动**
  32. - 假设有五台服务器组成的Zookeeper集群,它们的id1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么
  33. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607684973902-c85d4c07-f4a1-4a42-a173-a9585106566d.png#align=left&display=inline&height=219&margin=%5Bobject%20Object%5D&name=image.png&originHeight=549&originWidth=1446&size=186308&status=done&style=shadow&width=576)<br />**Zookeeper的选举机制**<br />(1)服务器1启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是LOOKING状态 <br />(2)服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器同意选举它(这个例子中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。<br />(3)服务器3启动,根据前面的理论分析,服务器3成为服务器1、2、3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader<br />(4)服务器4启动,根据前面的分析,理论上服务器4应该是服务器1、2、3、4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了<br />(5)服务器5启动,同4一样称为follower
  34. **集群非首次启动**
  35. - 每个节点在选举时都会参考自身节点的zxid值(事务ID);
  36. - **优先选择 zxid 值大的节点称为Leader**
  37. <a name="OTuUG"></a>
  38. ## 2、ZAB一致性协议
  39. <a name="nGvxO"></a>
  40. ### ① 分布式数据一致性问题
  41. - 将数据复制到分布式部署的多台机器中,可以消除单点故障,防止系统由于某台(些)机器宕机导致的不可用
  42. - 通过负载均衡技术,能够让分布在不同地方的数据副本全都对外提供服务。有效提高系统性能
  43. - 在分布式系统中引入数据复制机制后,多台数据节点之间由于网络等原因很容易产生数据不一致的情况
  44. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607685191884-56004488-c76b-49af-acee-b4fca836ef2f.png#align=left&display=inline&height=258&margin=%5Bobject%20Object%5D&name=image.png&originHeight=515&originWidth=810&size=52299&status=done&style=shadow&width=405)
  45. <a name="xzCf9"></a>
  46. ### ② ZAB协议
  47. - ZK就是分布式一致性问题的工业解决方案,paxos是其底层理论算法(以晦涩难懂著称),其中zabraft和众多开源算法是对paxos的工业级实现。ZK没有完全采用paxos算法,而是使用了一种称为 **Z****ookeeper ****A****tomic ****B****roadcast**(ZABZookeeper原子消息广播协议)的协议作为其数据一致性的核心算法
  48. - ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持**崩溃恢复**和**原子广播**的协议
  49. **主备模式保证一致性**
  50. - ZK怎么处理集群中的数据?所有客户端写入数据都是写入Leader中,然后由 Leader 复制到Follower中。ZAB会将服务器数据的状态变更以事务Proposal的形式广播到所有的副本进程上,ZAB协议能够保证了事务操作的一个全局的变更序号(ZXID)
  51. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607685410605-1c026e61-caa9-4d9e-9e44-ea2631b0ab80.png?x-oss-process=image%2Fwatermark%2Ctype_d3F5LW1pY3JvaGVp%2Csize_10%2Ctext_TGFuY2VNYWk%3D%2Ccolor_FFFFFF%2Cshadow_50%2Ct_80%2Cg_se%2Cx_10%2Cy_10#align=left&display=inline&height=424&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1188&originWidth=1547&size=257915&status=done&style=shadow&width=552)
  52. **广播消息**
  53. - ZAB 协议的消息广播过程类似于 **二阶段提交过程**。对于客户端发送的写请求,全部由 Leader 接收,Leader 将请求封装成一个事务 Proposal(提议),将其发送给所有 Follwer ,如果收到超过半数反馈ACK,则执行 Commit 操作(先提交自己,再发送 Commit 给所有 Follwer
  54. - **不能正常反馈的Follower在恢复正常后会进入数据同步阶段最终与Leader保持一致**
  55. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607685590443-bfdff7b6-ebbe-43e2-9a3c-aac1e3a2aec2.png?x-oss-process=image%2Fwatermark%2Ctype_d3F5LW1pY3JvaGVp%2Csize_10%2Ctext_TGFuY2VNYWk%3D%2Ccolor_FFFFFF%2Cshadow_50%2Ct_80%2Cg_se%2Cx_10%2Cy_10#align=left&display=inline&height=302&margin=%5Bobject%20Object%5D&name=image.png&originHeight=829&originWidth=1746&size=181580&status=done&style=shadow&width=637)
  56. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607685605396-5c1a2180-6efa-4fa8-85ca-f1e1d119d8c1.png?x-oss-process=image%2Fwatermark%2Ctype_d3F5LW1pY3JvaGVp%2Csize_10%2Ctext_TGFuY2VNYWk%3D%2Ccolor_FFFFFF%2Cshadow_50%2Ct_80%2Cg_se%2Cx_10%2Cy_10#align=left&display=inline&height=287&margin=%5Bobject%20Object%5D&name=image.png&originHeight=751&originWidth=1727&size=169111&status=done&style=shadow&width=660)
  57. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607685623118-9b6cded2-a2cc-4f01-b95d-41b14bfcf2d3.png?x-oss-process=image%2Fwatermark%2Ctype_d3F5LW1pY3JvaGVp%2Csize_10%2Ctext_TGFuY2VNYWk%3D%2Ccolor_FFFFFF%2Cshadow_50%2Ct_80%2Cg_se%2Cx_10%2Cy_10#align=left&display=inline&height=316&margin=%5Bobject%20Object%5D&name=image.png&originHeight=816&originWidth=1795&size=185323&status=done&style=shadow&width=696)
  58. - **具体说明**
  59. - Leader接收到Client请求之后,会将这个请求封装成一个事务,并给这个事务分配一个全局递增的唯一 ID,称为事务IDZXID),ZAB 协议要求保证事务的顺序,因此必须将每一个事务按照 ZXID进行**先后排序后处理**
  60. - ZK集群为了保证任何事务操作能够有序的顺序执行,只能是 Leader 服务器接受**写**请求,即使是Follower 服务器接受到客户端的请求,也会转发到 Leader 服务器进行处理
  61. <a name="zIMVW"></a>
  62. ### leader崩溃问题
  63. - Leader宕机后,ZK集群无法正常工作,ZAB协议提供了一个高效且可靠的leader选举算法
  64. - Leader宕机后,被选举的新Leader需要解决的问题
  65. - ZAB 协议确保那些已经在 Leader 提交的事务最终会被所有服务器提交。
  66. - ZAB 协议确保丢弃那些只在 Leader 提出/复制,但没有提交的事务。
  67. - 基于上面的目的,ZAB协议设计了一个选举算法:能够确保已经被Leader提交的事务被集群接受,丢弃还没有提交的事务
  68. - **这个选举算法的关键点:保证选举出的新Leader拥有集群中所有节点最大编号(ZXID)的事务!!!**
  69. ---
  70. <a name="0EOL1"></a>
  71. # 六、ZooKeeper应用实践
  72. - ZooKeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使用它来进行分布式数据的发布与订阅。另一方面,通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher事件通知机制,可以非常方便地构建一系列分布式应用中都会涉及的核心功能,如**数据发布/订阅**、**命名服务**、**集群管理**、**Master选举**、**分布式锁**和**分布式队列**等。那接下来就针对这些典型的分布式应用场景来做下介绍
  73. - **Zookeeper的两大特性**
  74. 1. 客户端如果对Zookeeper的数据节点注册Watcher监听,那么当该数据节点的内容或是其子节点列表发生变更时,Zookeeper服务器就会向订阅的客户端发送变更通知。
  75. 1. 对在Zookeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么临时节点也会被自动删除
  76. - 利用其两大特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册一个Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点:/clusterServers/[Hostname],这样,监控系统就能够实时监测机器的变动情况。
  77. <a name="VA5bY"></a>
  78. ## 1、服务器动态上下线监听
  79. - 分布式系统中,主节点会有多台,主节点可能因为任何原因出现宕机或者下线,而任意一台客户端都要能实时感知到主节点服务器的上下线
  80. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/2322054/1607694136678-ce853101-773a-4a3c-bebc-caa48a9c735d.png?x-oss-process=image%2Fwatermark%2Ctype_d3F5LW1pY3JvaGVp%2Csize_10%2Ctext_TGFuY2VNYWk%3D%2Ccolor_FFFFFF%2Cshadow_50%2Ct_80%2Cg_se%2Cx_10%2Cy_10#align=left&display=inline&height=298&margin=%5Bobject%20Object%5D&name=image.png&originHeight=864&originWidth=1890&size=365329&status=done&style=shadow&width=652)
  81. <a name="ZOOlQ"></a>
  82. ### 具体实现
  83. - 服务端
  84. ```java
  85. // 提供客户端需要的时间查询服务,服务端向zk建立临时节点
  86. public class Server {
  87. //获取zkClient
  88. ZkClient zkClient = null;
  89. private void connectZk() {
  90. //创建zkClient
  91. zkClient = new ZkClient("linux121:2181,linux122:2181");
  92. //创建服务端建立临时节点的目录
  93. if (!zkClient.exists("/servers")) {
  94. zkClient.createPersistent("/servers");
  95. }
  96. }
  97. //告知zk服务器相关信息
  98. private void saveServerInfo(String ip, String port) {
  99. zkClient.createEphemeralSequential("/servers/server", ip + ":" + port);
  100. System.out.println("---------------->>> 服务器: " + ip + ":" + port + "向zk服务器保存信息成功,成功上线可接受client查询");
  101. }
  102. public static void main(String[] args) {
  103. //准备两个服务器启动上线(采用多线程模拟,一个线程代表一个服务器)
  104. Server server = new Server();
  105. server.connectZk();
  106. server.saveServerInfo(args[0], args[1]);
  107. //提供时间服务的线程还没有启动,创建一个线程类,可以接受socket请求
  108. TimeService timeService = new TimeService(Integer.parseInt(args[1]));
  109. timeService.start();
  110. }
  111. }
  • 服务端提供时间查询的线程类

    public class TimeService extends Thread{
    
      private int port = 0;
    
      public TimeService(int port) {
          this.port = port;
      }
    
      @Override
      public void run() {
          // 通过socket与client进行交流,启动serverSocket监听请求
          try {
              //指定监听的端口
              ServerSocket serverSocket = new ServerSocket(port);
    
              //保证服务端一直运行
              while (true) {
                  Socket socket = serverSocket.accept();
                  //获取client发送过来的内容,server只考虑发送一个时间值
                  OutputStream out = socket.getOutputStream();
                  out.write(new Date().toString().getBytes());
              }
    
          } catch (IOException e) {
              e.printStackTrace();
          }
      }
    }
    
  • 客户端

    //注册监听zk指定目录
    //维护自己本地一个servers信息,收到通知要进行更新
    //发送时间查询请求并接受服务端返回的数据
    public class Client {
    
      //获取zkClient
      ZkClient zkClient = null;
    
      //维护一个servers信息集合
      ArrayList<String> infos = new ArrayList<>();
    
      private void connectZk() {
          //创建zkClient
          zkClient = new ZkClient("linux121:2181,linux122:2181");
          //第一次获取服务器信息, 所有的子节点
          List<String> children = zkClient.getChildren("/servers");
          for (String child : children) {
              //存储着 ip + 端口
              Object o = zkClient.readData("/servers/" + child);
              infos.add(String.valueOf(o));
          }
    
          //对servers目录进行监听
          zkClient.subscribeChildChanges("/servers", new IZkChildListener() {
              @Override
              public void handleChildChange(String path, List<String> children) throws Exception {
                  //接收到通知,说明节点发生了变化,Client需要更新infos集合中的数据
                  ArrayList<String> list = new ArrayList<>();
                  //遍历更新过后的所有节点信息
                  for (String child : children) {
                      list.add(String.valueOf(child));
                  }
                  //新数据覆盖老数据
                  infos = list;
                  System.out.println("--》接收到通知,最新服务器信息为:" + infos);
              }
          });
      }
    
      //发送时间查询的请求
      public void sendRequest() throws IOException {
          //目标服务器地址
          Random random = new Random();
          int i = random.nextInt(infos.size());
          String ipPort = infos.get(i);
          String[] ipAndPort = ipPort.split(":");
    
          //建立socket连接
          Socket socket = new Socket(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
          OutputStream out = socket.getOutputStream();
          InputStream in = socket.getInputStream();
    
          //发送数据
          out.write("query time".getBytes());
          out.flush();
    
          //接收返回结果
          byte[] bytes = new byte[1024];
          int result = in.read(bytes);
          System.out.println("client端接收到server:+" + ipPort + "+返回结果:" + new String(bytes));
    
          //释放资源
          in.close();
          out.close();
          socket.close();
      }
    
      public static void main(String[] args) throws IOException, InterruptedException {
    
          Client client = new Client();
          client.connectZk(); //监听器逻辑
    
          while (true) {
              client.sendRequest(); //发送请求
              //每隔几秒钟发送一次请求
              Thread.sleep(2000);
          }
      }
    }
    

    2、分布式锁

    什么是锁

  • 在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全(数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串行执行消除并发修改变量。

  • 对变量或者堆代码码块做同步本质上就是加锁。目的就是实现多个线程在一个时刻同一个代码块只能有一个线程可执行

    分布式锁

  • 分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下面的例子

image.png
image.png
image.png
image.png

zk实现分布式锁

实现思路

  • 利用Zookeeper可以创建临时带序号节点的特性来实现一个分布式锁
  • 锁就是zk指定目录下序号最小的临时序列节点,多个系统的多个线程都要在此目录下创建临时的顺序节点,因为Zk会为我们保证节点的顺序性,所以可以利用节点的顺序进行锁的判断。
  • 每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。
  • 获取锁失败的线程获取当前节点上一个临时顺序节点,并对对此节点进行监听,当该节点删除的时候(上一个线程执行结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了锁

image.png

  • main 方法 ```java package com.lagou.zk.dislock;

//zk实现分布式锁 public class DisLockTest { public static void main(String[] args) {

    //使用10个线程模拟分布式环境
    for (int i = 0; i < 10; i++) {
        new Thread(new DisLockRunnable()).start();//启动线程
    }
}
static class DisLockRunnable implements Runnable {

    public void run() {
        //每个线程具体的任务就是抢锁
        final DisClient client = new DisClient();
        client.getDisLock();

        //模拟获取锁之后的其它动作
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //释放锁
        client.deleteLock();
    }
}

}


- **核心实现**
```java
package com.lagou.zk.dislock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

//抢锁
//1. 去zk创建临时序列节点,并获取到序号
//2. 判断自己创建节点序号是否是当前节点最小序号,如果是则获取锁执行相关操作,最后要释放锁
//3. 不是最小节点,当前线程需要等待,等待你的前一个序号的节点被删除,然后再次判断自己是否是最小节点。。。
public class DisClient {

    public DisClient() {
        //初始化zk的/distrilocl节点,会出现线程安全问题,需要用 synchronized 来锁住每一个进程
        synchronized (DisClient.class){
            if (!zkClient.exists("/distrilock")) {
                zkClient.createPersistent("/distrilock");
            }
        }
    }

    String beforeNodePath;
    String currentNodePath;
    CountDownLatch countDownLatch = null;

    //获取到zkClient
    private ZkClient zkClient = new ZkClient("linux121:2181,linux122:2181");

    //把抢锁过程分为两部分,一部分是创建节点,用于比较序号,另一部分是等待锁

    //完整获取锁方法
    public void getDisLock() {

        //获取到当前线程名称
        final String threadName = Thread.currentThread().getName();

        //首先调用tryGetLock
        if (tryGetLock()) {
            //说明获取到锁
            System.out.println(threadName + ":获取到了锁");
        } else {
            // 没有获取到锁
            System.out.println(threadName + ":获取锁失败,进入等待状态");
            waitForLock();
            //递归获取锁
            getDisLock();
        }
    }

    //尝试获取锁
    public boolean tryGetLock() {
        //创建临时顺序节点,/distrilock/序号
        if (null == currentNodePath || "".equals(currentNodePath)) {
            currentNodePath = zkClient.createEphemeralSequential("/distrilock/", "lock");
        }

        //获取到/distrilock下所有的子节点
        final List<String> childs = zkClient.getChildren("/distrilock");

        //对节点信息进行排序
        Collections.sort(childs); //默认是升序
        final String minNode = childs.get(0);

        //判断自己创建节点是否与最小序号一致
        if (currentNodePath.equals("/distrilock/" + minNode)) {
            //说明当前线程创建的就是序号最小节点
            return true;
        } else {
            //说明最小节点不是自己创建,要监控自己当前节点序号前一个的节点
            final int i = Collections.binarySearch(childs, currentNodePath.substring("/distrilock/".length()));
            //前一个(lastNodeChild是不包括父节点)
            String lastNodeChild = childs.get(i - 1);
            beforeNodePath = "/distrilock/" + lastNodeChild;
        }
        return false;
    }

    //等待之前节点释放锁,如何判断锁被释放,需要唤醒线程继续尝试tryGetLock
    public void waitForLock() {
        //准备一个监听器
        final IZkDataListener iZkDataListener = new IZkDataListener() {

            public void handleDataChange(String s, Object o) throws Exception {
            }

            //删除
            public void handleDataDeleted(String s) throws Exception {

                //提醒当前线程再次获取锁
                countDownLatch.countDown();//把值减1变为0,唤醒之前await线程
            }
        };

        //监控前一个节点
        zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);

        //在监听的通知没来之前,该线程应该是等待状态,先判断一次上一个节点是否还存在
        if (zkClient.exists(beforeNodePath)) {
            //开始等待,CountDownLatch:线程同步计数器
            countDownLatch = new CountDownLatch(1); // ???
            try {
                countDownLatch.await();//阻塞,countDownLatch值变为 0
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //解除监听
        zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener);
    }

    //释放锁
    public void deleteLock() {
        if (zkClient != null) {
            zkClient.delete(currentNodePath);
            zkClient.close();
        }
    }
}
  • 注:分布式锁的实现可以是 Redis、Zookeeper,相对来说生产环境如果使用分布式锁可以考虑使用 **Redis **实现