[案例:]springboot-redis.zip
什么是分布式锁
加锁的目的就是为了互斥访问“临界资源”.(说白了就是同一时刻只允许一个线程访问)分布式环境下如果需要做到对临界资源的互斥访问,就需要加锁,那这把锁就是分布式锁.
如何实现锁
- 单线程环境下,如果要解决资源竞争问题,可以使用synchronized等解决
- 多线程环境下我们可以使用redis,zookeeper,数据库等来解决
分布式锁的特点
- 互斥:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥
- 可重入:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁
- 锁超时:和本地锁一样支持锁超时,防止死锁
- 高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级
- 加锁与解锁是同一个对象
- …
redis如何实现分布式锁
使用redis自带命令实现分布式锁
redis中存在一条命令setnx,如果不存在则更新,对某个资源加锁可以
setNx key value
这里有个问题,加锁了之后如果机器宕机那么这个锁就不会得到释放所以会加入过期时间,加入过期时间需要和setNx同一个原子操作,在Redis2.8之前我们需要使用Lua脚本达到我们的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。
set key value ex 10 nx
常见的EX、PX、NX、XX的含义
- EXseconds – 设置键key的过期时间,单位时秒
- PXmilliseconds – 设置键key的过期时间,单位时毫秒
- NX – 只有键key不存在的时候才会设置key的值
- XX – 只有键key存在的时候才会设置key的值
场景示例
现在需要减库存服务,采用的是集群部署.这时候使用传统的synchronized已经不能解决(synchronized是jvm进程内的锁),此时就需要分布式锁来实现.保证同一时刻只有一个线程操作临界资源.
代码如下:
下面是使用redis自带的setnx命令来实现
@GetMapping("/delete_stock")
public String deleteProduct() {
String lockKey = "product_001";
String clientId = UUID.randomUUID().toString();
try {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if (result != null && !result) {
return "error001";
}
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get("stock")));
if (stock > 0) {
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余:" + realStock);
} else {
System.out.println("库存不足");
}
} finally {
if (clientId.equals(redisTemplate.opsForValue().get(lockKey))) {
redisTemplate.delete(lockKey);
}
}
return "end";
}
如上述所示一个简单的分布式锁就实现了.但是仔细看还是会存在一个问题,就是这个超时时间不好确定.
假如业务执行需要15s,这个超时时间设置为10s.这时候执行10s锁被释放,新的线程进来,还是会存在超卖问题.
Redisson实现分布式锁
Redisson框架举例说明,它已经封装了一套基于Redis的分布式框架,使用起来也很简单.
Redisson依赖
需要在maven中引入redisson的包
<!-- Redisson 实现分布式锁 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.11.5</version>
</dependency>
Redission配置
针对redis 单实例
需要配置redis的地址和密码@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer()
.setAddress(redissonAddress).setPassword(redissonPassword);
RedissonClient redisson = Redisson.create(config);
return redisson;
}
针对redis 哨兵
需要配置redis哨兵的地址和密码@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSentinelServers()
.addSentinelAddress("redis://127.0.0.1:26379")
.addSentinelAddress("redis://127.0.0.2:26379")
.addSentinelAddress("redis://127.0.0.3:26379")
.setPassword(redissonPassword);
RedissonClient redisson = Redisson.create(config);
return redisson;
}
针对redis 集群
需要配置redis集群的地址和密码,并设置集群的扫描间隔时间@Bean
public RedissonClient redissonClient()
{
Config config = new Config();
config.useClusterServers()
// 集群状态扫描间隔时间,单位是毫秒
.setScanInterval(2000)
//cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
.addNodeAddress("redis://127.0.0.1:6379" )
.addNodeAddress("redis://127.0.0.1:6380")
.addNodeAddress("redis://127.0.0.1:6381")
.addNodeAddress("redis://127.0.0.1:6382")
.addNodeAddress("redis://127.0.0.1:6383")
.addNodeAddress("redis://127.0.0.1:6384")
.setPassword(redissonPassword);
RedissonClient redisson = Redisson.create(config);
return redisson;
}
场景示例
当前示例还是扣减库存.将上述示例进行修改
@Autowired
private RedissonClient redissonClient;
@GetMapping("/delete_stock2")
public String deleteProduct2() {
String lockKey = "product_001";
RLock lock = redissonClient.getLock(lockKey);//获取redis锁
lock.lock();//加锁,实现锁续命.默认30s
try {
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get("stock")));
if (stock > 0) {
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余:" + realStock);
} else {
System.out.println("库存不足");
}
} finally {
lock.unlock();//解锁
}
return "end";
}
下面测试下
模拟并发100个线程同时抢购商品.这里使用了CyclicBarrier类
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Slf4j
public class RedisDisLockControllerTest {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RedissonClient redissonClient;
@Before
public void init() {
String productNum = "3";
redisTemplate.opsForValue().set("stock", productNum);
System.out.println("初始化商品:" + productNum + "个");
}
@Test
public void deleteProduct() {
int count = 10;//并发线程数
ExecutorService executorService = Executors.newFixedThreadPool(count);
/**
* param1:是参与线程的个数
* param2:第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
*
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(count, () -> System.out.println("==========所有线程准备完毕======="));
for (int i = 0; i < count; i++) {
executorService.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + "-->来到栅栏准备抢购商品");
cyclicBarrier.await();//等待最后一个线程初始化完毕
this.getProducts();//抢购商品
} catch (Exception e) {
e.printStackTrace();
}
});
}
try {
//这里睡10000毫秒是为了主线程不关闭
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown(); // 关闭线程池
}
private void getProducts() {
String lockKey = "product_001";
RLock lock = redissonClient.getLock(lockKey);//获取redis锁
lock.lock();//加锁,实现锁续命.默认30s
try {
int stock = Integer.parseInt(Objects.requireNonNull(redisTemplate.opsForValue().get("stock")));
if (stock > 0) {
int realStock = stock - 1;
redisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减库存成功,剩余:" + realStock);
} else {
System.out.println("库存不足");
}
} finally {
lock.unlock();//解锁
}
}
}
Redisson工具类封装
下面是基于Redisson实现的分布式锁帮助类,可以拿去直接使用,包含加锁、释放锁、带时间的加锁、尝试获取锁等。
@Slf4j
@Component
public class RedisLockHelper {
@Autowired
private RedissonClient redissonClient;
/**
* 加锁
* @param lockKey
* @return
*/
public RLock lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
return lock;
}
/**
* 释放锁
* @param lockKey
*/
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
/**
* 释放锁
* @param lock
*/
public void unlock(RLock lock) {
lock.unlock();
}
/**
* 带超时的锁
* @param lockKey
* @param timeout 超时时间 单位:秒
*/
public RLock lock(String lockKey, int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, TimeUnit.SECONDS);
return lock;
}
/**
* 带超时的锁
* @param lockKey
* @param unit 时间单位
* @param timeout 超时时间
*/
public RLock lock(String lockKey, TimeUnit unit ,int timeout) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock(timeout, unit);
return lock;
}
/**
* 尝试获取锁
* @param lockKey
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
public boolean tryLock(String lockKey, int waitTime, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return false;
}
}
/**
* 尝试获取锁
* @param lockKey
* @param unit 时间单位
* @param waitTime 最多等待时间
* @param leaseTime 上锁后自动释放锁时间
* @return
*/
public boolean tryLock(String lockKey, TimeUnit unit, int waitTime, int leaseTime) {
RLock lock = redissonClient.getLock(lockKey);
try {
return lock.tryLock(waitTime, leaseTime, unit);
} catch (InterruptedException e) {
return false;
}
}
}