一、介绍
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。wiki
驻内存数据网格:
- 数据是分布式存储在多台server上的。
- 每台server都是active模式。
- 数据模型一般是面向对象和非关系型的。
- 依据须要,常常会增减server。
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
二、Redisson 解决的问题
2.1 redis 的死锁
在使用Redis的过程中,可能发生死锁:
- redis宕机
a. 客户端a向主节点请求加锁,并加锁成功。
b. 当主节点向从节点同步时,主节点故障宕机,此时客户端a的锁仍未同步到从节点。从节点此时被选为新的主节点。
c. 客户端b向新的主节点请求同一个锁,此时新的主节点不存在该锁,客户端b加锁成功。此时同时存在两个客户端得到了同一个锁。 - 业务超时
a. 服务器从redis获取一个锁,但是业务代码执行时间超过了锁的超时时间。
b. 另一个服务请求锁,此时锁已过期,加锁成功,引发线程问题。 - 机器宕机
a. 某个线程获取锁成功。
b. 机器宕机,此时锁一直未释放,造成死锁。其实这个问题,也可以在加锁的同时加上超时时间来解决,不一定要用redisson
String result = jedis.set(lockKey, uid,"NX" "PX", expireTime);
其实就等于在redis中执行了 :set key value nx px 10000
2.2 删除锁时非原子性
https://www.cnblogs.com/number7/p/8320259.html
根本问题还是保证操作的原子性,因为是两步操作,即便判断到是当前线程的锁,但是也有可能再删除之前刚好过期,这样删除的就是其他线程的锁。
public static void wrongUnLock1(Jedis jedis, String lockKey, String requestId) {
// 判断加锁与解锁是不是同一个线程
if (requestId.equals(jedis.get(lockKey))) {
// lockkey锁失效,下一步删除的就是别人的锁
jedis.del(lockKey);
}
}
2.2 Redisson 解决死锁的方案
- 针对redis服务宕机
redisson内部实现了 红锁算法的redLock。
红锁算法 :
假设共有n个redis master节点,且这些master节点互相独立,不存在主从复制或者其他集群协调机制。 客户端索取锁需要如下:
- 获取当前机器的时间戳 start。
- 尝试向n个节点用同一个key获取锁,获取前设置网络超时时间 network overtime和锁的超时时间 lock time(超时时间应远小于锁的超时时间,避免节点宕机等情况下一直等待)。
- 当且仅当大于一半(> n/2)的节点获取到锁且耗时(当前时间戳-start)在锁的有效期(lock time)之内,客户端才加锁成功。此时锁的失效时间为 ( lock time - 耗时 )。
- 如果获取锁失败,则在所有节点上执行解锁操作(即使步骤二未成功)。
针对业务执行超长
若某个线程获取锁后,若业务未处理完,WatchDog会每隔10s延长锁的超时时间。
针对服务器宕机
在redison加锁时,执行lua脚本中指定了锁的超时时间。默认30s(可以通过lockWatchDogTimeout配置)必然释放。
2.4 Redisson保证删除锁时原子性
使用lua脚本。判断是不是自己的锁和删除锁 在一个线程中执行。
lua 在redis中是单线程执行的。三、简单使用
3.1 准备工作
引入包
```xml
org.redisson redisson 3.15.0 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<a name="LYdij"></a>
#### 配置
```java
@Configuration
public class MyRedissonConfig {
/**
* 所有Redisson的使用都是通过RedissonClient对象
* <p>
* 服务停止之后会调用shutdown方法进行销毁
*
* @return
*/
@Bean(destroyMethod = "shutdown")
RedissonClient redisson() {
Config config = new Config();
// 集群模式
// config.useClusterServers().addNodeAddress("127.0.0.1:7071", "127.0.0.1:7072");
// 单节点模式
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
config.useSingleServer().setDatabase(1);
config.useSingleServer().setPassword("123");
config.useSingleServer().setTimeout(10);
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
3.2 公平、非公平锁
jdk 中的 公平\非公平锁 https://www.yuque.com/wangchao-volk4/fdw9ek/tlrd62
@Autowired
private RedissonClient redissonClient;
public String indexPage() {
// 非公平锁
RLock lock = redissonClient.getLock("my-lock");
// 公平锁
// RLock fairLock = redissonClient.getFairLock("my-lock");
lock.lock(10, TimeUnit.SECONDS);
//如果设定了自动解锁时间,则自动解锁时间一定要大于业务的执行时间。因为如果指定了时间,则不会自动续期,
// 给redis发送脚本进行占锁,默认超时时间就是我们指定的时间。只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】
// internalLockLeaseTime(看门狗时间) / 3 = 10s续期
// 默认30s
// 1、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删除
// 2、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s之内会自动删除
// 最佳实战 lock.lock(10, TimeUnit.SECONDS);指定超时时间,省掉续期操作,手动解锁
try {
System.out.println("加锁成功,执行业务。、。。。" + Thread.currentThread().getId());
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("解锁......" + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}
3.3 读写锁
// 保证一定能读到最新的数据,修改期间,写锁是一个排他锁(互斥锁)。读锁是一个共享锁
// 写锁没释放读就必须等待
@GetMapping("/write")
public String writeValue() {
RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
// 加写锁
RLock wLock = lock.writeLock();
String s = "";
try {
wLock.lock();
s = UUID.randomUUID().toString();
Thread.sleep(3000);
stringRedisTemplate.opsForValue().set("writeValue", s);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
wLock.unlock();
}
return s;
}
@GetMapping("/read")
public String readValue() {
RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
// 加读锁
RLock rLock = lock.readLock();
rLock.lock();
String s = null;
try {
s = stringRedisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
3.4 countDown
如下操作,
- 首先请求 “/lockDoor” 设定count=5,然后进入等待(count数减完)
- 然后请求 “gogogo/{id}”,每请求一次,count数就会减一。
等count数减完之后,lockDoor() 方法等待结束,输出 “放假了。。。”
// ----------闭锁当 door.countDown()全部减完时;door.await()才结束------------- -------
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.trySetCount(5);
door.await();
return "放假了。。。";
}
@GetMapping("gogogo/{id}")
public String gogogo(@PathVariable("id") Long id) {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.countDown();
return id + "班级走了。。。。";
}
3.5 信号量
“/go” 出发两次,”/park” 就能成功执行两次
“/go” 不执行,”/park” 就会等待。@GetMapping("/park")
public String park() throws InterruptedException {
RSemaphore park = redissonClient.getSemaphore("park");
park.acquire();//获取一个信号,获取一个值,占一个车位
return "ok";
}
@GetMapping("/go")
public String go() {
RSemaphore park = redissonClient.getSemaphore("park");
park.release(); // 释放一个车位
return "ok";
}
四、鸣谢