talk is cheap, show me the code
ZK实现分布式锁简介
zk分布式锁,其实可以做的比较简单,本质上就是某个节点尝试创建临时znode,此时创建成功了就获取了这个锁,这个时候,别的客户端来创建锁会失败,只能注册一个监听器来监听这个锁,释放锁就是删除这个znode,一旦释放掉就会通知客户端,然后有一个等待着的客户端就可以重新加锁。
zk是以paxos算法为基础的分布式应用程序协调服务,zk的数据节点和文件目录类似,所以我们可以用此特性实现分布式锁。
我们以某个资源为目录,然后这个目录下面的节点就是我们需要获取锁的客户端,未获取到锁的客户端注册需要注册watcher到上一个客户端,可以用下图表示:
/lock是我们用于加锁的目录,/resource_name是我们锁定的资源,其下面的节点按照我们加锁的顺序排列。
Curator
Curator封装了ZK底层的api,使我们更加容易方便的对zk进行操作,并且封装了他的分布式锁功能,这样我们就不需要自己的实现了。
Curator实现了可重入锁(InterProcessMutex)也实现了不可重入锁(InterProcessSemaphoreMutex)在可重入锁中还实现了读写锁。
InterProcessMutex
InterProcessMutex是Curator实现的可重入锁,我们可以通过下面一段代码实现可重入锁:
我们利用acquire加锁,利用release释放锁
加锁流程如下:
- 首先进行可重入的判定:这里的可重入锁记录在ConcurrentMap
threadData这个map里,如果threadData.get(currentThread)是有值的那么就证明是可重入锁,然后记录就会加1, - 然后在资源目录下创建一个节点,比如这里创建一个/00000000002这个节点,这个节点需要设置为EPHEMERAL_SEQUENTIAL也就是临时节点并且有序。
- 获取当前目录下所有子节点,判断自己的节点是否位于子节点第一个。
- 如果是第一个,则获取到锁,可以返回
- 如果不是第一个,则证明前面已经有人获取到锁了,那么需要获取自己节点的前一个节点。/000002的前一个节点是/00000001,我们获取到这个节点之后,在上面注册一个watcher(这里的watcher其实调用的是notifyAll,用来解除阻塞)
- object.wait(timeout)或object.wait()进入阻塞。和上一步的watcher相对应
解锁流程如下:
- 首先进行可重入锁的判定:如果有可重入锁只需要次数减1即可,减1之后加收次数为0的话继续下面步骤,否则返回0
- 删除当前节点
- 删除threadDataMap里面的可重入锁的数据。
读写锁
Curator 提供了读写锁,其实现类是 InterProcessReadWriteLock,这里的每个节点都会加上前缀:
privatestaticfinalString READLOCKNAME = “READ“;
privatestaticfinalString WRITE_LOCK_NAME = “__WRIT“;
根据不同的前缀区分是读锁还是写锁,对于读锁,如果发现前面有写锁,那么需要将 Watcher 注册到和自己最近的写锁。写锁的逻辑和我们之前分析可重入锁加锁的流程保持不变。
锁超时
ZooKeeper 不需要配置锁超时,由于我们设置节点是临时节点,我们的每个机器维护着一个 ZK 的 Session,通过这个 Session,ZK 可以判断机器是否宕机。
如果我们的机器挂掉的话,那么这个临时节点对应的就会被删除,所以我们不需要关心锁超时。
ZK小结
- 优点:ZK 可以不需要关心锁超时时间,实现起来有现成的第三方包,比较方便,并且支持读写锁,ZK 获取锁会按照加锁的顺序,所以其是公平锁。对于高可用利用 ZK 集群进行保证。
- 缺点:ZK 需要额外维护,增加维护成本(毕竟第三方依赖),性能和 MySQL 相差不大,依然比较差。并且需要开发人员了解 ZK 是什么。
下面是zk实现分布式锁的一个demo,用于学习理解
import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/*** ZooKeeperSession**/public class ZooKeeperSession {private static CountDownLatch connectedSemaphore = new CountDownLatch(1);private ZooKeeper zookeeper;private CountDownLatch latch;public ZooKeeperSession() {try {this.zookeeper = new ZooKeeper("192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181",50000,(Watcher) new ZooKeeperWatcher());try {connectedSemaphore.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("ZooKeeper session established......");} catch (Exception e) {e.printStackTrace();}}/*** 获取分布式锁** @param productId*/public Boolean acquireDistributedLock(Long productId) {String path = "/product-lock-" + productId;try {zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);return true;} catch (Exception e) {while (true) {try {// 相当于是给node注册一个监听器,去看看这个监听器是否存在Stat stat = this.zookeeper.exists(path, true);long waitTime = 50000L;if (stat != null) {this.latch = new CountDownLatch(1);this.latch.await(waitTime, TimeUnit.MILLISECONDS);this.latch = null;}zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);return true;} catch (Exception ee) {continue;}}}}/*** 释放掉一个分布式锁** @param productId*/public void releaseDistributedLock(Long productId) {String path = "/product-lock-" + productId;try {zookeeper.delete(path, -1);System.out.println("release the lock for product[id=" + productId + "]......");} catch (Exception e) {e.printStackTrace();}}/*** 建立zk session的watcher** @author bingo* @since 2018/11/29*/private class ZooKeeperWatcher implements Watcher {public void process(WatchedEvent event) {System.out.println("Receive watched event: " + event.getState());if (Event.KeeperState.SyncConnected == event.getState()) {connectedSemaphore.countDown();}if (connectedSemaphore != null) {connectedSemaphore.countDown();}}}/*** 封装单例的静态内部类** @author bingo* @since 2018/11/29*/private static class Singleton {private static ZooKeeperSession instance;static {instance = new ZooKeeperSession();}public static ZooKeeperSession getInstance() {return instance;}}/*** 获取单例** @return*/public static ZooKeeperSession getInstance() {return Singleton.getInstance();}/*** 初始化单例的便捷方法*/public static void init() {getInstance();}}
redis 分布式锁和 zk 分布式锁的对比
- redis 分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能。
- zk 分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。
另外一点就是,如果是 redis 获取锁的那个客户端 出现 bug 挂了,那么只能等待超时时间之后才能释放锁;而 zk 的话,因为创建的是临时 znode,只要客户端挂了,znode 就没了,此时就自动释放锁。
redis 分布式锁大家没发现好麻烦吗?遍历上锁,计算时间等等……zk 的分布式锁语义清晰实现简单。
所以先不分析太多的东西,就说这两点,我个人实践认为 zk 的分布式锁比 redis 的分布式锁牢靠、而且模型简单易用。
