什么是分布式锁

单体应用锁:在一个JVM进程内有效,无法跨JVM、跨进程

分布式锁:可以跨越多个JVM、跨越多个进程的锁

分布式锁的设计思路

由于Tomcat是由Java启动的,所以每个Tomcat可以看成一个JVM,JVM内部的锁是无法跨越多个进程的。所以,我们要实现分布式锁,我们只能在这些JVM之外去寻找,通过其他的组件来实现分布式锁。系统的架构如图所示:
分布式锁学习 - 图1

两个Tomcat通过第三方的组件实现跨JVM、跨进程的分布式锁。这就是分布式锁的解决思路,找到所有JVM可以共同访问的第三方组件,通过第三方组件实现分布式锁。

分布式锁方案

分布式锁都是通过第三方组件来实现的,目前比较流行的分布式锁的解决方案有:

  • 数据库,通过数据库可以实现分布式锁,但是在高并发的情况下对数据库压力较大,所以很少使用。
  • Redis,借助Redis也可以实现分布式锁,而且Redis的Java客户端种类很多,使用的方法也不尽相同。
  • Zookeeper,Zookeeper也可以实现分布式锁,同样Zookeeper也存在多个Java客户端,使用方法也不相同。

超卖

  • 什么是超卖?

    • 商品卖出数量超过库存数量
  • 超卖现象一

    • 系统中库存1,但是产生两笔订单

    • 商品存库1,A和B同时看到商品,加入购物车,同时提交订单

    • 产生原因:

      • 扣减库存的动作,在程序中进行,在程序中计算剩余库存
        image-20200430132741687
    • 解决方法:

      • 扣减库存不在程序中进行,而是通过数据库
      • 向数据库传递库存增量,扣减一个库存,增量为-1
      • 在数据库update语句计算库存,通过update行锁解决并发
  • 超卖现象二

    • 系统中库存变为-1

    • 卖家不知所措,询问平台客服

    • 产生原因:

      • 并发检验库存,造成库存充足的假象

      • update更新库存,导致库存为负数 分布式锁学习 - 图3

    • 解决方法:

      • 校验库存、扣减库存统一加锁
      • 使之成为原子性的操作
      • 并发时,只有获得锁的线程才能校验、扣减库存
      • 扣减库存后,释放锁
      • 确保库存不会扣成负数
    • 基于Synchronized锁解决超卖问题(最原始的锁)

    • 基于ReentrantLock锁解决并发超卖问题(并发包中的锁)

基于数据库悲观锁的分布式锁

  • 多个进程、多个线程访问共同组件数据库

  • 通过select…….for update访问一条数据

  • for update锁定数据,其他线程只能等待

    1. <select id="selectDistributeLock" resultType="com.yy.distributelock.model.DistributeLock">
    2. select * from distribute_lock
    3. where business_code = #{businessCode,jdbcType=VARCHAR}
    4. for update
    5. </select>
  1. @RequestMapping("dbLock")
  2. @Transactional(rollbackFor = Exception.class)
  3. public String dbLock() throws Exception {
  4. log.info("我进入了方法!");
  5. DistributeLock distributeLock = distributeLockMapper.selectDistributeLock("demo");
  6. if (distributeLock==null) throw new Exception("分布式锁找不到");
  7. log.info("我进入了锁!");
  8. try {
  9. Thread.sleep(20000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. return "我已经执行完成!";
  14. }
  • 基于数据库设计分布式锁优缺点

    • 优点:简单方便、易于理解、易于操作
    • 缺点:并发量大时,对数据库压力较大
    • 建议:作为锁的数据库与业务数据库分开

基于Redis的Setnx实现分布式锁

  • 实现原理

    • 获取锁的Redis命令

    • SET key my_random_value NX PX 30000

      • key:资源名称,可根据不同的业务区分不同的锁
      • my_random_value:随机值,每个线程的随机值都不同,用于释放锁时的校验
      • NX:key不存在时设置成功,key存在时设置不成功
      • PX:自动失效的时间,出现异常情况,锁可以过期失效
    • 利用NX的原子性,多个线程并发时,只有一个线程可以设置成功(因为Redis单线程)

    • 设置成功即获得锁,可以执行后续的业务处理

    • 如果出现异常,过了锁的有效期,锁自动释放

    • 释放锁采用Redis的delete命令

    • 锁释放时校验之前设置的随机数,相同才能释放

    • 释放锁的LUA脚本

      1. if redis.call("get",KEYS[1]) == ARGV[1] then
      2. return redis.call("del",KEYS[1])
      3. else
      4. return 0
      5. end
  • 代码实现
    1. @RequestMapping("redisLock")
    2. public String redisLock(){
    3. log.info("我进入了方法!");
    4. try (RedisLock redisLock = new RedisLock(redisTemplate,"redisKey",30)){
    5. if (redisLock.getLock()) {
    6. log.info("我进入了锁!!");
    7. Thread.sleep(15000);
    8. }
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. } catch (Exception e) {
    12. e.printStackTrace();
    13. }
    14. log.info("方法执行完成");
    15. return "方法执行完成";
    16. }
  1. @Slf4j
  2. public class RedisLock implements AutoCloseable {
  3. private RedisTemplate redisTemplate;
  4. private String key;
  5. private String value;
  6. //单位:秒
  7. private int expireTime;
  8. public RedisLock(RedisTemplate redisTemplate,String key,int expireTime){
  9. this.redisTemplate = redisTemplate;
  10. this.key = key;
  11. this.expireTime=expireTime;
  12. this.value = UUID.randomUUID().toString();
  13. }
  14. /**
  15. * 获取分布式锁
  16. * @return
  17. */
  18. public boolean getLock(){
  19. RedisCallback<Boolean> redisCallback = connection -> {
  20. //设置NX
  21. RedisStringCommands.SetOption setOption = RedisStringCommands.SetOption.ifAbsent();
  22. //设置过期时间
  23. Expiration expiration = Expiration.seconds(expireTime);
  24. //序列化key
  25. byte[] redisKey = redisTemplate.getKeySerializer().serialize(key);
  26. //序列化value
  27. byte[] redisValue = redisTemplate.getValueSerializer().serialize(value);
  28. //执行setnx操作
  29. Boolean result = connection.set(redisKey, redisValue, expiration, setOption);
  30. return result;
  31. };
  32. //获取分布式锁
  33. Boolean lock = (Boolean)redisTemplate.execute(redisCallback);
  34. return lock;
  35. }
  36. /**
  37. * 释放分布式锁
  38. */
  39. public boolean unLock() {
  40. String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
  41. " return redis.call(\"del\",KEYS[1])\n" +
  42. "else\n" +
  43. " return 0\n" +
  44. "end";
  45. RedisScript<Boolean> redisScript = RedisScript.of(script,Boolean.class);
  46. List<String> keys = Arrays.asList(key);
  47. Boolean result = (Boolean)redisTemplate.execute(redisScript, keys, value);
  48. log.info("释放锁的结果:"+result);
  49. return result;
  50. }
  51. @Override
  52. public void close() throws Exception {
  53. unLock();
  54. }
  55. }

基于Zookeeper的瞬时节点实现分布式锁

  • Zookeeper的数据结构 分布式锁学习 - 图4

  • Zookeeper的下载安装

  • Zookeeper的观察器 分布式锁学习 - 图5

  • 实现原理

    • 利用Zookeeper的瞬时有序节点的特性

    • 多线程并发创建瞬时节点时,得到有序的序列

    • 序号最小的线程获得锁

    • 其他的线程则监听自己序号的前一个序号

    • 前一个线程执行完成,删除自己序号的节点

    • 下一个序号的线程得到通知,继续执行

    • 以此类推

    • 创建节点时,已经确定了线程的执行顺序 分布式锁学习 - 图6

  • 代码实现

    1. @Slf4j
    2. public class ZkLock implements AutoCloseable, Watcher {
    3. private ZooKeeper zooKeeper;
    4. private String znode;
    5. public ZkLock() throws IOException {
    6. this.zooKeeper = new ZooKeeper("localhost:2181",
    7. 10000,this);
    8. }
    9. public boolean getLock(String businessCode) {
    10. try {
    11. //创建业务 根节点 持久节点
    12. Stat stat = zooKeeper.exists("/" + businessCode, false);
    13. if (stat==null){
    14. zooKeeper.create("/" + businessCode,businessCode.getBytes(),
    15. ZooDefs.Ids.OPEN_ACL_UNSAFE,
    16. CreateMode.PERSISTENT);
    17. }
    18. //创建瞬时有序节点 /order/order_00000001
    19. znode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(),
    20. ZooDefs.Ids.OPEN_ACL_UNSAFE,
    21. CreateMode.EPHEMERAL_SEQUENTIAL);
    22. //获取业务节点下 所有的子节点
    23. List<String> childrenNodes = zooKeeper.getChildren("/" + businessCode, false);
    24. //子节点排序
    25. Collections.sort(childrenNodes);
    26. //获取序号最小的(第一个)子节点
    27. String firstNode = childrenNodes.get(0);
    28. //如果创建的节点是第一个子节点,则获得锁
    29. if (znode.endsWith(firstNode)){
    30. return true;
    31. }
    32. //不是第一个子节点,则监听前一个节点
    33. String lastNode = firstNode;
    34. for (String node:childrenNodes){
    35. if (znode.endsWith(node)){
    36. zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
    37. break;
    38. }else {
    39. lastNode = node;
    40. }
    41. }
    42. synchronized (this){
    43. wait();
    44. }
    45. return true;
    46. } catch (Exception e) {
    47. e.printStackTrace();
    48. }
    49. return false;
    50. }
    51. @Override
    52. public void close() throws Exception {
    53. zooKeeper.delete(znode,-1);
    54. zooKeeper.close();
    55. log.info("我已经释放了锁!");
    56. }
    57. @Override
    58. public void process(WatchedEvent event) {
    59. if (event.getType() == Event.EventType.NodeDeleted){
    60. synchronized (this){
    61. notify();
    62. }
    63. }
    64. }
    65. }
  1. @RequestMapping("zkLock")
  2. public String zookeeperLock(){
  3. log.info("我进入了方法!");
  4. try (ZkLock zkLock = new ZkLock()) {
  5. if (zkLock.getLock("order")){
  6. log.info("我获得了锁");
  7. Thread.sleep(10000);
  8. }
  9. } catch (IOException e) {
  10. e.printStackTrace();
  11. } catch (Exception e) {
  12. e.printStackTrace();
  13. }
  14. log.info("方法执行完成!");
  15. return "方法执行完成!";
  16. }

基于Zookeeper的Curator客户端实现分布式锁

  • 引入curator客户端
    1. <dependency>
    2. <groupId>org.apache.curator</groupId>
    3. <artifactId>curator-recipes</artifactId>
    4. <version>4.2.0</version>
    5. </dependency>
  • curator已经实现了分布式锁的方法

  • 直接调用即可

    1. @SpringBootApplication
    2. public class DistributeZkLockApplication {
    3. public static void main(String[] args) {
    4. SpringApplication.run(DistributeZkLockApplication.class, args);
    5. }
    6. @Bean(initMethod="start",destroyMethod = "close")
    7. public CuratorFramework getCuratorFramework() {
    8. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    9. CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
    10. return client;
    11. }
    12. }
  1. @Autowired
  2. private CuratorFramework client;
  3. @RequestMapping("curatorLock")
  4. public String curatorLock(){
  5. log.info("我进入了方法!");
  6. InterProcessMutex lock = new InterProcessMutex(client, "/order");
  7. try{
  8. if (lock.acquire(30, TimeUnit.SECONDS)){
  9. log.info("我获得了锁!!");
  10. Thread.sleep(10000);
  11. }
  12. } catch (IOException e) {
  13. e.printStackTrace();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. }finally {
  17. try {
  18. log.info("我释放了锁!!");
  19. lock.release();
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. log.info("方法执行完成!");
  25. return "方法执行完成!";
  26. }

基于Redis的Redisson客户端实现分布式锁

  • 引入Redisson的jar包

  • 进行Redisson与Redis的配置

  • 使用分布式锁

  • 通过JAVA API方式引入Redisson

    1. <dependency>
    2. <groupId>org.redisson</groupId>
    3. <artifactId>redisson</artifactId>
    4. <version>3.11.2</version>
    5. </dependency>
  1. public void testRedissonLock() {
  2. Config config = new Config();
  3. config.useSingleServer().setAddress("redis://192.168.73.130:6379");
  4. RedissonClient redisson = Redisson.create(config);
  5. RLock rLock = redisson.getLock("order");
  6. try {
  7. rLock.lock(30, TimeUnit.SECONDS);
  8. log.info("我获得了锁!!!");
  9. Thread.sleep(10000);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }finally {
  13. log.info("我释放了锁!!");
  14. rLock.unlock();
  15. }
  16. }
  • Spring项目引入Redisson
    1. <dependency>
    2. <groupId>org.redisson</groupId>
    3. <artifactId>redisson</artifactId>
    4. <version>3.11.2</version>
    5. </dependency>
  1. @SpringBootApplication
  2. @ImportResource("classpath*:redisson.xml")
  3. public class RedissonLockApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(RedissonLockApplication.class, args);
  6. }
  7. }
  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:context="http://www.springframework.org/schema/context"
  4. xmlns:redisson="http://redisson.org/schema/redisson"
  5. xsi:schemaLocation="
  6. http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans.xsd
  8. http://www.springframework.org/schema/context
  9. http://www.springframework.org/schema/context/spring-context.xsd
  10. http://redisson.org/schema/redisson
  11. http://redisson.org/schema/redisson/redisson.xsd
  12. ">
  13. <redisson:client>
  14. <redisson:single-server address="redis://192.168.73.130:6379"/>
  15. </redisson:client>
  16. </beans>
  • Spring Boot项目引入Redisson
    1. <dependency>
    2. <groupId>org.redisson</groupId>
    3. <artifactId>redisson-spring-boot-starter</artifactId>
    4. <version>3.11.2</version>
    5. </dependency>
  1. spring.redis.host=192.168.73.130
  1. @Autowired
  2. private RedissonClient redisson;
  3. @RequestMapping("redissonLock")
  4. public String redissonLock() {
  5. RLock rLock = redisson.getLock("order");
  6. log.info("我进入了方法!!");
  7. try {
  8. rLock.lock(30, TimeUnit.SECONDS);
  9. log.info("我获得了锁!!!");
  10. Thread.sleep(10000);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }finally {
  14. log.info("我释放了锁!!");
  15. rLock.unlock();
  16. }
  17. log.info("方法执行完成!!");
  18. return "方法执行完成!!";
  19. }

基于分布式锁解决定时任务重复问题

  1. @Service
  2. @Slf4j
  3. public class SchedulerService {
  4. @Autowired
  5. private RedisTemplate redisTemplate;
  6. @Scheduled(cron = "0/5 * * * * ?")
  7. public void sendSms(){
  8. try(RedisLock redisLock = new RedisLock(redisTemplate,"autoSms",30)) {
  9. if (redisLock.getLock()){
  10. log.info("向138xxxxxxxx发送短信!");
  11. }
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. }

分布式锁实现方案对比

分布式锁学习 - 图7

分布式锁学习 - 图8

分布式锁技术落地-应用到天天吃货

  • 引入Redisson pom依赖
    1. <!-- 分布式锁【1】引入 redisson 依赖 -->
    2. <dependency>
    3. <groupId>org.redisson</groupId>
    4. <artifactId>redisson-spring-boot-starter</artifactId>
    5. <version>3.12.0</version>
    6. </dependency>
  • 引入RedissonClient客户端依赖
    1. //分布式锁【2】自动注入
    2. @Autowired
    3. private RedissonClient redissonClient;
  • 加锁

    1. /**
    2. * 分布式锁【3】 编写业务代码
    3. * 1、Redisson是基于Redis,使用Redisson之前,项目必须使用Redis
    4. * 2、注意getLock方法中的参数,以specId作为参数,每个specId一个key,和
    5. * 数据库中的行锁是一致的,不会是方法级别的锁
    6. */
    7. RLock rLock = redissonClient.getLock("SPECID_" + specId);
    8. try {
    9. /**
    10. * 1、获取分布式锁,锁的超时时间是5秒get
    11. * 2、获取到了锁,进行后续的业务操作
    12. */
    13. rLock.lock(5, TimeUnit.HOURS);
    14. int result = itemsMapperCustom.decreaseItemSpecStock(specId, buyCounts);
    15. if (result != 1) {
    16. throw new RuntimeException("订单创建失败,原因:库存不足!");
    17. }
    18. } catch (Exception e) {
    19. log.error(e.getMessage(), e);
    20. throw new RuntimeException(e.getMessage(), e);
    21. } finally {
    22. /**
    23. * 不管业务是否操作正确,随后都要释放掉分布式锁
    24. * 如果不释放,过了超时时间也会自动释放
    25. */
    26. rLock.unlock();
    27. }