1、为什么需要分布式锁
在多线程并发的情况下,我们用锁来保证一些代码逻辑在同一时间内只能由一个线程访问。比如我们你是用Java中的synchronized关键字或者ReentrantLock类等。
这样面临一个问题,这样加了锁后,可以保证同一个JVM进程内的多个线程同步执行,但是如果是在分布式的环境下,这样子就会有问题,因为这样加锁,只能在加自己的JVM内,如果请求多了分布式集群中的别的节点,那么是没有锁的。
这种情况可以通过分布式锁来解决。
分布式锁可以通过Redis或者Zookeeper来实现,我们来学习Redis分布式锁。
2、redis实现分布式锁及面临的问题
2.1 实现分布式锁的简单逻辑
实现一个分布式锁,首先要有这三个部分:
- 加锁:可以通过redis的setnx命令来加锁。setnx命令的key是锁的唯一标识,可以根据业务来决定命名,value我们设置成一个随机的UUID或者其他。
此时加锁为:
setnx(key, 1)
当一个线程执行setnx返回了1,说明key原本不存在,该线程得到了锁;如果一个线程执行setnx返回0,说明key已经存在,该线程抢锁失败。
- 解锁:有加锁自然就要对应着解锁。当获得锁的线程执行完任务,需要释放锁,让其他进程进入。可以通过del指令:
del(key)
释放了锁,其他线程就可以继续通过setnx’命令来获得锁。
- 设置锁超时:给加的锁设置超时时间。这一步是很有必要的,否则,当一个得到锁的线程在执行任务中挂掉,没有显示地释放锁,那么资源将会被永远锁住,其他线程无法进来。
如果是执行时因为异常崩掉了,可以把解锁方法在finally代码块中,但是如果是运行时,机器断电,程序突然闪断的情况,finally代码块中也无法执行释放锁的逻辑。
因此,setnx的key必须设置一个超时时间,保证就算没有显示地释放锁,锁在一定时间后也会自动释放。setnx不支持超时参数,所以需要额外的指令:
expire(key, 30)
由加锁、解锁、设置锁超时,我们可以得出,初始版本的分布式锁的代码:
if(setnx(key,1) == 1){
expire(key,30)
try {
// TODO 执行业务逻辑
} finally {
del(key)
}
}
2.2 问题1:setnx和expire的非原子性
设想一个极端的场景,如果一个线程A执行setnx成功得到了锁,但是,还没有直行道expire指令设置锁超时时间时,此时线程A所在的服务挂掉了,那么,锁是成功设置了,但是却没有成功的设置过期时间,此时锁基本上就不会过期,别的线程就没法获得锁了。
如何解决呢:
redis在2.6.12以上版本为set指令增加了可选参数,这样在设置锁的时候,就可以设置超时时间:
set(key 1 EX 30 NX)
这里面四个参数分别代表key、value、设置过期时间、过期时间30秒、仅在key不存在时才设置key。
2.3 问题2:删除锁误删了别人的锁
在2.2中,解决了设置锁和设置超时时间的非原子性,这时又面临一个问题:
一开始我们给给key设置的值是一个固定值。如果某个线程成功的得到了锁,锁的超时间是30秒,如果某些原因导致线程A执行很慢,过了三十秒还没执行完,锁自己过期了,然后业务执行完毕,执行删除锁的逻辑,这时有可能把别的线程正在持有的锁给删除调掉了。
如何解决呢:
可以把在占锁时,把值指定为uuid或者线程名,在删除锁时,先判断是不是自己的锁,是自己的锁再删除。
加锁:
String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)
解锁:
if(threadId .equals(redisClient.get(key))){
del(key)
}
注:我觉得还是要用UUID,线程名是很有可能重复的
2.4 问题3:验证锁是否是自己的跟删除锁的非原子性
在2.3的基础上,还会面临新的问题:
判断锁是否是自己线程的锁,跟释放锁是否两个独立操作,不是原子性的。
这时会面临一个问题,假设判断锁是否是自己的逻辑,刚从redis判断是自己的,这时把判断为是,从redis返回给应用程序的过程中,锁已经到了超时时间而过期了,然后另一个线程抢占了锁,上了锁。这种情况下,虽然判断了是自己的锁,但是实际上,删除的是别的线程占用的锁。
这时可以通过Lua脚本实现,Lua脚本查看一下具体细节。
2.5 锁的自动续期问题
我们给key设置了自动过期时间,但是如果线程A获得了锁,却超出了过期时间还没有执行完代码逻辑,怎么办? 就需要给锁进行续期。
这里有两种处理方式:
- 让获得锁的线程开启一个守护线程,用来给快要过期的锁续航;
- 使用Redisson锁。
这个一步一步解决的问题,其实也就是redis实现分布式锁的原理。
3、Redisson
3.1 Redisson是什么
Redission是架设在Redis基础上的一个Java驻内存数据网格。它提供了一系列具有分布式特性的常用工具类。还是先了可重入锁(ReentrantLock)、公平锁(FairLock)、联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)等。
3.2 Redisson与SpringBoot整合
导入依赖:
<!-- redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.5</version>
</dependency>
编写配置文件:
spring:
redis:
host: 127.0.0.1
port: 6379
password:
# redisson配置文件路径
redisson:
file: classpath:redisson.yml
在resource下创建redisson.yml
# 单节点配置
singleServerConfig:
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 连接超时,单位:毫秒
connectTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 密码
password:
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# 客户端名称
clientName: myredis
# 节点地址
address: redis://127.0.0.1:6379
# 发布和订阅连接的最小空闲连接数
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
# 最小空闲连接数
connectionMinimumIdleSize: 32
# 连接池大小
connectionPoolSize: 64
# 数据库编号
database: 0
# DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
#threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
#nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"
添加配置类:
package com.yuanhai.sbdemo.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* 本类说明:
*/
@Configuration
public class MyRedissonConfig {
/**
* 所有对Redisson的使用都是通过RedissonClient对象
* @return
* @throws IOException
*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(@Value("${spring.redis.host}") String url) throws IOException {
//1、创建配置
//Redis url should start with redis:// or rediss://
Config config = new Config();
config.useSingleServer().setAddress("redis://"+url+":6379");
//2、根据Config创建出RedissonClient示例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
3.3 简单测试使用
package com.yuanhai.sbdemo.controller;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 本类说明:
*
*/
@RequestMapping("/redislock")
@RestController
public class RedisLockController {
@Autowired
private RedissonClient redissonClient;
@GetMapping("/lock1")
public String hello() {
// 通过RedissonClient实例调用getLock()方法获取一把锁,
// 只要锁的名字一样,就是同一把锁
RLock myLock = redissonClient.getLock("my-lock");
// 2.执行加锁方法。 用获取到的锁执行lock()方法即可加锁
myLock.lock(); // 这个方法是阻塞式等待,加不到锁会一直等待,知道加上锁
try {
System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
Thread.sleep(5000);
}catch (Exception e) {
}finally {
// 3.解锁方法
System.out.println("释放锁..."+Thread.currentThread().getId());
myLock.unlock();
}
return "hello";
}
}
3.4 3.3中的一些细节
在3.3的例子中,假设解锁方法因为服务器宕机等问题没有解锁,也不会出现死锁问题。
这是因为Redisson的lock有如下特点:
- 加锁时,默认加的锁都是30s的时间;
- 锁会自动续期,假设业务执行时间超长,运行期间会自动给锁续上新的30秒,不用担心业务执行时间过长,锁会被自动删掉。
- 加锁业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁也会在三十秒后自动删除。
3.5 Redisson的看门狗机制
Redisson的看门狗WatchDog解决死锁问题。
我们先把例子改为自己指定锁的过期时间:
@Autowired
RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
// 通过RedissonClient实例调用getLock()方法获取一把锁,
// 只要锁的名字一样,就是同一把锁
RLock myLock = redisson.getLock("my-lock");
// 2.执行加锁方法。 用获取到的锁执行lock()方法即可加锁
// myLock.lock(); // 这个方法是阻塞式等待,加不到锁会一直等待,直到加上锁,默认加的锁都是30s时间。
//1)、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
//2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
myLock.lock(10, TimeUnit.SECONDS); //10秒自动解锁,自动解锁时间一定要大于业务的执行时间。
try {
System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());
Thread.sleep(30000);
}catch (Exception e) {
}finally {
// 3.解锁方法 假设解锁代码没有运行,redisson会不会出现死锁
System.out.println("释放锁..."+Thread.currentThread().getId());
myLock.unlock();
}
return "hello";
}
这里要知道,我们自己设置的自动解锁时间一定要大于业务的执行时间,而且,如果采用自己设置超时时间的方式加锁,那么在锁的时间到了以后,不会自动续期。
对比下自己传入锁超时时间和不指定锁超时时间:
- 如果我们自己传入锁的超时时间:那么redisson底层就是发送redis一个lua执行脚本,进行占锁,默认超时就是我们指定的时间,超时时间到了之后,如果业务没执行完不会续期。
- 如果我们不指定锁的超时时间:那么会默认使用看门狗的默认时间LockWatchdogTimeout,也就是30秒。只要占锁成功,就会启动一个定时任务,这个定时任务用来重新给锁设置过期时间,新的过期时间就是看门狗的默认时间。定时任务每隔十秒就会自动续期,续期成三十秒。
实际开发中,如何使用:
实际开发,还是推荐明显的指定超时时间。
lock.lock(30,TimeUnit.SECONDS);
这样省掉了整个续期操作,手动解锁。
只不过这里自己把指定的时间设的大一点,比如就手动设置成三十秒,用以防止业务执行时间过长而超时。业务执行成功后,就手动解锁。
因为如果业务时间过长,说明中间出问题了,完蛋了。
即:加锁时设置超时时间,业务成功后手动解锁。因为如果业务很长时间都没执行完,那么考虑优化业务性能吧。
3.6 Redisson中的锁的种类
Redisson中的锁与JUC中的锁基本一致,用法也基本一致。但是能够分布式上锁解锁。
- 可重入锁:3.3例子中RedissonClient实例调用getLock方法,获取的是一个可重入锁。
可重入锁:一个线程在执行一个带锁的方法,改方法中又调用了另一个需要相同锁的方法,则该线程可以直接指向调用的方法,无需重新获得锁。
- 公平锁:通过RedissonClient实例调用getFairLock()方法,获取一个公平锁。
公平锁:它保证了多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待五秒钟后继续下一个线程。也就是说,如果前面有五个线程都处于等待状态,那么后面的线程会等待至少25秒。
- 联锁:可以将多个锁对象关联到 一个锁对象。通过创建RedissonMultiLock。
这时,所有的锁都上所才算成功。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
//业务逻辑
lock.unlock();
- 红锁:当有大部分(一半以上)锁加锁成功后,才算真正获得锁。 ```java RLock lock1 = redissonInstance1.getLock(“lock1”); RLock lock2 = redissonInstance2.getLock(“lock2”); RLock lock3 = redissonInstance3.getLock(“lock3”);
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 红锁在大部分节点上加锁成功就算成功。 这里有3个锁,至少2个加锁成功,才算真的加锁成功 lock.lock(); … lock.unlock();
- 读写锁:上读锁的时候可以多个线程获取,上写锁的时候只能由一个线程获取,当写锁未释放的时候,读锁会阻塞获取不了,直到写锁释放。通过RedissonClient实例调用getReadWriteLock()方法获得读写锁,然后读写锁实例调用readLock()方法会上读锁,调用writeLock()方法会上写锁。
```java
/**
* 保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁)。读锁是一个共享锁
* 写锁没释放,读就必须等待
*/
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.readLock();
try {
rLock.lock();
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "";
}
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.writeLock();
try {
//1、改数据加写锁,读数据加读锁
rLock.lock();
TimeUnit.SECONDS.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "";
}
- 闭锁:通过RedissonClient实例调用getCountDownLatch()获取闭锁。
闭锁:确保某个逻辑在它需要的所有资源都具备之后才继续执行。
- 信号量:通过RedissonClient实例调用getSemaphore()来获取信号量。
信号量:一个计数信号量。用来控制同时访问特定资源的线程所里,通过协调各个线程,以保证合理的使用公共资源。一般用于流量控制,特别是公共资源有限的应用场景。例如数据的连接,假设数据库连接上线为10个,并线程并发操作数据库可以使用Semaphore累控制并发操作数据库的线程数量最多为10个。
3.7 Redisson中读写锁ReadWriteLock示例
使用读写锁的好处:
保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁。
//写锁没释放读就必须等待
// 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
// 写 + 读: 等待写锁释放
// 写 + 写: 阻塞方式,下一个 写锁必须等待上一个写锁完全释放,才能进行;
// 读 + 写: 有读锁。写也需要等待。
// 只要有写的存在,都必须等待
@Autowired
RedissonClient redisson;
@Autowired
StringRedisTemplate redisTemplate;
//保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁、独享锁)。读锁是一个共享锁
//写锁没释放读就必须等待
// 读 + 读: 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
// 写 + 读: 等待写锁释放
// 写 + 写: 阻塞方式
// 读 + 写: 有读锁。写也需要等待。
// 只要有写的存在,都必须等待
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String uuid = "";
RLock writeLock = readWriteLock.writeLock();
try {
// 1.改数据,加写锁;读数据,加读锁
writeLock.lock();
uuid = UUID.randomUUID().toString();
// 休眠30s,模拟写入数据要花费时间
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue",uuid);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
return uuid;
}
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
String uuid = "";
// 加读锁
RLock readLock = readWriteLock.readLock();
readLock.lock();
try {
uuid = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
return uuid;
}
3.8 Redisson中信号量Semaphore示例
@Autowired
RedissonClient redisson;
/**
* Redisson信号量使用场景模拟
* 车库停车,
* 假设有3车位,来一辆车占用一个车位,走一辆车释放一个车位,想要停车,要看车位够不够
*
* 信号量也可以用作分布式限流,来限制每一个应用的流量;
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
//获取一个信号(可以看作获取一个值),此场景看作占一个车位
// acquire()是阻塞式获取,即一定要获取一个值,获取不到会一直等待获取
// park.acquire();
// tryAcquire()是尝试获取,获取得到就获取,获取不到,就返回false
boolean b = park.tryAcquire();
if(b){
//执行业务
}else {
return "error";
}
return "ok=>"+b;
}
@GetMapping("/go")
@ResponseBody
public String go() {
RSemaphore park = redisson.getSemaphore("park");
park.release(); //释放一个信号,此场景看作释放一个车位
return "ok";
}
3.9 Redisson中闭锁CountDownLatch示例
@Autowired
RedissonClient redisson;
/**
* 闭锁 案例
*
* 放假,锁门
* 1班没人了,2班没人了......
* 5个班全部走完,我们可以锁大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁都完成
return "放假了...";
}
@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id){
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();//计数减一;
// CountDownLatch
return id+"班的人都走了...";
}