https://mp.weixin.qq.com/s/eJEVXM0FSXMrCd00n5Cf0w

分布式锁应用场景

举个例子,在电商系统中,如果有个商品库存是1,但是同时有两个请求进来,要怎么把这个商品卖给其中一个人,而不是卖给两个人,发生超卖事件。这个就是分布式锁的作用,当两个请求进来时,先给库存进行加锁,然后给其中一个请求钥匙,只有拿到钥匙的人才可以买东西,买了之后再把钥匙给另一个人,另一个人买的时候发现库存就是0了。

分布式锁特征

分布式锁中主要是应用到高并发场景,所以它需要具有以下几个特征:

  • 互斥性:任意时刻只有一个客户能操作业务
  • 超时释放:持有锁可以超时释放,避免死锁
  • 可重入性:一个线程获取了锁之后,可以再次获取锁
  • 高可用高性能:加锁和释放锁开销尽可能低,同时也要保证高可用
  • 安全性:锁只能被持有的客户端删除,不能被其他客户端删除

    实现方式

    目前分布式锁主流的有两种实现方式Zookeeper和Redis,它们各有优点和缺点,我们先看怎么实现

    Redis

    实现方案一:setnx

    ``` //设置分布式锁 String lockKey = “product_001_key”; //语义:如何不存在则存入缓存中,且返回true; //否则已存在,则返回false即加锁失败 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, “product_001_lock”); //———服务器宕机,则超时时间未设置成功———- //设置锁超时时间30s stringRedisTemplate.expire(lockKey,30, TimeUnit.SECONDS); if (!result) { //没有加锁成功,则返回提示等 } try{

}catch() {

}finally{ //释放锁 stringRedisTemplate.delete(lockKey); }

  1. **问题一:**这里有个问题,就是如果在加锁后就岩机了,那么也会造成死锁的问题。所以将加锁和超时设置改成原子操作

Boolean result = stringRedisTemplate.opsForValue() .setIfAbsent(lockKey,”product_001_lock”, 30, TimeUnit.SECONDS)

  1. **问题二:**但是这里还是有个问题,例如A线程执行到结束需要40秒,而锁在30秒后就释放了,那么A在释放锁的时就会把B的锁给释放了。<br />解决方案:设置线程随机ID,释放锁时判断是否为当前线程加的锁,即使存在线程A因线程执行时间超时被动释放其锁,但至少保证当前超时线程不会释放其他线程加的锁。但是面对线程执行时间大于设置的超时时间,也是会存在并发问题。

String lockKey = “product_001”; String clientId = UUID.randomUUID().toString(); //设置超时时间,且加锁和设置线程ID Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,clientId, 30, TimeUnit.SECONDS)`

if (!result) { //没有加锁成功,则返回提示等 } try{

}catch() {

}finally{ //释放锁:加锁线程ID和当前执行线程ID相同,才允许释放锁 if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))){ stringRedisTemplate.delete(lockKey); } }

  1. **问题三:**针对任务还未执行完成的超时情况,增加续命的流程,Redisson<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/364337/1619432565674-9d4a4e0d-1d6c-4d41-8578-a0fbb9b449ab.png#clientId=ueaad1e4e-fd82-4&from=paste&height=587&id=uba6d0406&margin=%5Bobject%20Object%5D&name=image.png&originHeight=587&originWidth=751&originalType=binary&size=311287&status=done&style=none&taskId=u1e7559c4-e7a3-4bef-9b0b-c2d0f027e0d&width=751)
  2. <a name="V3vq5"></a>
  3. ## 实现方案二:Redisson
  4. 添加配置

package com.zbc.cache.springbootcache.config;

import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

/**

  • @Author zbc
  • @Date 2021/4/26 11:18
  • @Version 1.0
  • @Description **/ @Configuration public class RedissonConfig {

    @Value(“${spring.redis.host}”) private String host;

    @Value(“${spring.redis.port}”) private String port;

    @Value(“${spring.redis.password}”) private String password;

    @Bean public RedissonClient getRedissonClient(){

    1. //创建配置
    2. Config config=new Config();
    3. config.useSingleServer().setAddress("redis://" + host + ":" + port);
    4. config.useSingleServer().setIdleConnectionTimeout(10000);
    5. config.useSingleServer().setPingTimeout(1000);
    6. config.useSingleServer().setConnectTimeout(10000);
    7. config.useSingleServer().setTimeout(3000);
    8. config.useSingleServer().setRetryAttempts(3);
    9. config.useSingleServer().setRetryInterval(1500);
    10. config.useSingleServer().setReconnectionTimeout(3000);
    11. config.useSingleServer().setFailedAttempts(3);
    12. config.useSingleServer().setClientName(null);
    13. config.useSingleServer().setSubscriptionConnectionMinimumIdleSize(1);
    14. config.useSingleServer().setSubscriptionConnectionPoolSize(10);
    15. config.useSingleServer().setSubscriptionsPerConnection(5);
    16. config.useSingleServer().setConnectionMinimumIdleSize(4);
    17. config.useSingleServer().setConnectionPoolSize(20);
    18. return Redisson.create(config);

    } }

    1. ```
    2. RLock lock = redisson.getLock(getLockName(Thread.currentThread().getId()));
    3. // 1. 最常见的使用方法
    4. //lock.lock();
    5. // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
    6. //lock.lock(10, TimeUnit.SECONDS);
    7. // 3. 尝试加锁,最多等待2秒,2秒内拿不到锁就是失败,上锁以后8秒自动解锁
    8. try {
    9. boolean res = lock.tryLock(2, 10, TimeUnit.MINUTES);
    10. System.out.println(lock.getHoldCount());
    11. // boolean res2 = lock.tryLock();//重入锁测试
    12. System.out.println(lock.getHoldCount());
    13. if (res) {//加锁成功
    14. //业务处理
    15. }
    16. } catch (InterruptedException e) {
    17. e.printStackTrace();
    18. } finally {
    19. //释放锁
    20. lock.unlock();
    21. }

    实现方案三:

    Zookeeper

    总结