什么叫分布式锁呢? 比如说“进程1”在使用该资源的时候,会先去获得锁,“进程1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,“进程1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们就把这个分布式环境下的这个锁叫做分布式锁。

E-ZK分布式锁案例 - 图1

原生Zookeeper实现分布式锁案例

分布式锁实现

  1. package com.zh.lock2;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import java.util.Collections;
  5. import java.util.List;
  6. import java.util.concurrent.CountDownLatch;
  7. /**
  8. * author: zhanghui
  9. * project: big-data-learning
  10. * package: com.zh.lock2
  11. * filename: DistributedLock
  12. * date: 2022/3/19 12:54
  13. * description: 分布式锁案例
  14. */
  15. public class DistributedLock {
  16. // zookeeper server列表
  17. private String connectString = "linux102:2181,linux103:2181,linux104:2181";
  18. // 超时时间
  19. private int sessionTimeout = 2000;
  20. private ZooKeeper zk;
  21. private String rootNode = "locks";
  22. private String subNode = "seq-";
  23. // 当前client等待的子节点
  24. private String waitPath;
  25. // ZooKeeper连接
  26. private CountDownLatch connectLatch = new CountDownLatch(1);
  27. // Zookeeper节点等待
  28. private CountDownLatch waitLatch = new CountDownLatch(1);
  29. // 当前Client创建的节点
  30. private String currentNode;
  31. // 和Zk服务建立连接,并创建根节点
  32. public DistributedLock() throws Exception {
  33. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  34. @Override
  35. public void process(WatchedEvent event) {
  36. // 连接建立时,打开latch,唤醒wait在该latch上的线程
  37. if (event.getState() == Event.KeeperState.SyncConnected) {
  38. connectLatch.countDown();
  39. }
  40. // 发生了waitPath的删除事件
  41. if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
  42. waitLatch.countDown();
  43. }
  44. }
  45. });
  46. // 等待连接建立
  47. connectLatch.await();
  48. // 获取跟节点状态
  49. Stat stat = zk.exists("/" + rootNode, false);
  50. // 如果跟节点不存在,则创建根节点,根节点类型为永久节点
  51. if (stat == null) {
  52. System.out.println("根节点不存在!");
  53. zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  54. }
  55. }
  56. // 加锁方法
  57. public void zkLock() {
  58. try {
  59. // 在根节点下创建临时顺序节点,返回值为创建的节点路径
  60. currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  61. // wait一小会,让结果更清晰一些
  62. Thread.sleep(10);
  63. // 注意,没有必要监听“/locks”的子节点的变化情况
  64. List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
  65. // 列表中只有一个子节点,那肯定就是currentNode,说明client获得锁
  66. if (childrenNodes.size() == 1) {
  67. return;
  68. } else {
  69. // 对跟节点下的所有临时顺序接地那进行从小到大排序
  70. Collections.sort(childrenNodes);
  71. // 当前节点名称
  72. String thisNode = currentNode.substring(("/" + rootNode + "/").length());
  73. int index = childrenNodes.indexOf(thisNode);
  74. if (index == -1) {
  75. System.out.println("数据异常");
  76. } else if (index == 0) {
  77. // index == 0, 说明thisNode在列表中最小,当前client获得锁
  78. return;
  79. } else {
  80. // 获得排名比currentNode前1位的节点
  81. this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
  82. // 在waitPath上注册监听器,当waitPath被删除时,Zookeeper会回调监听器的process方法
  83. zk.getData(waitPath, true, new Stat());
  84. // 进入等待锁状态
  85. waitLatch.await();
  86. return;
  87. }
  88. }
  89. } catch (InterruptedException e) {
  90. e.printStackTrace();
  91. } catch (KeeperException e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. // 解锁方法
  96. public void zkUnlock() {
  97. try {
  98. zk.delete(this.currentNode, -1);
  99. } catch (InterruptedException e) {
  100. e.printStackTrace();
  101. } catch (KeeperException e) {
  102. e.printStackTrace();
  103. }
  104. }
  105. }

分布式锁测试

创建两个线程

  1. package com.zh.lock2;
  2. /**
  3. * author: zhanghui
  4. * project: big-data-learning
  5. * package: com.zh.lock2
  6. * filename: DistributedLockTest
  7. * date: 2022/3/19 14:08
  8. * description: 分布式锁案例测试
  9. */
  10. public class DistributedLockTest {
  11. public static void main(String[] args) throws Exception {
  12. // 创建分布式锁1
  13. final DistributedLock lock1 = new DistributedLock();
  14. // 创建分布式锁2
  15. final DistributedLock lock2 = new DistributedLock();
  16. new Thread(new Runnable() {
  17. @Override
  18. public void run() {
  19. // 获取锁对象
  20. try {
  21. lock1.zkLock();
  22. System.out.println("线程1获取锁");
  23. Thread.sleep(5 * 1000);
  24. lock1.zkUnlock();
  25. System.out.println("线程1释放锁");
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }).start();
  31. new Thread(new Runnable() {
  32. @Override
  33. public void run() {
  34. // 获取锁对象
  35. try {
  36. lock2.zkLock();
  37. System.out.println("线程2获取锁");
  38. Thread.sleep(5 * 1000);
  39. lock2.zkUnlock();
  40. System.out.println("线程2释放锁");
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }).start();
  46. }
  47. }

观察控制台变化

线程2获取锁
线程2释放锁
线程1获取锁
线程1释放锁

Curator框架实现分布式锁案例

原生的java API 开发存在的问题

  1. 会话连接是异步的,需要自己去处理。比如使用CountDownLatch
  2. Watch需要重复注册,不然就不能生效
  3. 开发的复杂性还是比较高的
  4. 不支持多节点删除和创建。需要自己去递归

Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题

curator官方文档

Curator案例实操

添加依赖

<!-- Curator依赖 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>

代码实现

package com.zh.lock;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;

/**
 * author: zhanghui
 * project: big-data-learning
 * package: com.zh.lock
 * filename: CuratorLockTest
 * date: 2022/3/19 14:22
 * description:
 */
public class CuratorLockTest {

    private String rootNode = "/locks";

    // zookeeper server列表
    private String connectSting = "linux102:2181,linux103:2181,linux104:2181";
    // connection超时时间
    private int connectTimeout = 2000;
    // session超时时间
    private int sessionTimeout = 2000;

    public static void main(String[] args) {
        new CuratorLockTest().test();
    }

    private void test() {
        // 创建分布式锁1
        final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);

        // 创建分布式锁2
        final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock1.acquire();
                    System.out.println("线程1获得锁");

                    // 测试锁重入
                    lock1.acquire();
                    System.out.println("线程1再次获得锁");
                    Thread.sleep(5 * 1000);
                    lock1.release();
                    System.out.println("线程1释放锁");
                    lock1.release();
                    System.out.println("线程1再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.acquire();
                    System.out.println("线程2获得锁");

                    // 测试锁重入
                    lock2.acquire();
                    System.out.println("线程2再次获得锁");
                    Thread.sleep(5 * 1000);
                    lock2.release();
                    System.out.println("线程2释放锁");
                    lock2.release();
                    System.out.println("线程2再次释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    // 分布式锁初始化
    public CuratorFramework getCuratorFramework() {
        // 重试策略,初试时间3秒,重试3次
        RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);

        // 通过工厂创建Curator
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectSting)
                .connectionTimeoutMs(connectTimeout)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(policy).build();
        // 开启连接
        client.start();
        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}

观察控制台变化

线程1获得锁
线程1再次获得锁
线程1释放锁
线程1再次释放锁
线程2获得锁
线程2再次获得锁
线程2释放锁
线程2再次释放锁