Redisson 分布式锁优化

Redis06 分布式锁优化 - 图1

Redis06 分布式锁优化 - 图2

  1. https://redisson.org/
  2. https://github.com/redisson/redisson
  3. ## github 中文
  4. https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

依赖于配置

Redis06 分布式锁优化 - 图3

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson</artifactId>
  4. <version>3.17.1</version>
  5. </dependency>

Redis06 分布式锁优化 - 图4

测试

Redis06 分布式锁优化 - 图5

可重入

Redis06 分布式锁优化 - 图6

Redis06 分布式锁优化 - 图7

Redis06 分布式锁优化 - 图8

Redis06 分布式锁优化 - 图9

Redis06 分布式锁优化 - 图10

主从一致性

Redis06 分布式锁优化 - 图11

异步秒杀优化

Redis06 分布式锁优化 - 图12

  1. - 思路
  2. 1. 将库存信息存放至redis [size : 100] 用户下单时将库存减一 [decr : 1] 保证库存安全
  3. 2. 将购买过思维用户id存入 redis set 集合中 j
  1. -- 如果set集合中不存在就添加 返回一
  2. if(redis.call('sismember',KEYS[1],ARGV[1]) == 0) then redis.call('sadd',KEYS[1],ARGV[1]) return 1 end return 0
  3. -- 注意 then 后面的冒号在redis EVAL 中不需要 但是时LUA读起来更加规范
  4. -- 判断value的值是否大于0 就减一
  5. local size = redis.call('get',KEYS[1]) if(tonumber(size) > 0) then redis.call('decr',KEYS[1]) return 1 end return 0

Redis06 分布式锁优化 - 图13

代码1
  1. -- 1.参数列表
  2. -- 1.1.优惠券id
  3. local voucherId = ARGV[1]
  4. -- 1.2.用户id
  5. local userId = ARGV[2]
  6. -- 2.数据key
  7. -- 2.1.库存key
  8. local stockKey = 'seckill:stock:' .. voucherId
  9. -- 2.2.订单key
  10. local orderKey = 'seckill:order:' .. voucherId
  11. -- 3.脚本业务
  12. -- 3.1.判断库存是否充足 get stockKey
  13. if(tonumber(redis.call('get', stockKey)) <= 0) then
  14. -- 3.2.库存不足,返回1
  15. return 1
  16. end
  17. -- 3.2.判断用户是否下单 SISMEMBER orderKey userId
  18. if(redis.call('sismember', orderKey, userId) == 1) then
  19. -- 3.3.存在,说明是重复下单,返回2
  20. return 2
  21. end
  22. -- 3.4.扣库存 incrby stockKey -1
  23. redis.call('incrby', stockKey, -1)
  24. -- 3.5.下单(保存用户)sadd orderKey userId
  25. redis.call('sadd', orderKey, userId)
  26. return 0

对应的java业务层

  1. private final IVoucherService voucherService;
  2. private final ISeckillVoucherService seckillVoucherService;
  3. private final RedisIdWorker idWorker;
  4. private final StringRedisTemplate stringRedisTemplate;
  5. private final RedissonClient redissonClient;
  6. private final static DefaultRedisScript<Long> SECKILL_SCRIPT;
  7. static {
  8. SECKILL_SCRIPT = new DefaultRedisScript();
  9. SECKILL_SCRIPT.setResultType(Long.class);
  10. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  11. }
  12. private BlockingQueue<VoucherOrder> taskQueue = new LinkedBlockingQueue<>(1024 * 1024);
  13. private static final ExecutorService SCKILL_SERVICE = Executors.newSingleThreadExecutor();
  14. // 当前类初始化完毕执行
  15. @PostConstruct
  16. private void init() {
  17. // 提交线程任务
  18. SCKILL_SERVICE.submit(new VoucherOrderHandler());
  19. }
  20. @Override
  21. public Result seckillVoucher(Long voucherId) {
  22. Long userId = UserHolder.getUser().getId();
  23. Long result = stringRedisTemplate.execute(
  24. SECKILL_SCRIPT,
  25. Collections.emptyList(),
  26. voucherId.toString(),
  27. userId.toString()
  28. );
  29. int resultValue = result.intValue();
  30. if (resultValue == 1) {
  31. return Result.fail("已售空");
  32. }
  33. if (resultValue == 2) {
  34. return Result.fail("您已购买过此卷");
  35. }
  36. if (resultValue != 0) {
  37. return Result.fail("服务器错误");
  38. }
  39. // 将订单加入阻塞队列 异步执行
  40. long order = idWorker.nextId("order");
  41. VoucherOrder voucherOrder = new VoucherOrder();
  42. voucherOrder.setId(order);
  43. voucherOrder.setUserId(userId);
  44. voucherOrder.setVoucherId(voucherId);
  45. voucherOrder.setStatus(1);
  46. voucherOrder.setUpdateTime(LocalDateTime.now());
  47. voucherOrder.setCreateTime(LocalDateTime.now());
  48. voucherOrder.setPayType(1);
  49. // 将任务加入任务队列
  50. taskQueue.add(voucherOrder);
  51. return Result.ok("订单号\t" + order);
  52. }
  53. private class VoucherOrderHandler implements Runnable {
  54. @Override
  55. public void run() {
  56. while (true) {
  57. try {
  58. // take 会阻塞
  59. VoucherOrder voucherOrder = taskQueue.take();
  60. // 创建订单
  61. createVoucherOrder(voucherOrder);
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. log.error("error", e);
  65. }
  66. }
  67. }
  68. private void createVoucherOrder(VoucherOrder voucherOrder) {
  69. Long userId = voucherOrder.getUserId();
  70. Long voucherId = voucherOrder.getVoucherId();
  71. RLock redisLock = redissonClient.getLock("lock:order" + userId);
  72. // 注意 这里锁的对象还是用户id解决一人一单问题
  73. boolean isLock = redisLock.tryLock();
  74. if (!isLock) {
  75. // TODO 注意 这里的锁对象是不同 进程 同一个 用户 的id 不同用户不会产生争抢锁现象 只会判断 where stock > 0
  76. log.error("不允许重复下单");
  77. return;
  78. }
  79. try {
  80. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  81. if (count > 0) {
  82. log.error("不允许重复下单");
  83. return;
  84. }
  85. boolean update = seckillVoucherService.update()
  86. .setSql("stock = stock -1")
  87. .eq("voucher_id", voucherId)
  88. .gt("stock", 0)
  89. .update();
  90. if (!update) {
  91. log.error("库存不足");
  92. return;
  93. }
  94. // 保存订单
  95. save(voucherOrder);
  96. } finally {
  97. redisLock.unlock();
  98. }
  99. }
  100. }

Redis06 分布式锁优化 - 图14

消息队列优化

Redis06 分布式锁优化 - 图15

Redis06 分布式锁优化 - 图16

1 基于List

Redis06 分布式锁优化 - 图17

Redis06 分布式锁优化 - 图18

2 PubSub

Redis06 分布式锁优化 - 图19

Redis06 分布式锁优化 - 图20

3 Stream

Redis06 分布式锁优化 - 图21

Redis06 分布式锁优化 - 图22

Redis06 分布式锁优化 - 图23

Redis06 分布式锁优化 - 图24

Redis06 分布式锁优化 - 图25

Redis06 分布式锁优化 - 图26

Redis06 分布式锁优化 - 图27

Redis06 分布式锁优化 - 图28

  1. # 创建一个队列
  2. XADD s1 * k1 v1 k2 v2
  3. # 在这个队列读取消息 从0 开始 4 条
  4. XREAD COUNT 4 BLOCK 2000 STREAMS s1 0
  5. XREAD COUNT 4 BLOCK 0 STREAMS s1 $
  6. # 根据队列创建一个消费者组
  7. XGROUP CREATE s1 wjl 0
  8. # 读取未被消费过的消息
  9. XREADGROUP GROUP wjl c1 COUNT 10 BLOCK 0 STREAMS s1 >
  10. # 从0开始 读取消费过但是未确认的消息
  11. XREADGROUP GROUP wjl c1 COUNT 10 BLOCK 0 STREAMS s1 0
  12. # 确认消息
  13. XACK s1 wjl 消息id ..
  14. # s1 队列名 wjl组名 IDEL 空闲时间毫秒 - + id所有范围 10 count c1 指定消费者 每个消费者都有XPENDING
  15. 127.0.0.1:6379> XPENDING s1 wjl IDLE 1000 - + 10 c1

Redis06 分布式锁优化 - 图29

Redis06 分布式锁优化 - 图30

代码实现
  1. -- 手动创建一个队列和消费组
  2. 127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
  3. OK
  4. XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 0 STREAMS stream.orders >
  1. -- 1.参数列表
  2. -- 1.1.优惠券id
  3. local voucherId = ARGV[1]
  4. -- 1.2.用户id
  5. local userId = ARGV[2]
  6. -- 1.3 订单id
  7. local orderId = ARGV[3]
  8. -- 2.数据key
  9. -- 2.1.库存key
  10. local stockKey = 'seckill:stock:' .. voucherId
  11. -- 2.2.订单key
  12. local orderKey = 'seckill:order:' .. voucherId
  13. -- 3.脚本业务
  14. -- 3.1.判断库存是否充足 get stockKey
  15. if(tonumber(redis.call('get', stockKey)) <= 0) then
  16. -- 3.2.库存不足,返回1
  17. return 1
  18. end
  19. -- 3.2.判断用户是否下单 SISMEMBER orderKey userId
  20. if(redis.call('sismember', orderKey, userId) == 1) then
  21. -- 3.3.存在,说明是重复下单,返回2
  22. return 2
  23. end
  24. -- 3.4.扣库存 incrby stockKey -1
  25. redis.call('incrby', stockKey, -1)
  26. -- 3.5.下单(保存用户)sadd orderKey userId
  27. redis.call('sadd', orderKey, userId)
  28. -- 发送消息到队列中 [stream.orders]
  29. redis.call('xadd','stream.orders','*','userId',userId,'id',voucherId,'orderId',orderId)
  30. return 0
  31. -- redis中声明一个队列和消费组
  1. private final IVoucherService voucherService;
  2. private final ISeckillVoucherService seckillVoucherService;
  3. private final RedisIdWorker idWorker;
  4. private final StringRedisTemplate stringRedisTemplate;
  5. private final RedissonClient redissonClient;
  6. private final static DefaultRedisScript<Long> SECKILL_SCRIPT;
  7. static {
  8. SECKILL_SCRIPT = new DefaultRedisScript();
  9. SECKILL_SCRIPT.setResultType(Long.class);
  10. SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
  11. }
  12. private static final ExecutorService SCKILL_SERVICE = Executors.newSingleThreadExecutor();
  13. // 当前类初始化完毕执行
  14. @PostConstruct
  15. private void init() {
  16. // 提交线程任务
  17. SCKILL_SERVICE.submit(new VoucherOrderHandler());
  18. }
  19. private class VoucherOrderHandler implements Runnable {
  20. @Override
  21. public void run() {
  22. String streamQueue = "stream.orders";
  23. while (true) {
  24. try {
  25. // take 会阻塞
  26. // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
  27. List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
  28. Consumer.from("g1", "c1"),
  29. StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
  30. // ReadOffset.lastConsumed() == '>'
  31. StreamOffset.create(streamQueue, ReadOffset.lastConsumed())
  32. );
  33. if (mapRecords == null || mapRecords.isEmpty()) {
  34. continue;
  35. }
  36. MapRecord<String, Object, Object> entries = mapRecords.get(0);
  37. Map<Object, Object> entriesValue = entries.getValue();
  38. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(entriesValue, new VoucherOrder(), true);
  39. // 消息确认
  40. stringRedisTemplate.opsForStream().acknowledge(streamQueue,"g1",entries.getId());
  41. // 创建订单
  42. createVoucherOrder(voucherOrder);
  43. } catch (Exception e) {
  44. log.error("error", e);
  45. handlePendingList();
  46. }
  47. }
  48. }
  49. private void handlePendingList() {
  50. String streamQueue = "stream.orders";
  51. while (true) {
  52. try {
  53. // take 会阻塞
  54. // XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
  55. List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
  56. Consumer.from("g1", "c1"),
  57. StreamReadOptions.empty().count(1),// 处理出异常集合不需要阻塞
  58. // ReadOffset.lastConsumed() == '>'
  59. StreamOffset.create(streamQueue, ReadOffset.from("0"))
  60. );
  61. if (mapRecords == null || mapRecords.isEmpty()) {
  62. // 如果为null 说明pending-list没有异常消息 结束循环
  63. break;
  64. }
  65. MapRecord<String, Object, Object> entries = mapRecords.get(0);
  66. Map<Object, Object> entriesValue = entries.getValue();
  67. VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(entriesValue, new VoucherOrder(), true);
  68. // 消息确认
  69. stringRedisTemplate.opsForStream().acknowledge(streamQueue,"g1",entries.getId());
  70. // 创建订单
  71. createVoucherOrder(voucherOrder);
  72. } catch (Exception e) {
  73. log.error("处理pending-list订单异常", e);
  74. }
  75. }
  76. }
  77. private void createVoucherOrder(VoucherOrder voucherOrder) {
  78. Long userId = voucherOrder.getUserId();
  79. Long voucherId = voucherOrder.getVoucherId();
  80. RLock redisLock = redissonClient.getLock("lock:order" + userId);
  81. // 注意 这里锁的对象还是用户id解决一人一单问题
  82. boolean isLock = redisLock.tryLock();
  83. if (!isLock) {
  84. // TODO 注意 这里的锁对象是不同 进程 同一个 用户 的id 不同用户不会产生争抢锁现象 只会判断 where stock > 0
  85. log.error("不允许重复下单");
  86. return;
  87. }
  88. try {
  89. int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
  90. if (count > 0) {
  91. log.error("不允许重复下单");
  92. return;
  93. }
  94. boolean update = seckillVoucherService.update()
  95. .setSql("stock = stock -1")
  96. .eq("voucher_id", voucherId)
  97. .gt("stock", 0)
  98. .update();
  99. if (!update) {
  100. log.error("库存不足");
  101. return;
  102. }
  103. // 保存订单
  104. save(voucherOrder);
  105. } finally {
  106. redisLock.unlock();
  107. }
  108. }
  109. }
  110. @Override
  111. public Result seckillVoucher(Long voucherId) {
  112. Long userId = UserHolder.getUser().getId();
  113. // 订单id
  114. long orderId = idWorker.nextId("order");
  115. Long result = stringRedisTemplate.execute(
  116. SECKILL_SCRIPT,
  117. Collections.emptyList(),
  118. voucherId.toString(),
  119. userId.toString(),
  120. orderId
  121. );
  122. int resultValue = result.intValue();
  123. if (resultValue == 1) {
  124. return Result.fail("已售空");
  125. }
  126. if (resultValue == 2) {
  127. return Result.fail("您已购买过此卷");
  128. }
  129. if (resultValue != 0) {
  130. return Result.fail("服务器错误");
  131. }
  132. return Result.ok("订单号\t" + orderId);
  133. }