一、分布式锁应用场景:
1.互联网秒杀
2.抢优惠卷
3.接口幂等性校验
4.其它一些场景
二、几大最常见的秒杀减库存的例子与问题分析:
1.用synchronized锁
//方式1:用synchronized锁控制并发
@RequestMapping("/deduct_stock1")
public String deductStock1() {
String lockKey = "product_101";
synchronized (this){
int count = Integer.parseInt(stringRedisTemplate.opsForValue().get(lockKey));
if(count<=0){
return "库存为0";
}
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "azhi");
if (!result) {
return "error_code";
}
}
return "end";
}
** 问题:如果部署是在单机tomcat情况下,这个代码是没有问题的,synchronized确保了在多线程情况下同时只会有一个线程执行到代码块里的代码,但如果是分布式部署,那在高并发场景下就不可控了
2.用redis分布式锁
//方式2:用redis分布式锁,拿到锁的才能操作,操作完再释放锁给后面的人去拿锁
@RequestMapping("/deduct_stock2")
public String deductStock2() {
String lockKey = "product_101";
try {
//jedis.setnx(k,v)
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "azhi");
if (!result) {
return "error_code";
}
//这里执行其它的一些逻辑。。。
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
** 问题:如果在拿到锁并在执行其它逻辑过程中死机了,代码没执行到finally,锁没解到,那么锁就一直在,后面的人会一直拿不到锁
3.用redis分布式锁,并给锁加一个过期时间
//方式3:用redis分布式锁,并给锁加一个过期时间
@RequestMapping("/deduct_stock3")
public String deductStock3() {
String lockKey = "product_101";
try {
//stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "azhi", 10, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
//这里执行其它的一些逻辑。。。
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
** 问题:如在执行其它逻辑的代码块用时过多,锁的过期时间到了,锁自动失效了,其它线程成功拿到了锁,但最后finally里却把其它用户加的锁给删除了。
加入clientId实现谁加的锁谁来删除
//方式3的优化:加入clientId来防止最后finally里不小心把其它用户加的锁给删除的问题
@RequestMapping("/deduct_stock33")
public String deductStock33() {
String lockKey = "product_101";
//这里生成一个uuid,记录锁的生成者。
String clientId = UUID.randomUUID().toString();
try {
//stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
//这里执行其它的一些逻辑。。。
} finally {
//这里判断锁是不是由自己这个线程加的,如果不是的话就不删除锁,
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
问题:如果逻辑处理代码确实要处理15秒,那么锁过期之后其它用户还是一样能拿到锁,这样依然是会有bug存在,如超卖之类的问题。也就是锁过期时间我们并不好根据逻辑代码去控制。
优化思路1:可以在主线程中加锁,然后开启一个分线程去判断给锁续命, 如果锁到期了依然还没执行到finally的话给锁续时间。这个思路既可以解决方法1的问题,又可以解决到方法2中的问题,但实现起来会比较麻烦,坑多,推荐用redisson
** 优化思路2:用lua脚本实现
4.用lua脚本去实现
//方式4,用lua脚本实现
@RequestMapping("/deduct_stock4")
public String deductStock4
{
String script = " local count = redis.call('get', KEYS[1]) " +
" local a = tonumber(count) " +
" local b = tonumber(ARGV[1]) " +
" if a >= b then " +
" redis.call('set', KEYS[1], a-b) " +
" return 1 " +
" end " +
" return 0 ";
//商品product_101,减10个库存
Object obj = jedis.eval(script, Arrays.asList("product_101"), Arrays.asList("10"));
}
redis不仅仅可以执行命令,也可以执行带逻辑运算的lua脚本,在lua脚本里的运算都是带原子性的,要么就全部不执行要么就全部执行,一般也是用lua脚本去代替redis的事务。而且在高并发情况下多个提交上去的lua脚本都会在redis队列里排队等待被执行的,也就不存在并发时数据超卖的问题了。
5.用redisson分布式锁实现
@Autowired
private RedissonClient redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
/***
* 方式5:用redisson
* @return
*/
@RequestMapping("/deduct_stock5")
public String deductStock5() {
String lockKey = "product_101";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加锁,实现锁续命功能,默认是30秒过期,每过10会进行一次判断 setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
redissonLock.lock();
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
redissonLock.unlock();
}
return "end";
}
redisson引用与配置
引用了redisson-spring-boot-starter就不需要再引用spring-boot-starter-data-redis了,如果用jedis的话那需要另外引入jedis的依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>
</dependencies>
<!--打包jar-->
<build>
<finalName>test2</finalName>
<plugins>
<!--spring-boot-maven-plugin-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.4.2</version>
<!--解决打包出来的jar文件中没有主清单属性问题-->
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
单机模式和集群模式配置类:
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod="shutdown")
RedissonClient redisson() throws IOException {
//1、单机模式
Config config = new Config();
config.useSingleServer()
.setAddress("redis://xxxx:6379").setPassword("xxxx").setDatabase(1);
//2、集群模式
/*config.useClusterServers()
.addNodeAddress("").setPassword("")
.addNodeAddress("").setPassword("")
.addNodeAddress("").setPassword("")
.addNodeAddress("").setPassword("")
.addNodeAddress("").setPassword("")
.addNodeAddress("").setPassword("");*/
return Redisson.create(config);
}
}
如下,并发请求下不会出现超卖情况。
6.redisson实现分布式锁原理
7.redis分段锁
场景:商品IP12准备搞促销,redis采用的是cluster集群,IP12分配到集群节点A上,库存1000个,需要并发性能提升10倍左右。
解决方案:可以通过设置商品key的hash值让1000个库存平均分配到10个不同的redis集群节点上。
8.Redis分布式锁与Zookeeper分布式锁区别
上面这个场景其实还可以用zookeeper分布式锁来实现,zookeeper的数据也是写在内存里,而且zookeeper分布式锁是强一致性的,zookeeper有写数据过半机制,也就是一个写数据操作要过半机器都写入成功才会给客户端返回成功,从而保证了数据的强一致性,当然性能就没有Redis那么高了,毕竟每一个写操作都要走过半机制是很影响性能的。
而Redis没有写数据过半机制,只要主节点写成功了就会返回成功给客户端,从节点的数据同步是异步进行处理的,如果主节点写入成功,而从节点还没同步到数据的情况下主节点掉线了redis选举了一个从节点当做主节点了,那么就会出现锁丢失数据不一致的情况。但Redis提供了配置min-replicas-to-write N参数,让master停止写入的方式,当健康的slave的个数小于N,mater就禁止写入,这个配置虽然不能保证N个slave都一定能接收到master的写操作,但是能避免没有足够健康的slave的时候,master拒绝写入只能读来尽可能避免数据的丢失,但是这样也没有办法百分百保证数据的一致。
所以对于分布式锁这个场景到底是要选择Redis还是选择Zookeeper呢?
这个问题就要看你是如何进行取舍的了,如果不是很注重高可用,一定要百分百的一致性的话那就用Zookeeper,如果觉得更加注重系统的高可用,小概率的不一致可以接受的话那就用Redis。