talk is cheap, show me the code

ZK实现分布式锁简介

zk分布式锁,其实可以做的比较简单,本质上就是某个节点尝试创建临时znode,此时创建成功了就获取了这个锁,这个时候,别的客户端来创建锁会失败,只能注册一个监听器来监听这个锁,释放锁就是删除这个znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以重新加锁。
zk是以paxos算法为基础的分布式应用程序协调服务,zk的数据节点和文件目录类似,所以我们可以用此特性实现分布式锁。
我们以某个资源为目录,然后这个目录下面的节点就是我们需要获取锁的客户端,未获取到锁的客户端注册需要注册watcher到上一个客户端,可以用下图表示:
image.png
/lock是我们用于加锁的目录,/resource_name是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。

Curator

Curator封装了ZK底层的api,使我们更加容易方便的对zk进行操作,并且封装了他的分布式锁功能,这样我们就不需要自己的实现了。
Curator实现了可重入锁(InterProcessMutex)也实现了不可重入锁(InterProcessSemaphoreMutex)在可重入锁中还实现了读写锁。

InterProcessMutex

InterProcessMutex是Curator实现的可重入锁,我们可以通过下面一段代码实现可重入锁:
image.png
我们利用acquire加锁,利用release释放锁
加锁流程如下:

  • 首先进行可重入的判定:这里的可重入锁记录在ConcurrentMapthreadData这个map里,如果threadData.get(currentThread)是有值的那么就证明是可重入锁,然后记录就会加1,
  • 然后在资源目录下创建一个节点,比如这里创建一个/00000000002这个节点,这个节点需要设置为EPHEMERAL_SEQUENTIAL也就是临时节点并且有序。
  • 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个。
  • 如果是第一个,则获取到锁,可以返回
  • 如果不是第一个,则证明前面已经有人获取到锁了,那么需要获取自己节点的前一个节点。/000002的前一个节点是/00000001,我们获取到这个节点之后,在上面注册一个watcher(这里的watcher其实调用的是notifyAll,用来解除阻塞)
  • object.wait(timeout)或object.wait()进入阻塞。和上一步的watcher相对应

解锁流程如下:

  • 首先进行可重入锁的判定:如果有可重入锁只需要次数减1即可,减1之后加收次数为0的话继续下面步骤,否则返回0
  • 删除当前节点
  • 删除threadDataMap里面的可重入锁的数据。

读写锁

Curator 提供了读写锁,其实现类是 InterProcessReadWriteLock,这里的每个节点都会加上前缀:
privatestaticfinalString READLOCKNAME = “READ“;
privatestaticfinalString WRITE_LOCK_NAME = “__WRIT
“;
根据不同的前缀区分是读锁还是写锁,对于读锁,如果发现前面有写锁,那么需要将 Watcher 注册到和自己最近的写锁。写锁的逻辑和我们之前分析可重入锁加锁的流程保持不变。

锁超时

ZooKeeper 不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个 ZK 的 Session,通过这个 Session,ZK 可以判断机器是否宕机。
如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。

ZK小结

  • 优点:ZK 可以不需要关心锁超时时间,实现起来有现成的第三方包,比较方便,并且支持读写锁,ZK 获取锁会按照加锁的顺序,所以其是公平锁。对于高可用利用 ZK 集群进行保证。
  • 缺点:ZK 需要额外维护,增加维护成本(毕竟第三方依赖),性能和 MySQL 相差不大,依然比较差。并且需要开发人员了解 ZK 是什么。

下面是zk实现分布式锁的一个demo,用于学习理解

  1. import org.apache.zookeeper.*;
  2. import org.apache.zookeeper.data.Stat;
  3. import java.util.concurrent.CountDownLatch;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * ZooKeeperSession
  7. *
  8. */
  9. public class ZooKeeperSession {
  10. private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
  11. private ZooKeeper zookeeper;
  12. private CountDownLatch latch;
  13. public ZooKeeperSession() {
  14. try {
  15. this.zookeeper = new ZooKeeper(
  16. "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181",
  17. 50000,
  18. (Watcher) new ZooKeeperWatcher());
  19. try {
  20. connectedSemaphore.await();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println("ZooKeeper session established......");
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. /**
  30. * 获取分布式锁
  31. *
  32. * @param productId
  33. */
  34. public Boolean acquireDistributedLock(Long productId) {
  35. String path = "/product-lock-" + productId;
  36. try {
  37. zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  38. return true;
  39. } catch (Exception e) {
  40. while (true) {
  41. try {
  42. // 相当于是给node注册一个监听器,去看看这个监听器是否存在
  43. Stat stat = this.zookeeper.exists(path, true);
  44. long waitTime = 50000L;
  45. if (stat != null) {
  46. this.latch = new CountDownLatch(1);
  47. this.latch.await(waitTime, TimeUnit.MILLISECONDS);
  48. this.latch = null;
  49. }
  50. zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  51. return true;
  52. } catch (Exception ee) {
  53. continue;
  54. }
  55. }
  56. }
  57. }
  58. /**
  59. * 释放掉一个分布式锁
  60. *
  61. * @param productId
  62. */
  63. public void releaseDistributedLock(Long productId) {
  64. String path = "/product-lock-" + productId;
  65. try {
  66. zookeeper.delete(path, -1);
  67. System.out.println("release the lock for product[id=" + productId + "]......");
  68. } catch (Exception e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. /**
  73. * 建立zk session的watcher
  74. *
  75. * @author bingo
  76. * @since 2018/11/29
  77. */
  78. private class ZooKeeperWatcher implements Watcher {
  79. public void process(WatchedEvent event) {
  80. System.out.println("Receive watched event: " + event.getState());
  81. if (Event.KeeperState.SyncConnected == event.getState()) {
  82. connectedSemaphore.countDown();
  83. }
  84. if (connectedSemaphore != null) {
  85. connectedSemaphore.countDown();
  86. }
  87. }
  88. }
  89. /**
  90. * 封装单例的静态内部类
  91. *
  92. * @author bingo
  93. * @since 2018/11/29
  94. */
  95. private static class Singleton {
  96. private static ZooKeeperSession instance;
  97. static {
  98. instance = new ZooKeeperSession();
  99. }
  100. public static ZooKeeperSession getInstance() {
  101. return instance;
  102. }
  103. }
  104. /**
  105. * 获取单例
  106. *
  107. * @return
  108. */
  109. public static ZooKeeperSession getInstance() {
  110. return Singleton.getInstance();
  111. }
  112. /**
  113. * 初始化单例的便捷方法
  114. */
  115. public static void init() {
  116. getInstance();
  117. }
  118. }

redis 分布式锁和 zk 分布式锁的对比

  • redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
  • zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。

另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。
redis 分布式锁大家没发现好麻烦吗?遍历上锁,计算时间等等……zk 的分布式锁语义清晰实现简单。
所以先不分析太多的东西,就说这两点,我个人实践认为 zk 的分布式锁比 redis 的分布式锁牢靠、而且模型简单易用。