• 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  • 高可用的获取锁与释放锁
  • 高性能的获取锁与释放锁
  • 具备可重入特性
  • 具备锁失效机制,防止死锁
  • 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

数据库实现分布式锁

基于数据库实现分布式锁的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引。想要执行某个方法,首先需要将这个方法名插入表中,成功插入则获取锁,执行完成后删除对应的行数据释放锁。此种方式就是建立在数据库唯一索引的特性基础上的。
表结构如下:
image.png

  • 存在问题:

1、因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;
2、不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
3、没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
4、不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
5、在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。

Zookeeper实现分布式锁

Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Znode。
Zookeeper中节点分为4种类型:
1.持久节点 (PERSISTENT)
默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在
2.持久顺序节点(PERSISTENT_SEQUENTIAL)
所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号
3.临时节点(EPHEMERAL)
和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除
4.临时顺序节点(EPHEMERAL_SEQUENTIAL)
顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除

Zookeeper实现分布式锁的原理是基于Zookeeper的临时顺序节点

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

  1. 客户端获取锁时,在lock节点下创建临时顺序节点。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
  4. 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作,其中就包括分布式锁的实现。

  • 在Curator中有五种锁方案:
  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量 ```java // 配置Zookeeper @Configuration public class ZkConfig { @Bean public CuratorFramework curatorFramework(){
    1. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
    2. CuratorFramework client = CuratorFrameworkFactory.builder()
    3. .connectString("localhost:2181")
    4. .sessionTimeoutMs(5000)
    5. .connectionTimeoutMs(5000)
    6. .retryPolicy(retryPolicy)
    7. .build();
    8. client.start();
    9. return client;
    } }

// 改造Controller @Autowired private CuratorFramework curatorFramework;

@GetMapping(“/stock”) public String stock() { InterProcessMutex mutex = new InterProcessMutex(curatorFramework,”/mylock”);

  1. try {
  2. //尝试获得锁
  3. boolean locked = mutex.acquire(0, TimeUnit.SECONDS);
  4. if(locked){
  5. int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
  6. if(stock > 0){
  7. stock --;
  8. redisTemplate.opsForValue().set("stock",stock+"");
  9. System.out.println("库存扣减成功,剩余库存:" + stock);
  10. }else {
  11. System.out.println("库存不足!!!");
  12. }
  13. //释放锁
  14. mutex.release();
  15. }else{
  16. System.out.println("没有获取锁,不能执行减库存操作!!!");
  17. }
  18. }catch (Exception ex){
  19. System.out.println("出现异常!!!");
  20. }
  21. return "OK";

}

  1. <a name="lsKBl"></a>
  2. # Redis实现分布式锁
  3. Redis实现分布式锁比较简单,就是调用Redis的**set**命令设置值,能够设置成功则表示加锁成功,即获得锁,通过调用del命令来删除设置的键值,即释放锁。
  4. 需要注意:<br />1、防止死锁:可以在加锁时设置一个过期时间防止死锁。加锁命令:**set** lock_key lock_value **NX** **PX** 5000 解锁命令:**del** lock_key<br />2、加锁与解锁的线程必须是同一个:就是加锁和解锁必须是同一个客户端,所以在加锁时可以设置当前线程id,在释放锁时判断是否为当前线程加的锁,如果是再释放锁即可
  5. ```java
  6. @GetMapping("/stock")
  7. public String stock() {
  8. try {
  9. String threadId = Thread.currentThread().getId()+"";
  10. //尝试加锁
  11. Boolean locked = redisTemplate.opsForValue().setIfAbsent("mylock",threadId,5000,TimeUnit.MILLISECONDS);
  12. if(locked){//加锁成功
  13. int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
  14. if(stock > 0){
  15. stock --;
  16. redisTemplate.opsForValue().set("stock",stock+"");
  17. System.out.println("库存扣减成功,剩余库存:" + stock);
  18. }else {
  19. System.out.println("库存不足!!!");
  20. }
  21. String myValue = redisTemplate.opsForValue().get("mylock");
  22. if(threadId.equals(myValue)){
  23. //释放锁
  24. redisTemplate.delete("mylock");
  25. }
  26. }else{
  27. System.out.println("没有获取锁,不能执行减库存操作!!!");
  28. }
  29. }catch (Exception ex){
  30. System.out.println("出现异常!!!");
  31. }
  32. return "OK";
  33. }
  • Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优势,基于Java实用工具包中常用接口,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。

Redisson已经内置提供了基于Redis的分布式锁实现,此种方式是我们推荐的分布式锁使用方式。

Redission解决的问题:
1、锁续期:若当前线程在key的过期时间内没有执行完毕,则会自动延长key的过期时间
2、可重入性:类似于AQS机制,提供count属性来实现锁重入
3、锁丢失:master节点发生主从切换可能导致锁丢失。Redission提供了redlock机制,从多个节点分别获取锁,当一半以上的节点获取成功,锁才算获取成功

// 编写配置类
@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        config.useSingleServer().setPassword(password);
        final RedissonClient client = Redisson.create(config);
        return client;
    }
}


// 改造Controller
@Autowired
private RedissonClient redissonClient;

@GetMapping("/stock")
public String stock() {
    //获得分布式锁对象,注意,此时还没有加锁成功
    RLock lock = redissonClient.getLock("mylock");
    try {
        //尝试加锁,如果加锁成功则后续程序继续执行,如果加锁不成功则阻塞等待
        lock.lock(5000,TimeUnit.MILLISECONDS);

        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock --;
            redisTemplate.opsForValue().set("stock",stock+"");
            System.out.println("库存扣减成功,剩余库存:" + stock);
        }else {
            System.out.println("库存不足!!!");
        }
    }catch (Exception ex){
        System.out.println("出现异常!!!");
    }finally {
        //解锁
        lock.unlock();
    }

    return "OK";
}