一、分布式锁分类
二、用redission实现分布式锁
看门狗大致原理图
可重入锁机制
1、分布式锁实现步骤
1.添加依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.2</version>
</dependency>
2.添加配置文件:
由于使用的springoot作为开发环境,所有自定义bean进行注入:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient getClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
本次使用单机的redis作为测试用例,只有一个几点,Redisson支持集群模式的redis.
3.实现分布式锁:
RLock lock = redisson.getLock("lockName");
try{
// 1. 最常见的使用方法
//lock.lock();
// 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
//lock.lock(10, TimeUnit.SECONDS);
// 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁
boolean res = lock.tryLock(2, 8, TimeUnit.SECONDS);
if(res){ //成功
//处理业务
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放锁
lock.unlock();
}
4.小案例
long lockTimeOut = 5000L;
String lockName = "lock";
Long setnx = RedisSharedPoolUtil.setnx(lockName, String.valueOf(System.currentTimeMillis() + lockTimeOut));
if (setnx != null && setnx.intValue() == 1) {
//返回值是1,表示成功,获取锁
//do something
} else {
//为获取到锁,继续判断,看能否重置,并且获取到锁.
String s = RedisSharedPoolUtil.get(lockName);
if (s != null && System.currentTimeMillis() > Long.parseLong(s)) {
//不为空,而且已经超时了 //重置锁
//上面的s 和这个set有可能不一样,因为是集群tomcat.返回给定的key,->旧值判断,是否可以获取锁
//当key没有旧值时,即key不存在,返回nil->获取锁
//这里我们set了一个新value,获取旧值
String set = RedisSharedPoolUtil.getSet(lockName, String.valueOf(System.currentTimeMillis() + lockTimeOut));
if (set == null || (set != null && StringUtils.equals(s, set))) {
//真正获取到锁
//do something
} else {
log.info("没有获取到分布式锁:{}", lockName);
}
} else {
log.info("没有获取到分布式锁:{}", lockName);
}
log.info("没有获取到分布式锁:{}", lockName);
}
log.info("完成定时关闭订单的任务");
三、分布式锁相关面试题
1、分布式锁的主要用途
保证一个方法在高并发情况下的同一时间只能被同一个线程执行。
2、分布式锁避免哪些问题
(1)允许多个客户端操作共享资源
这种情况下,对共享资源的操作一定是幂等性操作,无论你操作多少次都不会出现不同结果。在这里使用锁,无外乎就是为了避免重复操作共享资源从而提高效率。
(2)只允许一个客户端操作共享资源
这种情况下,对共享资源的操作一般是非幂等性操作。在这种情况下,如果出现多个客户端操作共享资源,就可能意味着数据不一致,数据丢失。
3.redis的分布式锁如何实现?
正常情况下redis使用setnx lock加锁,使用del lock释放锁就可以了。但是如果还没释放锁,服务中途就挂了,这样锁得不到释放造成死锁。可以加上超时时间,等服务启了在释放锁。
因为setnx和expire指令不是原子性,redis在2.8以后加入set扩展命令,使得这两个命令能一起执行。
问题一:分布式锁不能解决超时问题
如果在加锁或者释放锁的期间,业务逻辑太长了导致超时时间过了业务还没执行完,第二个线程重新持有锁 就造成业务的得不到严格执行。这时候需要lua脚本执行。确保原子性。
问题二:可重入锁
redis使用redission插件,首先客户端会发lua脚本个redis查找是否有同名锁,没有就加锁。有就等待该线程释放锁。锁一旦加上会启动一个watch dog,这是一个后台线程,每过十秒检查是否持有锁,如果还持有就会延长key的生存时间。
如果reids主节点宕机了,那么系统会自动切换到slave节点上。但是主从在切换过程中,主从复制是异步的操作,就导致锁丧失了安全性,可能导致第二个线程占有原先的锁。
问题三:主节点宕机,锁安全得不到保障
也是使用redission,其中有一个redlock红锁。Redlock的算法大致是这样。假设有N个主节点。在此我们假设N为5。这些节点相互独立,互不影响。在不同服务器上运行实例。保证不会同时宕机。
客户端应该执行以下操作:
获取当前时间,精准到毫秒
使用相同的key和value依次请求5个redis获取锁。这时候客户端应该建立请求响应时间和超时时间。超时时间应该小于锁的失效时间。这样可以避免如果是redis挂了,客户端就没必要等待,继续去下一个redis获取锁。
只有获取超过半数redis的锁并且获取锁的时间小于锁的失效时间才算获取成功。
如果获取锁失败 应该释放全部的锁
问题四:加锁失败怎么处理
抛出异常,通知用户稍后重试
Sleep 一会重试
将请求转移到延时队列。稍后重试
[
](https://blog.csdn.net/My_daidai/article/details/107232107)