在分布式架构中,由于不同的机器(进程)会同时执行同一个业务逻辑,但是不同于单机模式,可以使用 sync 或 lock 来实现对线程的控制,而分布式锁,就是控制进程执行顺序的

MySQL 分布式锁

使用 for update 语句,将行锁住,实现分布式锁

  1. select count(1) from distributed_lock where lock_name = 'LOCK' for update
  • 在表中没有数据的情况下,我们先看一下 explain 上述语句的结果:

image.png

再看一下查询优化器优化上述语句的结果

  1. select count(1) AS `count(1)` from `test`.`distributed_lock` `t` where (NULL = 'LOCK')
  • 在表中只有一条数据的情况下,我们先看一下 explain 上述语句的结果:

image.png

再看一下查询优化器优化上述语句的结果

  1. select count(1) AS `count(1)` from `test`.`distributed_lock` `t` where 1

我们可以看出,LOCK_NAME 虽然最了唯一索引,但是执行上述语句的时候,MySQL 的查询优化器并没有使用索引,而是使用了常量,而 for update 语句,一旦没有使用索引,那么 for update 就不会生效(也就是不能锁住行),那么这种实现就是有问题的,只有在数据量比较大的时候,才会使用到索引,那么这种实现的弊端就是,需要一张有很多数据的表

Redis 分布式锁

Redis 分布式锁,主要是保证原子性的操作,同时要保证 key 在业务期间不能过期

  • ex(expire):设置键的过期时间
  • nx(set if not exists):只在键不存在时,才对键进行设置操作
  1. @Override
  2. public boolean tryLock() {
  3. Jedis jedis = jedisPool.getResource();
  4. SetParams params = new SetParams();
  5. params.ex(5); // 5s 后会过期
  6. params.nx(); // 如果 key 不存在,则创建成功,否则,创建失败
  7. String value = UUID.randomUUID().toString();
  8. // 保证了原子性,将设置过期时间和设置值作为一步操作
  9. String result = jedis.set(LOCK_KEY, value, params);
  10. jedis.close();
  11. if ("OK".equals(result)) {
  12. // 拿到了锁
  13. threadLocal.set(value);
  14. // 启动一个线程,保证 key 在业务期间不过期
  15. if (isHappened.get()) {
  16. Thread thread = new Thread(new MyRunnable(jedisPool));
  17. thread.setDaemon(true);
  18. thread.start();
  19. isHappened.set(false);
  20. }
  21. return true;
  22. }
  23. return false;
  24. }
  25. @Override
  26. public void lock() {
  27. if (!tryLock()) {
  28. lock();
  29. }
  30. }
  31. @Override
  32. public void unlock() {
  33. // 这里使用 lua 脚本,保证原子性操作
  34. String script = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +
  35. " return redis.call(\"del\", KEYS[1])\n" +
  36. "else\n" +
  37. " return 0\n" +
  38. "end";
  39. Jedis jedis = jedisPool.getResource();
  40. Object eval = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(threadLocal.get()));
  41. if (Integer.parseInt(eval.toString()) == 0) {
  42. jedis.close();
  43. }
  44. }

ZooKeeper

ZooKeeper是一个集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。

  • 数据发布与订阅(配置中心)
  • 负载均衡
  • 命名服务(Naming Service)
  • 分布式通知 / 协调
  • 集群管理与 Master 选举
  • 分布式锁
  • 分布式队列


分布式锁

核心是:临时节点 + 顺序节点 + Watch,每个服务器建立自己的临时节点,然后寻找所有顺序节点并排序,找出最小值,如果最小值就是自己维护的节点的话,就相当于得到了锁,如果不是的话,就阻塞,等待锁被释放,然后继续重复上述步骤

image.png

public class ZkLock implements Lock {

    private static final String LOCK_NAME = "/LOCK";

    private ThreadLocal<ZooKeeper> zk = new ThreadLocal<>();

    private ThreadLocal<String> currentNodeName = new ThreadLocal<>();

    public void init() {
        try {
            if (zk.get() == null)
                zk.set(new ZooKeeper("localhost:2181", 5000, null));
            Stat stat = zk.get().exists(LOCK_NAME, false);
            if (stat == null) {
                zk.get().create(LOCK_NAME, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean tryLock() {

        init();

        String nodeName = LOCK_NAME + "/zk_";
        try {
            // /LOCK/zk_1
            currentNodeName.set(zk.get().create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
            List<String> children = zk.get().getChildren(LOCK_NAME, false);  // zk_1, zk_2, ...
            Collections.sort(children);
            String minNodeName = children.get(0);
            if (currentNodeName.get().equals(LOCK_NAME + "/" + minNodeName)) {
                return true;
            }
            int currentNodeIndex = children.indexOf(currentNodeName.get().substring(currentNodeName.get().lastIndexOf("/") + 1));
            String prevNodeName = children.get(currentNodeIndex - 1);

            CountDownLatch countDownLatch = new CountDownLatch(1);
            zk.get().exists(LOCK_NAME + "/" + prevNodeName, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (Event.EventType.NodeDeleted.equals(watchedEvent.getType())) {
                        countDownLatch.countDown();
                        System.out.println("【" + Thread.currentThread().getName() + "】被唤醒...");
                    }
                }
            });
            System.out.println("【" + Thread.currentThread().getName() + "】等待锁...");
            countDownLatch.await();
            return true;
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public void lock() {
        if (!tryLock()) {
            lock();
        }
    }

    @Override
    public void unlock() {
        try {
            zk.get().delete(currentNodeName.get(), -1);
            currentNodeName.set(null);
            zk.get().close();
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }


    @Override
    public Condition newCondition() {
        return null;
    }
}

分布式配置中心

核心思路:持久节点 + Watch

public class Config {

    private Map<String, String> cache = new HashMap<>();

    private CuratorFramework client;

    private static final String CONFIG_PREFIX = "/CONFIG";

    public Config() {
        this.client = CuratorFrameworkFactory.newClient("localhost:2181", new RetryNTimes(3, 5000));
        this.client.start();

        init();
    }

    public void init() {
        try {
            List<String> childrenNames = client.getChildren().forPath(CONFIG_PREFIX);
            for (String childrenName : childrenNames) {
                String value = new String(client.getData().forPath(CONFIG_PREFIX + "/" + childrenName));
                cache.put(childrenName, value);
            }

            PathChildrenCache watcher = new PathChildrenCache(client, CONFIG_PREFIX, true);
            watcher.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    String path = event.getData().getPath();
                    if (path.startsWith(CONFIG_PREFIX)) {
                        String name = path.replace(CONFIG_PREFIX + "/", "");
                        if (PathChildrenCacheEvent.Type.CHILD_ADDED.equals(event.getType()) ||
                                PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(event.getType())) {
                            cache.put(name, new String(event.getData().getData()));
                        } else if (PathChildrenCacheEvent.Type.CHILD_REMOVED.equals(event.getType())) {
                            cache.remove(name);
                        }
                    }
                }
            });
            watcher.start();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void save(String name, String value) {
        try {
            String configFullPath = CONFIG_PREFIX + "/" + name;
            Stat stat = client.checkExists().forPath(configFullPath);
            if (stat == null) {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configFullPath, value.getBytes());
            } else {
                client.setData().forPath(configFullPath, value.getBytes());
            }
            cache.put(name, value);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String get(String name) {
        return cache.get(name);
    }

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.save("port", "8080");

        while (true) {
            System.out.println(config.get("port"));
            TimeUnit.SECONDS.sleep(3);
        }
    }

}