1. 原理
1.1 高效分布式锁
当我们在设计分布式锁的时候,我们应该考虑分布式锁至少要满足的一些条件,同时考虑如何高效的设计分布式锁,这里我认为以下几点是必须要考虑的。
1、互斥
在分布式高并发的条件下,我们最需要保证,同一时刻只能有一个线程获得锁,这是最基本的一点。
2、防止死锁
在分布式高并发的条件下,比如有个线程获得锁的同时,还没有来得及去释放锁,就因为系统故障或者其它原因使它无法执行释放锁的命令,导致其它线程都无法获得锁,造成死锁。
所以分布式非常有必要设置锁的有效时间,确保系统出现故障后,在一定时间内能够主动去释放锁,避免造成死锁的情况。
3、性能
对于访问量大的共享资源,需要考虑减少锁等待的时间,避免导致大量线程阻塞。所以在锁的设计时,需要考虑两点。
锁的颗粒度要尽量小。比如你要通过锁来减库存,那这个锁的名称你可以设置成是商品的ID,而不是任取名称。这样这个锁只对当前商品有效,锁的颗粒度小。锁的范围尽量要小。比如只要锁2行代码就可以解决问题的,那就不要去锁10行代码了。4、重入
我们知道 ReentrantLock 是可重入锁,那它的特点就是:同一个线程可以重复拿到同一个资源的锁。重入锁非常有利于资源的高效利用。关于这点之后会做演示。
针对以上Redisson都能很好的满足,下面就来分析下它。
1.2 Redisson原理分析
为了更好的理解分布式锁的原理,我这边自己画张图通过这张图来分析。

1、加锁机制
- 线程去获取锁,获取成功: 执行 lua 脚本,保存数据到 redis 数据库。
- 线程去获取锁,获取失败: 一直通过 while 循环尝试获取锁,获取成功后,执行 lua 脚本,保存数据到 redis 数据库。
2、watch dog 自动延期机制
这个比较难理解,找了些许资料感觉也并没有解释的很清楚。这里我自己的理解就是:
在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(不设置默认30秒),这样的目的主要是防止死锁的发生。
但在实际开发中会有下面一种情况:
//设置锁1秒过去redissonLock.lock("redisson", 1);/*** 业务逻辑需要咨询2秒*/redissonLock.release("redisson");/*** 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,* 在 线程1 执行1秒后,这个锁就自动过期了,* 那么这个时候 线程2 进来了。那么就存在 线程1 和 线程2 同时在这段业务逻辑里执行代码,这是不合理的。* 而且如果是这种情况,那么在解锁时系统会抛异常,因为解锁和加锁已经不是同一线程了,具体后面代码演示。*/
所以这个时候看门狗就出现了,它的作用就是 线程1 业务还没有执行完,时间就过了,线程1 还想持有锁的话,就会启动一个 watch dog 后台线程,不断的延长锁 key 的生存时间。
注意 正常这个看门狗线程是不启动的,还有就是这个看门狗启动后对整体性能也会有一定影响,所以不建议开启看门狗。
3、为啥要用lua脚本呢?
这个不用多说,主要是如果你的业务逻辑复杂的话,通过封装在 lua 脚本中发送给 redis,而且 redis 是单线程的,这样就保证这段复杂业务逻辑执行的原子性。
4、可重入加锁机制
Redisson 可以实现可重入加锁机制的原因,我觉得跟两点有关:
- Redis 存储锁的数据类型是 Hash 类型
- Hash 数据类型的 key 值包含了当前线程信息。
下面是redis存储的数据
这里表面数据类型是 Hash 类型,Hash 类型相当于我们 java 的 <key,<key1,value>> 类型,这里 key 是指 「redisson」
它的有效期还有9秒,我们再来看里们的key1值为078e44a3-5f95-4e24-b6aa-80684655a15a:45它的组成是:「guid + 当前线程的ID」。后面的 value 是就和可重入加锁有关。(GUID:Globally Unique Identifier,即全局唯一标识)
举图说明
上面这图的意思就是可重入锁的机制,它最大的优点就是相同线程不需要在等待锁,而是可以直接进行相应操作。
5、Redis分布式锁的缺点
Redis分布式锁会有个缺陷,就是在 Redis 哨兵模式下:
客户端1 对某个master节点写入了redisson锁,此时会异步复制给对应的 slave节点。但是这个过程中一旦发生 master节点宕机,主备切换,slave 节点从变为了 master 节点。
这时客户端2 来尝试加锁的时候,在新的 master 节点上也能加锁,此时就会导致多个客户端对同一个分布式锁完成了加锁。
这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。
缺陷在哨兵模式或者主从模式下,如果 master 实例宕机的时候,可能导致多个客户端同时完成加锁。
- 解决方式:参考 Redis 的 Redlock 算法
2. 源码解析
Redisson 分布式锁的实现是基于 RLock 接口,RedissonLock 实现 RLock 接口。
2.1 RLock 接口
1. 概念
public interface RLock extends Lock, RExpirable, RLockAsync
很明显 RLock 是继承 Lock 锁,所以他有 Lock 锁的所有特性,比如 lock、unlock、trylock 等特性,同时它还有很多新特性:强制锁释放,带有效期的锁。
2. RLock锁API
public interface RRLock {//----------------------Lock接口方法-----------------------/*** 加锁 锁的有效期默认30秒*/void lock();/*** tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false .*/boolean tryLock();/*** tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,* 在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。** @param time 等待时间* @param unit 时间单位 小时、分、秒、毫秒等*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/*** 解锁*/void unlock();/*** 中断锁 表示该锁可以被中断 假如A和B同时调这个方法,A获取锁,B为获取锁,那么B线程可以通过* Thread.currentThread().interrupt(); 方法真正中断该线程*/void lockInterruptibly();//----------------------RLock接口方法-----------------------/*** 加锁 上面是默认30秒这里可以手动设置锁的有效时间** @param leaseTime 锁有效时间* @param unit 时间单位 小时、分、秒、毫秒等*/void lock(long leaseTime, TimeUnit unit);/*** 这里比上面多一个参数,多添加一个锁的有效时间** @param waitTime 等待时间* @param leaseTime 锁有效时间* @param unit 时间单位 小时、分、秒、毫秒等*/boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;/*** 检验该锁是否被线程使用,如果被使用返回True*/boolean isLocked();/*** 检查当前线程是否获得此锁(这个和上面的区别就是该方法可以判断是否当前线程获得此锁,而不是此锁是否被线程占有)* 这个比上面那个实用*/boolean isHeldByCurrentThread();/*** 中断锁 和上面中断锁差不多,只是这里如果获得锁成功,添加锁的有效时间* @param leaseTime 锁有效时间* @param unit 时间单位 小时、分、秒、毫秒等*/void lockInterruptibly(long leaseTime, TimeUnit unit);}
RLock相关接口,主要是新添加了 leaseTime 属性字段,主要是用来设置锁的过期时间,避免死锁。
2.2 RedissonLock实现类
public class RedissonLock extends RedissonExpirable implements RLock
RedissonLock 实现了 RLock 接口,所以实现了接口的具体方法。这里我列举几个方法说明下
1. void lock() 方法
@Overridepublic void lock() {try {lockInterruptibly();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
发现lock锁里面进去其实用的是lockInterruptibly(中断锁,表示可以被中断),而且捕获异常后用 Thread.currentThread().interrupt()来真正中断当前线程,其实它们是搭配一起使用的。
具体有关 lockInterruptibly() 方法讲解推荐一个博客。博客:Lock的lockInterruptibly()
接下来执行流程,这里理下关键几步
/*** 1、带上默认值调另一个中断锁方法*/@Overridepublic void lockInterruptibly() throws InterruptedException {lockInterruptibly(-1, null);}/*** 2、另一个中断锁的方法*/void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException/*** 3、这里已经设置了锁的有效时间默认为30秒 (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30)*/RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);/*** 4、最后通过lua脚本访问Redis,保证操作的原子性*/<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));}
那么void lock(long leaseTime, TimeUnit unit)方法其实和上面很相似了,就是从上面第二步开始的。
2. tryLock(long waitTime, long leaseTime, TimeUnit unit)
接口的参数和含义上面已经说过了,现在我们开看下源码,这里只显示一些重要逻辑。
@Overridepublic boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);//1、 获取锁同时获取成功的情况下,和lock(...)方法是一样的 直接返回True,获取锁False再往下走if (ttl == null) {return true;}//2、如果超过了尝试获取锁的等待时间,当然返回false 了。time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(threadId);return false;}// 3、订阅监听redis消息,并且创建RedissonLockEntry,其中RedissonLockEntry中比较关键的是一个 Semaphore属性对象,用来控制本地的锁请求的信号量同步,返回的是netty框架的Future实现。final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);// 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。// 只有await返回true,才进入循环尝试获取锁if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {@Overridepublic void operationComplete(Future<RedissonLockEntry> future) throws Exception {if (subscribeFuture.isSuccess()) {unsubscribe(subscribeFuture, threadId);}}});}acquireFailed(threadId);return false;}//4、如果没有超过尝试获取锁的等待时间,那么通过While一直获取锁。最终只会有两种结果//1)、在等待时间内获取锁成功 返回true。2)等待时间结束了还没有获取到锁那么返回false。while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(leaseTime, unit, threadId);// 获取锁成功if (ttl == null) {return true;}// 获取锁失败time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(threadId);return false;}}}
重点 tryLock 一般用于特定满足需求的场合,但不建议作为一般需求的分布式锁,一般分布式锁建议用 void lock(long leaseTime, TimeUnit unit)。因为从性能上考虑,在高并发情况下后者效率是前者的好几倍
3. unlock()
解锁的逻辑很简单。1
@Overridepublic void unlock() {// 1.通过 Lua 脚本执行 Redis 命令释放锁Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; " +"end;" +"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +"return nil;" +"end; " +"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +"if (counter > 0) then " +"redis.call('pexpire', KEYS[1], ARGV[2]); " +"return 0; " +"else " +"redis.call('del', KEYS[1]); " +"redis.call('publish', KEYS[2], ARGV[1]); " +"return 1; "+"end; " +"return nil;",Arrays.<Object>asList(getName(), getChannelName()),LockPubSub.unlockMessage, internalLockLeaseTime,getLockName(Thread.currentThread().getId()));// 2.非锁的持有者释放锁时抛出异常if (opStatus == null) {throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + Thread.currentThread().getId());}// 3.释放锁后取消刷新锁失效时间的调度任务if (opStatus) {cancelExpirationRenewal();}}
使用 EVAL 命令执行 Lua 脚本来释放锁:
- key 不存在,说明锁已释放,直接执行
publish命令发布释放锁消息并返回1。 - key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回
nil。 - 因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行
hincrby对锁的值减一。 - 释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回
0;如果刚才释放的已经是最后一把锁,则执行del命令删除锁的 key,并发布锁释放消息,返回1。
注意这里有个实际开发过程中,容易出现很容易出现上面第二步异常,非锁的持有者释放锁时抛出异常。比如下面这种情况
//设置锁1秒过去redissonLock.lock("redisson", 1);/*** 业务逻辑需要咨询2秒*/redissonLock.release("redisson");/*** 线程1 进来获得锁后,线程一切正常并没有宕机,但它的业务逻辑需要执行2秒,这就会有个问题,在 线程1 执行1秒后,这个锁就自动过期了,* 那么这个时候 线程2 进来了。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)*/
3. 项目代码
3.1 项目概述
1. 技术架构
项目总体技术选型 SpringBoot2.1.5 + Maven3.5.4 + Redisson3.5.4 + lombok(插件)
2. 加锁方式
该项目支持 自定义注解加锁 和 常规加锁 两种模式
自定义注解加锁
@DistributedLock(value="goods", leaseTime=5)public String lockDecreaseStock(){//业务逻辑}
常规加锁
//1、加锁redissonLock.lock("redisson", 10);//2、业务逻辑//3、解锁redissonLock.unlock("redisson");
3. Redis部署方式
该项目支持四种 Redis 部署方式
1、单机模式部署2、集群模式部署3、主从模式部署4、哨兵模式部署
该项目已经实现支持上面四种模式,你要采用哪种只需要修改配置文件application.properties,项目代码不需要做任何修改。
4、项目整体结构
redis-distributed-lock-core # 核心实现|---src|---com.jincou.redisson|# 通过注解方式 实现分布式锁---annotation|# 配置类实例化RedissonLock---config|# 放置常量信息---constant|# 读取application.properties信息后,封装到实体---entity|# 支持单机、集群、主从、哨兵 代码实现---strategyredis-distributed-lock-web-test # 针对上面实现类的测试类|---src|---java|---com.jincou.controller|# 测试 基于注解方式实现分布式锁---AnnotatinLockController.java|# 测试 基于常规方式实现分布式锁---LockController.java---resources| # 配置端口号 连接redis信息(如果确定部署类型,那么将连接信息放到core项目中)---application.properties
3.2 测试
模拟1秒内100个线程请求接口,来测试结果是否正确。同时测试三种不同的锁:lock锁、trylock锁、注解锁。
1. lock 锁
/*** 模拟这个是商品库存*/public static volatile Integer TOTAL = 10;@GetMapping("lock-decrease-stock")public String lockDecreaseStock() throws InterruptedException {redissonLock.lock("lock", 10);if (TOTAL > 0) {TOTAL--;}Thread.sleep(50);log.info("======减完库存后,当前库存===" + TOTAL);//如果该线程还持有该锁,那么释放该锁。如果该线程不持有该锁,说明该线程的锁已到过期时间,自动释放锁if (redissonLock.isHeldByCurrentThread("lock")) {redissonLock.unlock("lock");}return "=================================";}
压测结果:
没问题,不会超卖!
2. tryLock 锁
/*** 模拟这个是商品库存*/public static volatile Integer TOTAL = 10;@GetMapping("trylock-decrease-stock")public String trylockDecreaseStock() throws InterruptedException {if (redissonLock.tryLock("trylock", 10, 5)) {if (TOTAL > 0) {TOTAL--;}Thread.sleep(50);redissonLock.unlock("trylock");log.info("====tryLock===减完库存后,当前库存===" + TOTAL);} else {log.info("[ExecutorRedisson]获取锁失败");}return "===================================";}
测试结果:
没有问题 ,不会超卖!
3. 注解锁
/*** 模拟这个是商品库存*/public static volatile Integer TOTAL = 10;@GetMapping("annotatin-lock-decrease-stock")@DistributedLock(value="goods", leaseTime=5)public String lockDecreaseStock() throws InterruptedException {if (TOTAL > 0) {TOTAL--;}log.info("===注解模式=== 减完库存后,当前库存===" + TOTAL);return "=================================";}
测试结果:
没有问题 ,不会超卖!
通过实验可以看出,通过这三种模式都可以实现分布式锁,然后呢?哪个最优。
3.3 三种锁的锁选择
观点 :最完美的就是 **lock** 锁,因为
1、tryLock 锁是可能会跳过减库存的操作,因为当过了等待时间还没有获取锁,就会返回 false,这显然很致命!2、注解锁只能用于方法上,颗粒度太大,满足不了方法内加锁。
1. lock PK tryLock 性能的比较
模拟5秒内1000个线程分别去压测这两个接口,看报告结果!
1)lock锁
压测结果 1000个线程平均响应时间为31324。吞吐量 14.7/sec
2)tryLock锁
压测结果 1000个线程平均响应时间为28628。吞吐量 16.1/sec
这里只是单次测试,有很大的随机性。从当前环境单次测试来看,tryLock 稍微高点。
2. 常见异常 attempt to unlock lock, not ······
在使用RedissonLock锁时,很容易报这类异常,比如如下操作
//设置锁1秒过去redissonLock.lock("redisson", 1);/*** 业务逻辑需要咨询2秒*/redissonLock.release("redisson");
上面在并发情况下就会这样
造成异常原因:
线程1 进来获得锁后,但它的业务逻辑需要执行2秒,在 线程1 执行1秒后,这个锁就自动过期了,那么这个时候线程2 进来了获得了锁。在线程1去解锁就会抛上面这个异常(因为解锁和当前锁已经不是同一线程了)
所以我们需要注意,设置锁的过期时间不能设置太小,一定要合理,宁愿设置大点。
正对上面的异常,可以通过isHeldByCurrentThread()方法,
//如果为false就说明该线程的锁已经自动释放,无需解锁if (redissonLock.isHeldByCurrentThread("lock")) {redissonLock.unlock("lock");}
