对于单机运用我们可以使用JDK为我们提供的“锁”,对于分布式环境下众多服务实例可能存在争抢同一个字段,于是乎分布式锁便应孕而生。这里就以zk来实现分布式锁。
zk的监听机制
我们知道zk可以实时监听某个节点下数据或者子节点发生更改时会回调返回一个信息,现在我们就用java作为客户端来看看zk的watch机制。<br /> 首先导入我们需要的第三方包方便我们操作zk。
<!-- 引入第三方zk包 比官方apache好用 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
java示例代码
/**
* @author heian
* @create 2020-03-16-11:12 下午
* @description
*/
public class ZkClientDemo {
public static void main(String[] args) {
//内部已经帮我们连接了
ZkClient zkClient = new ZkClient("192.168.0.102:2183");
zkClient.setZkSerializer(new MyZkSerialiZer());
//创建持久化节点 前提是/zk 目录必须存在
zkClient.create("/zk/hmm","hmm", CreateMode.PERSISTENT);
//对该节点下的子节点进行监听
zkClient.subscribeChildChanges("/zk/hmm", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> list) throws Exception {
// create /zk/app3/app3-son1 son1 多了个节点
System.out.println(parentPath + "子节点发生变化");
}
});
//对该节点下的数据进行监听
zkClient.subscribeDataChanges("/zk/hmm", new IZkDataListener() {
@Override
public void handleDataChange(String parentPath, Object o) throws Exception {
//set /zk/hmm newhmm
System.out.println(parentPath + "发生变化了,变化成为:" + o);
}
@Override
public void handleDataDeleted(String parentPath) throws Exception {
//delete /zk/hmm
System.out.println(parentPath + "被删除");
}
});
while (Thread.activeCount()>=1){
LockSupport.parkNanos(1000000000*100L);
}
}
}
/**
* @author heian
* @create 2020-03-21-2:06 下午
* @description
*/
public class MyZkSerialiZer implements ZkSerializer {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String s = (String) data;
try {
return s.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes,"UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}
首先先连接我们指定的zk服务端,然后通过之前在服务端根节点已经创建的/zk节点下创建hmm节点,并存储hmm数据。并订阅/zk/hmm节点 下的子节点改动事件和本节点数据改动事件。<br /> 启动我们的服务,并对我们的/zk/hmm 节点携带的数据和其子节点进行监听,此时我们直接操作服务端引发数据改动来看下客户端控制台能否打印对应信息。
> create /zk/hmm/hmm2 hmm2
控制台: /zk/hmm子节点发生变化
> delete /zk/hmm/hmm2
控制台: /zk/hmm子节点发生变化
> set /zk/hmm newhmm
控制台: /zk/hmm发生变化了,变化成为:newhmm
> delete /zk/hmm
控制台: /zk/hmm 被删除
实现分布式锁方式一:
zk可以创建持久节点、临时节点、顺序节点三种节点类型,上面我们的Demo演示是创建了持久节点,而我们知道同一个目录下的节点是不允许重名的,这就有点向我们数据库的唯一索引一样,是天然就支持防并发。所以我们可以利用这个特性去做为一把“锁”。当时当我们服务持有节点的时候突然宕机,则锁也应该被删除,防止占着锁,导致某些数据被锁无法操作。所以临时节点就可以做到宕机时(客户端创建然后释放锁会)会自动删除,防止锁的占用。<br /> 所以可以利用 临时节点特性 + 节点不可重名 + watch机制实现
/**
* @author heian
* @create 2020-03-17-10:34 下午
* @description ZK分布式锁:实现原理 临时节点不可重名 + watch
* 缺点:会出现惊群效应,每一次删除其他每个app都会收到消息通知
* 备注:会在指定的一个父节点lockpath 只会创建一个节点
*/
public class ZKDistributeLock1 implements Lock {
private ZkClient zkClient;
private String lockPath;
public ZKDistributeLock1(String lockPath){
if (lockPath == null || lockPath.trim().equals("")){
throw new IllegalArgumentException("字符串不为空");
}
this.lockPath = lockPath;
zkClient = new ZkClient("192.168.0.102:2183");
zkClient.setZkSerializer(new MyZkSerialiZer());
}
@Override
public boolean tryLock() {
try {
//Ephemeral:短暂的 创建临时节点 不带数据
//TODO 此处应该判断ip是不是同一个,是的话就允许重入
zkClient.createEphemeral(lockPath);
}catch (Exception e){
//如果该节点被创建,则会抛出错误,则认为抢锁失败
System.out.println(Thread.currentThread().getName() + "此node节点已被创建");
return false;
}
System.out.println(Thread.currentThread().getName() + "获得分布式锁");
return true;
}
@Override
public void lock() {
if (!tryLock()){
try {
waitForLock();
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void waitForLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
IZkDataListener zkListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) {
System.out.println(Thread.currentThread().getName() + "数据被修改");
}
@Override
public void handleDataDeleted(String s) {
//ZkClient-EventThread-13-192.168.0.102:2183 线程名字 (守护线程)
System.out.println(Thread.currentThread().getName() + " 节点被删除,唤醒另外一个线程");
//节点被删除,有人释放了锁,唤醒阻塞的线程 (网络原因临时节点会被删除)
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(lockPath,zkListener);
//zkClient阻塞释放后则取消订阅
System.out.println(Thread.currentThread().getName() + "阻塞");
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "阻塞被打断");
zkClient.unsubscribeDataChanges(lockPath,zkListener);
}
@Override
public void unlock() {
zkClient.delete(lockPath);
System.out.println(Thread.currentThread().getName() + "释放分布式锁");
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}
public static void main(String[] args) throws InterruptedException {
//商品 id
String productId = "abcd";
ZKDistributeLock1 lock = new ZKDistributeLock1("/zk/"+productId);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行");
lock.lock();
LockSupport.parkNanos(1000000000*10L);//类比业务操作去操作被锁住的数
lock.unlock();
}).start();
TimeUnit.SECONDS.sleep(1);
//模拟 另外一个服务
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行");
lock.lock();
lock.unlock();
}).start();
}
----控制台-----
Thread-1执行
Thread-1获得分布式锁
Thread-2执行
Thread-2此node节点已被创建
Thread-2阻塞
Thread-1释放分布式锁
ZkClient-EventThread-13-192.168.0.102:2183 节点被删除,唤醒另外一个线程
Thread-2阻塞被打断
Thread-2获得分布式锁
Thread-2释放分布式锁
但是上面的实现方式存在惊群效应,就是占着锁的服务,一旦释放锁会通知到各个抢锁的服务造成网络波动。上述代码没区分服务实例,需要自己手动加上重入锁的机制,这里实现也比较简单,就不做演示。
实现分布式锁方式二:
仿照银行排队叫号系统,当众多服务去争抢资源时每个服务都会根据先来后到拿到排队号码(从小到大),这就类似我们的顺序节点,在同一个临时节点下创建顺序节点,然后每个服务抢锁时判断该节点下的所有临时顺序节点的最小节点是不是与自己所在节点相同,一样则获得锁成功,否则陷入阻塞,并且订阅比我小的上一个节点的节点变化。以此消除惊群效应
/**
* @author heian
* @create 2020-03-17-10:34 下午
* @description ZK分布式锁:实现原理 取号+最小号+ watch 类似银行取排队小票
* 备注:临时节点死掉了会释放 会在指定的一个父节点lockpath 下很多临时顺序的临时子节点
*/
public class ZKDistributeLock2 implements Lock {
//在父节点下创建临时子节点 记住自己当前多少号,关注比自己小的那个号
private ZkClient zkClient;
private String lockPath;
private ThreadLocal<String> currentPath = new ThreadLocal<>();//当前序号
private ThreadLocal<String> beforePath = new ThreadLocal<>();//我前面的一个序号
private ThreadLocal<Integer> count = new ThreadLocal<>();
public ZKDistributeLock2(String lockPath){
if (lockPath == null || lockPath.trim().equals("")){
throw new IllegalArgumentException("字符串不为空");
}
this.lockPath = lockPath;
String ip4 = "";
try {
ip4 = Inet4Address.getLocalHost().getHostAddress();
}catch (Exception e){
System.out.println("ip获取异常:" + ip4);
}
zkClient = new ZkClient(ip4 + ":2183");
zkClient.setZkSerializer(new MyZkSerialiZer());
if (!this.zkClient.exists(lockPath)){
try {
//创建一个持久节点
this.zkClient.createPersistent(lockPath,true);
}catch (Exception e){
System.out.println("异常:节点已存在");
}
}
}
@Override
public boolean tryLock() {
try {
if (this.currentPath.get() == null || !zkClient.exists(this.currentPath.get())){
System.out.println(Thread.currentThread().getName() + "尝试获取分布式锁" + this.currentPath.get());
String path = zkClient.createEphemeralSequential(lockPath+"/","locked");//创建临时顺序节点
currentPath.set(path);
count.set(0);
}
//获得所有的子节点
List<String> children = this.zkClient.getChildren(lockPath);
Collections.sort(children);
//判断当前节点是否是最小的 如果不是则说明我前面还有人
if (currentPath.get().equals(lockPath + "/" + children.get(0))){
//重入次数+1
count.set(count.get() +1);
System.out.println(Thread.currentThread().getName() + "获得分布式锁");
return true;
}
//如果不是,则我把排在我前面的节点找出
for (int i = 0; i < children.size(); i++) {
if (currentPath.get().substring(lockPath.length()+1).equals(children.get(i))){
beforePath.set(lockPath + "/" + children.get(i-1));
System.out.println(Thread.currentThread().getName() + "找到前一个节点" + beforePath.get());
}
}
return false;
}catch (Exception e){
//如果该节点被创建,则会抛出错误,则认为抢锁失败
System.out.println(Thread.currentThread().getName() + "此node节点已被创建");
e.printStackTrace();
return false;
}
}
@Override
public void lock() {
if (!tryLock()){
try {
waitForLock();
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + "释放分布式锁");
if (count.get() > 1){
//重入次数-1
count.set(count.get() -1);
}
if (currentPath.get() != null){
zkClient.delete(currentPath.get());
currentPath.set(null);
count.set(0);
}
}
private void waitForLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
IZkDataListener zkListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) {
//do nothing
System.out.println(Thread.currentThread().getName() + "数据被修改");
}
@Override
public void handleDataDeleted(String s) {
System.out.println(Thread.currentThread().getName() + "节点被删除,唤醒另外一个线程");
//节点被删除,有人释放了锁,唤醒阻塞的线程 (网络原因临时节点会被删除)
countDownLatch.countDown();
}
};
zkClient.subscribeDataChanges(beforePath.get(),zkListener);
if (this.zkClient.exists(this.beforePath.get())){
countDownLatch.await();
}
zkClient.unsubscribeDataChanges(beforePath.get(),zkListener);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
public static void main(String[] args) throws InterruptedException {
//商品 id
String productId = "abcd";
ZKDistributeLock2 lock = new ZKDistributeLock2("/zk/"+productId);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行");
lock.lock();
LockSupport.parkNanos(1000000000*10L);//类比业务操作去操作被锁住的数
lock.unlock();
}).start();
TimeUnit.SECONDS.sleep(1);
//模拟 另外一个服务
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行");
lock.lock();
lock.unlock();
}).start();
}
Thread-1执行
Thread-1尝试获取分布式锁null
Thread-1获得分布式锁
Thread-2执行
Thread-2尝试获取分布式锁null
Thread-2找到前一个节点/zk/abcd/0000000013
Thread-1释放分布式锁
ZkClient-EventThread-13-192.168.0.102:2183节点被删除,唤醒另外一个线程
Thread-2获得分布式锁
Thread-2释放分布式锁