分布式锁是控制分布式系统之间同步访问共享资源的一种方式。
下面介绍 zookeeper 如何实现分布式锁,讲解排他锁和共享锁两类分布式锁。
排他锁
排他锁(Exclusive Locks),又被称为写锁或独占锁,如果事务T1对数据对象O1加上排他锁,那么整个加锁期间,只允许事务T1对O1进行读取和更新操作,其他任何事务都不能进行读或写。
定义锁:
/exclusive_lock/lock
实现方式:
利用 zookeeper 的同级节点的唯一性特性,在需要获取排他锁时,所有的客户端试图通过调用 create() 接口,在 /exclusive_lock 节点下创建临时子节点 /exclusive_lock/lock,最终只有一个客户端能创建成功,那么此客户端就获得了分布式锁。同时,所有没有获取到锁的客户端可以在 /exclusive_lock 节点上注册一个子节点变更的 watcher 监听事件,以便重新争取获得锁。
1.Lock接口
public interface ZkLock {boolean lock();void unlock();}
2.分布式排他锁实现
/*** 分布式排他锁* 支持重入*/public class MutexLock implements ZkLock {private static final String MUTEX_LOCK_ROOT = "/mutex-lock-root";//创建的节点pathprivate final String lockName;// 存放线程重入锁private final ConcurrentMap<Thread, LockData> threadLockDataMap = Maps.newConcurrentMap();public MutexLock(String lockName) {this.lockName = PathUtils.validatePath(MUTEX_LOCK_ROOT + "-" + lockName);}//线程锁的数据private static class LockData {final Thread owningThread;final AtomicInteger lockCount;private LockData(Thread owningThread) {this.owningThread = owningThread;lockCount = new AtomicInteger(1);}}@Overridepublic boolean lock() {boolean result = true;Thread currentThread = Thread.currentThread();//获取线程锁LockData lockData = threadLockDataMap.get(currentThread);if (null != lockData) {//重入次数加一int i = lockData.lockCount.incrementAndGet();TestLock.printLog(currentThread.getName(), "加锁 ,重入次数", i);return result;}try {//创建zookeeper锁String lockPath = innerLock(currentThread.getName());if (null != lockPath) {lockData = new LockData(currentThread);//将当前线程放入map中存放TestLock.printLog(currentThread.getName(), "加锁 ,重入次数", 1);threadLockDataMap.put(currentThread, lockData);}} catch (Exception e) {e.printStackTrace();result = false;}return result;}@Overridepublic void unlock() {Thread currentThread = Thread.currentThread();//获取锁数据LockData lockData = threadLockDataMap.get(currentThread);if (null == lockData) {return;}//重入次数减一int count = lockData.lockCount.decrementAndGet();TestLock.printLog(currentThread.getName(), "解锁 ,重入次数", count);//重入次数依然大于0,返回等待继续解锁if (0 < count) {return;}//重入次数减为0,移除zookeeper节点,解锁threadLockDataMap.remove(currentThread);//允许重试次ZooKeeper zooKeeper = LockUtils.newZookeeper();int retry = 5;while (retry-- > 0) {try {zooKeeper.delete(lockName, -1);retry = 0;} catch (InterruptedException | KeeperException e) {e.printStackTrace();}}}private String innerLock(String threadName) throws Exception {ZooKeeper zooKeeper = LockUtils.newZookeeper();CountDownLatch countDownLatch = new CountDownLatch(1);int i = 0;TestLock.printLog(threadName, "尝试上锁 ,次数", ++i);//创建zookeeper节点String result = createPath(zooKeeper, countDownLatch, threadName, i);//await等待上锁成功,或者锁释放countDownLatch.await();//如果上锁失败,会一直尝试上锁while (result == null) {TestLock.printLog(threadName, "尝试上锁 ,次数", ++i);result = createPath(zooKeeper, countDownLatch, threadName, i);// countDownLatch.await();}return result;}private String createPath(ZooKeeper zooKeeper, CountDownLatch countDownLatch, String threadName, int count) {String resultPath = null;try {// 创建临时节点resultPath = zooKeeper.create(lockName, "".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);TestLock.printLog(threadName, "竞争锁", "成功");countDownLatch.countDown();} catch (KeeperException.NodeExistsException e) {// 节点存在(锁获取失败),创建监听节点状态TestLock.printLog(threadName, "竞争锁", "失败, 上锁次数" + count);watchPath(zooKeeper, countDownLatch, threadName, count);} catch (Exception e) {e.printStackTrace();}return resultPath;}private void watchPath(ZooKeeper zooKeeper, CountDownLatch countDownLatch, String threadName, int count) {try {Stat exists = zooKeeper.exists(lockName, event -> {if (Watcher.Event.EventType.NodeDeleted == event.getType()) {TestLock.printLog(threadName, "监听锁释放", "加入竞争, 序次 : " + count);countDownLatch.countDown();}});if (null == exists) {//节点不存在countDownLatch.countDown();}} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}
3.单例zookeeper连接类
public class LockUtils {private static ZooKeeper zooKeeper;private LockUtils() {}public static ZooKeeper newZookeeper() {if (null == zooKeeper) {init();}return zooKeeper;}private static void init() {if (null != zooKeeper) {return;}synchronized (LockUtils.class) {if (null != zooKeeper) {return;}try {CountDownLatch countDownLatch = new CountDownLatch(1);zooKeeper = new ZooKeeper("localhost:2181", 5000, event -> {if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {countDownLatch.countDown();}});countDownLatch.await();System.out.println("===zookeeper init success===");} catch (Exception e) {e.printStackTrace();}}}}
4.多线程测试
public class TestLock {public static void printLog(String threadName, Object tag, Object avg) {SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss SSS");String date = dateFormat.format(new Date());System.out.println(date + ": 线程:" + threadName + " ," + tag + " : " + avg);}public static void main(String[] args) {final ZkLock lock = new MutexLock("Test");CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 3; i++) {int finalI = i;new Thread(() -> {try {printLog("Thread " + finalI , "Waiting", "已就绪");countDownLatch.await();if (lock.lock()) {Thread.sleep(10);if (lock.lock()) {Thread.sleep(20);lock.unlock();}lock.unlock();}} catch (InterruptedException e) {e.printStackTrace();}}).start();}countDownLatch.countDown();printLog("Main", "", "已就绪");}
5.观察结果
17:57:02 615: 线程:Thread 0 ,Waiting : 已就绪17:57:02 615: 线程:Main , : 已就绪17:57:02 615: 线程:Thread 1 ,Waiting : 已就绪17:57:02 615: 线程:Thread 2 ,Waiting : 已就绪===zookeeper init success===17:57:03 580: 线程:Thread-2 ,尝试上锁 ,次数 : 117:57:03 580: 线程:Thread-1 ,尝试上锁 ,次数 : 117:57:03 580: 线程:Thread-0 ,尝试上锁 ,次数 : 117:57:03 602: 线程:Thread-2 ,竞争锁 : 成功17:57:03 603: 线程:Thread-2 ,加锁 ,重入次数 : 117:57:03 614: 线程:Thread-2 ,加锁 ,重入次数 : 217:57:03 616: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数117:57:03 616: 线程:Thread-1 ,竞争锁 : 失败, 上锁次数117:57:03 646: 线程:Thread-2 ,解锁 ,重入次数 : 117:57:03 646: 线程:Thread-2 ,解锁 ,重入次数 : 017:57:03 653: 线程:Thread-1 ,监听锁释放 : 加入竞争, 序次 : 117:57:03 653: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 117:57:03 653: 线程:Thread-1 ,尝试上锁 ,次数 : 217:57:03 654: 线程:Thread-0 ,尝试上锁 ,次数 : 217:57:03 657: 线程:Thread-1 ,竞争锁 : 成功17:57:03 658: 线程:Thread-1 ,加锁 ,重入次数 : 117:57:03 659: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数217:57:03 661: 线程:Thread-0 ,尝试上锁 ,次数 : 317:57:03 666: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数317:57:03 668: 线程:Thread-0 ,尝试上锁 ,次数 : 417:57:03 671: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数417:57:03 673: 线程:Thread-0 ,尝试上锁 ,次数 : 517:57:03 676: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数517:57:03 678: 线程:Thread-1 ,加锁 ,重入次数 : 217:57:03 678: 线程:Thread-0 ,尝试上锁 ,次数 : 617:57:03 682: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数617:57:03 685: 线程:Thread-0 ,尝试上锁 ,次数 : 717:57:03 688: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数717:57:03 690: 线程:Thread-0 ,尝试上锁 ,次数 : 817:57:03 694: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数817:57:03 696: 线程:Thread-0 ,尝试上锁 ,次数 : 917:57:03 700: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数917:57:03 702: 线程:Thread-0 ,尝试上锁 ,次数 : 1017:57:03 705: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数1017:57:03 708: 线程:Thread-0 ,尝试上锁 ,次数 : 1117:57:03 710: 线程:Thread-1 ,解锁 ,重入次数 : 117:57:03 710: 线程:Thread-1 ,解锁 ,重入次数 : 017:57:03 712: 线程:Thread-0 ,竞争锁 : 失败, 上锁次数1117:57:03 714: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 417:57:03 714: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 617:57:03 714: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 717:57:03 715: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 917:57:03 715: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 317:57:03 715: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 817:57:03 715: 线程:Thread-0 ,尝试上锁 ,次数 : 1217:57:03 715: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 217:57:03 716: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 1017:57:03 716: 线程:Thread-0 ,监听锁释放 : 加入竞争, 序次 : 517:57:03 719: 线程:Thread-0 ,竞争锁 : 成功17:57:03 719: 线程:Thread-0 ,加锁 ,重入次数 : 117:57:03 741: 线程:Thread-0 ,加锁 ,重入次数 : 217:57:03 772: 线程:Thread-0 ,解锁 ,重入次数 : 117:57:03 772: 线程:Thread-0 ,解锁 ,重入次数 : 0
共享锁
共享锁(Shared Locks),又称读锁。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁,直到该数据对象上的所有共享锁都释放。
定义锁:
/shared_lock/[hostname]-请求类型W/R-序号
实现方式:
1、客户端调用 create 方法创建类似定义锁方式的临时顺序节点。
2、客户端调用 getChildren 接口来获取所有已创建的子节点列表。
3、判断是否获得锁,对于读请求如果所有比自己小的子节点都是读请求或者没有比自己序号小的子节点,表明已经成功获取共享锁,同时开始执行度逻辑。对于写请求,如果自己不是序号最小的子节点,那么就进入等待。
4、如果没有获取到共享锁,读请求向比自己序号小的最后一个写请求节点注册 watcher 监听,写请求向比自己序号小的最后一个节点注册watcher 监听。
实际开发过程中,可以 curator 工具包封装的API帮助我们实现分布式锁。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>x.x.x</version>
</dependency>
curator 的几种锁方案 :
- 1、InterProcessMutex:分布式可重入排它锁
- 2、InterProcessSemaphoreMutex:分布式排它锁
- 3、InterProcessReadWriteLock:分布式读写锁
下面例子模拟 50 个线程使用重入排它锁 InterProcessMutex 同时争抢锁:
public class InterprocessLock {
public static void main(String[] args) {
CuratorFramework zkClient = getZkClient();
String lockPath = "/lock";
InterProcessMutex lock = new InterProcessMutex(zkClient, lockPath);
//模拟50个线程抢锁
for (int i = 0; i < 50; i++) {
new Thread(new TestThread(i, lock)).start();
}
}
static class TestThread implements Runnable {
private Integer threadFlag;
private InterProcessMutex lock;
public TestThread(Integer threadFlag, InterProcessMutex lock) {
this.threadFlag = threadFlag;
this.lock = lock;
}
@Override
public void run() {
try {
lock.acquire();
System.out.println("第"+threadFlag+"线程获取到了锁");
//等到1秒后释放锁
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "192.168.3.39:2181";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
控制台每间隔一秒钟输出一条记录:
下载源码
