什么叫分布式锁呢? 比如说“进程1”在使用该资源的时候,会先去获得锁,“进程1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,“进程1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们就把这个分布式环境下的这个锁叫做分布式锁。

原生Zookeeper实现分布式锁案例
分布式锁实现
package com.zh.lock2;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;/*** author: zhanghui* project: big-data-learning* package: com.zh.lock2* filename: DistributedLock* date: 2022/3/19 12:54* description: 分布式锁案例*/public class DistributedLock {// zookeeper server列表private String connectString = "linux102:2181,linux103:2181,linux104:2181";// 超时时间private int sessionTimeout = 2000;private ZooKeeper zk;private String rootNode = "locks";private String subNode = "seq-";// 当前client等待的子节点private String waitPath;// ZooKeeper连接private CountDownLatch connectLatch = new CountDownLatch(1);// Zookeeper节点等待private CountDownLatch waitLatch = new CountDownLatch(1);// 当前Client创建的节点private String currentNode;// 和Zk服务建立连接,并创建根节点public DistributedLock() throws Exception {zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 连接建立时,打开latch,唤醒wait在该latch上的线程if (event.getState() == Event.KeeperState.SyncConnected) {connectLatch.countDown();}// 发生了waitPath的删除事件if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {waitLatch.countDown();}}});// 等待连接建立connectLatch.await();// 获取跟节点状态Stat stat = zk.exists("/" + rootNode, false);// 如果跟节点不存在,则创建根节点,根节点类型为永久节点if (stat == null) {System.out.println("根节点不存在!");zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}}// 加锁方法public void zkLock() {try {// 在根节点下创建临时顺序节点,返回值为创建的节点路径currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);// wait一小会,让结果更清晰一些Thread.sleep(10);// 注意,没有必要监听“/locks”的子节点的变化情况List<String> childrenNodes = zk.getChildren("/" + rootNode, false);// 列表中只有一个子节点,那肯定就是currentNode,说明client获得锁if (childrenNodes.size() == 1) {return;} else {// 对跟节点下的所有临时顺序接地那进行从小到大排序Collections.sort(childrenNodes);// 当前节点名称String thisNode = currentNode.substring(("/" + rootNode + "/").length());int index = childrenNodes.indexOf(thisNode);if (index == -1) {System.out.println("数据异常");} else if (index == 0) {// index == 0, 说明thisNode在列表中最小,当前client获得锁return;} else {// 获得排名比currentNode前1位的节点this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);// 在waitPath上注册监听器,当waitPath被删除时,Zookeeper会回调监听器的process方法zk.getData(waitPath, true, new Stat());// 进入等待锁状态waitLatch.await();return;}}} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}// 解锁方法public void zkUnlock() {try {zk.delete(this.currentNode, -1);} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}}
分布式锁测试
创建两个线程
package com.zh.lock2;/*** author: zhanghui* project: big-data-learning* package: com.zh.lock2* filename: DistributedLockTest* date: 2022/3/19 14:08* description: 分布式锁案例测试*/public class DistributedLockTest {public static void main(String[] args) throws Exception {// 创建分布式锁1final DistributedLock lock1 = new DistributedLock();// 创建分布式锁2final DistributedLock lock2 = new DistributedLock();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock1.zkLock();System.out.println("线程1获取锁");Thread.sleep(5 * 1000);lock1.zkUnlock();System.out.println("线程1释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {lock2.zkLock();System.out.println("线程2获取锁");Thread.sleep(5 * 1000);lock2.zkUnlock();System.out.println("线程2释放锁");} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}
观察控制台变化
线程2获取锁
线程2释放锁
线程1获取锁
线程1释放锁
Curator框架实现分布式锁案例
原生的java API 开发存在的问题
- 会话连接是异步的,需要自己去处理。比如使用CountDownLatch
- Watch需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
Curator是一个专门解决分布式锁的框架,解决了原生Java API开发分布式遇到的问题
Curator案例实操
添加依赖
<!-- Curator依赖 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
代码实现
package com.zh.lock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.CountDownLatch;
/**
* author: zhanghui
* project: big-data-learning
* package: com.zh.lock
* filename: CuratorLockTest
* date: 2022/3/19 14:22
* description:
*/
public class CuratorLockTest {
private String rootNode = "/locks";
// zookeeper server列表
private String connectSting = "linux102:2181,linux103:2181,linux104:2181";
// connection超时时间
private int connectTimeout = 2000;
// session超时时间
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorLockTest().test();
}
private void test() {
// 创建分布式锁1
final InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 创建分布式锁2
final InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.acquire();
System.out.println("线程1获得锁");
// 测试锁重入
lock1.acquire();
System.out.println("线程1再次获得锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("线程1释放锁");
lock1.release();
System.out.println("线程1再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.acquire();
System.out.println("线程2获得锁");
// 测试锁重入
lock2.acquire();
System.out.println("线程2再次获得锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("线程2释放锁");
lock2.release();
System.out.println("线程2再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
// 分布式锁初始化
public CuratorFramework getCuratorFramework() {
// 重试策略,初试时间3秒,重试3次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
// 通过工厂创建Curator
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectSting)
.connectionTimeoutMs(connectTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
// 开启连接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
观察控制台变化
线程1获得锁
线程1再次获得锁
线程1释放锁
线程1再次释放锁
线程2获得锁
线程2再次获得锁
线程2释放锁
线程2再次释放锁
