Redisson 分布式锁优化


https://redisson.org/https://github.com/redisson/redisson## github 中文https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
依赖于配置

<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.1</version></dependency>

测试

可重入





主从一致性

异步秒杀优化

- 思路1. 将库存信息存放至redis中 [size : 100] 用户下单时将库存减一 [decr : 1] 保证库存安全2. 将购买过思维用户id存入 redis set 集合中 j
-- 如果set集合中不存在就添加 返回一if(redis.call('sismember',KEYS[1],ARGV[1]) == 0) then redis.call('sadd',KEYS[1],ARGV[1]) return 1 end return 0-- 注意 then 后面的冒号在redis EVAL 中不需要 但是时LUA读起来更加规范-- 判断value的值是否大于0 就减一local size = redis.call('get',KEYS[1]) if(tonumber(size) > 0) then redis.call('decr',KEYS[1]) return 1 end return 0

代码1
-- 1.参数列表-- 1.1.优惠券idlocal voucherId = ARGV[1]-- 1.2.用户idlocal userId = ARGV[2]-- 2.数据key-- 2.1.库存keylocal stockKey = 'seckill:stock:' .. voucherId-- 2.2.订单keylocal orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务-- 3.1.判断库存是否充足 get stockKeyif(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1end-- 3.2.判断用户是否下单 SISMEMBER orderKey userIdif(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2end-- 3.4.扣库存 incrby stockKey -1redis.call('incrby', stockKey, -1)-- 3.5.下单(保存用户)sadd orderKey userIdredis.call('sadd', orderKey, userId)return 0
对应的java业务层
private final IVoucherService voucherService;private final ISeckillVoucherService seckillVoucherService;private final RedisIdWorker idWorker;private final StringRedisTemplate stringRedisTemplate;private final RedissonClient redissonClient;private final static DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript();SECKILL_SCRIPT.setResultType(Long.class);SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));}private BlockingQueue<VoucherOrder> taskQueue = new LinkedBlockingQueue<>(1024 * 1024);private static final ExecutorService SCKILL_SERVICE = Executors.newSingleThreadExecutor();// 当前类初始化完毕执行@PostConstructprivate void init() {// 提交线程任务SCKILL_SERVICE.submit(new VoucherOrderHandler());}@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());int resultValue = result.intValue();if (resultValue == 1) {return Result.fail("已售空");}if (resultValue == 2) {return Result.fail("您已购买过此卷");}if (resultValue != 0) {return Result.fail("服务器错误");}// 将订单加入阻塞队列 异步执行long order = idWorker.nextId("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(order);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);voucherOrder.setStatus(1);voucherOrder.setUpdateTime(LocalDateTime.now());voucherOrder.setCreateTime(LocalDateTime.now());voucherOrder.setPayType(1);// 将任务加入任务队列taskQueue.add(voucherOrder);return Result.ok("订单号\t" + order);}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// take 会阻塞VoucherOrder voucherOrder = taskQueue.take();// 创建订单createVoucherOrder(voucherOrder);} catch (InterruptedException e) {e.printStackTrace();log.error("error", e);}}}private void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();RLock redisLock = redissonClient.getLock("lock:order" + userId);// 注意 这里锁的对象还是用户id解决一人一单问题boolean isLock = redisLock.tryLock();if (!isLock) {// TODO 注意 这里的锁对象是不同 进程 同一个 用户 的id 不同用户不会产生争抢锁现象 只会判断 where stock > 0log.error("不允许重复下单");return;}try {int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {log.error("不允许重复下单");return;}boolean update = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!update) {log.error("库存不足");return;}// 保存订单save(voucherOrder);} finally {redisLock.unlock();}}}

消息队列优化


1 基于List


2 PubSub


3 Stream








# 创建一个队列XADD s1 * k1 v1 k2 v2# 在这个队列读取消息 从0 开始 4 条XREAD COUNT 4 BLOCK 2000 STREAMS s1 0XREAD COUNT 4 BLOCK 0 STREAMS s1 $# 根据队列创建一个消费者组XGROUP CREATE s1 wjl 0# 读取未被消费过的消息XREADGROUP GROUP wjl c1 COUNT 10 BLOCK 0 STREAMS s1 ># 从0开始 读取消费过但是未确认的消息XREADGROUP GROUP wjl c1 COUNT 10 BLOCK 0 STREAMS s1 0# 确认消息XACK s1 wjl 消息id ..# s1 队列名 wjl组名 IDEL 空闲时间毫秒 - + id所有范围 10 count c1 指定消费者 每个消费者都有XPENDING127.0.0.1:6379> XPENDING s1 wjl IDLE 1000 - + 10 c1


代码实现
-- 手动创建一个队列和消费组127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAMOKXREADGROUP GROUP g1 c1 COUNT 1 BLOCK 0 STREAMS stream.orders >
-- 1.参数列表-- 1.1.优惠券idlocal voucherId = ARGV[1]-- 1.2.用户idlocal userId = ARGV[2]-- 1.3 订单idlocal orderId = ARGV[3]-- 2.数据key-- 2.1.库存keylocal stockKey = 'seckill:stock:' .. voucherId-- 2.2.订单keylocal orderKey = 'seckill:order:' .. voucherId-- 3.脚本业务-- 3.1.判断库存是否充足 get stockKeyif(tonumber(redis.call('get', stockKey)) <= 0) then-- 3.2.库存不足,返回1return 1end-- 3.2.判断用户是否下单 SISMEMBER orderKey userIdif(redis.call('sismember', orderKey, userId) == 1) then-- 3.3.存在,说明是重复下单,返回2return 2end-- 3.4.扣库存 incrby stockKey -1redis.call('incrby', stockKey, -1)-- 3.5.下单(保存用户)sadd orderKey userIdredis.call('sadd', orderKey, userId)-- 发送消息到队列中 [stream.orders]redis.call('xadd','stream.orders','*','userId',userId,'id',voucherId,'orderId',orderId)return 0-- redis中声明一个队列和消费组
private final IVoucherService voucherService;private final ISeckillVoucherService seckillVoucherService;private final RedisIdWorker idWorker;private final StringRedisTemplate stringRedisTemplate;private final RedissonClient redissonClient;private final static DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript();SECKILL_SCRIPT.setResultType(Long.class);SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));}private static final ExecutorService SCKILL_SERVICE = Executors.newSingleThreadExecutor();// 当前类初始化完毕执行@PostConstructprivate void init() {// 提交线程任务SCKILL_SERVICE.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {String streamQueue = "stream.orders";while (true) {try {// take 会阻塞// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),// ReadOffset.lastConsumed() == '>'StreamOffset.create(streamQueue, ReadOffset.lastConsumed()));if (mapRecords == null || mapRecords.isEmpty()) {continue;}MapRecord<String, Object, Object> entries = mapRecords.get(0);Map<Object, Object> entriesValue = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(entriesValue, new VoucherOrder(), true);// 消息确认stringRedisTemplate.opsForStream().acknowledge(streamQueue,"g1",entries.getId());// 创建订单createVoucherOrder(voucherOrder);} catch (Exception e) {log.error("error", e);handlePendingList();}}}private void handlePendingList() {String streamQueue = "stream.orders";while (true) {try {// take 会阻塞// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),// 处理出异常集合不需要阻塞// ReadOffset.lastConsumed() == '>'StreamOffset.create(streamQueue, ReadOffset.from("0")));if (mapRecords == null || mapRecords.isEmpty()) {// 如果为null 说明pending-list没有异常消息 结束循环break;}MapRecord<String, Object, Object> entries = mapRecords.get(0);Map<Object, Object> entriesValue = entries.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(entriesValue, new VoucherOrder(), true);// 消息确认stringRedisTemplate.opsForStream().acknowledge(streamQueue,"g1",entries.getId());// 创建订单createVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理pending-list订单异常", e);}}}private void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();Long voucherId = voucherOrder.getVoucherId();RLock redisLock = redissonClient.getLock("lock:order" + userId);// 注意 这里锁的对象还是用户id解决一人一单问题boolean isLock = redisLock.tryLock();if (!isLock) {// TODO 注意 这里的锁对象是不同 进程 同一个 用户 的id 不同用户不会产生争抢锁现象 只会判断 where stock > 0log.error("不允许重复下单");return;}try {int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();if (count > 0) {log.error("不允许重复下单");return;}boolean update = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock", 0).update();if (!update) {log.error("库存不足");return;}// 保存订单save(voucherOrder);} finally {redisLock.unlock();}}}@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();// 订单idlong orderId = idWorker.nextId("order");Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),orderId);int resultValue = result.intValue();if (resultValue == 1) {return Result.fail("已售空");}if (resultValue == 2) {return Result.fail("您已购买过此卷");}if (resultValue != 0) {return Result.fail("服务器错误");}return Result.ok("订单号\t" + orderId);}
