Redisson 分布式锁优化
https://redisson.org/
https://github.com/redisson/redisson
## github 中文
https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95
依赖于配置
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.1</version>
</dependency>
测试
可重入
主从一致性
异步秒杀优化
- 思路
1. 将库存信息存放至redis中 [size : 100] 用户下单时将库存减一 [decr : 1] 保证库存安全
2. 将购买过思维用户id存入 redis set 集合中 j
-- 如果set集合中不存在就添加 返回一
if(redis.call('sismember',KEYS[1],ARGV[1]) == 0) then redis.call('sadd',KEYS[1],ARGV[1]) return 1 end return 0
-- 注意 then 后面的冒号在redis EVAL 中不需要 但是时LUA读起来更加规范
-- 判断value的值是否大于0 就减一
local size = redis.call('get',KEYS[1]) if(tonumber(size) > 0) then redis.call('decr',KEYS[1]) return 1 end return 0
代码1
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
对应的java业务层
private final IVoucherService voucherService;
private final ISeckillVoucherService seckillVoucherService;
private final RedisIdWorker idWorker;
private final StringRedisTemplate stringRedisTemplate;
private final RedissonClient redissonClient;
private final static DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript();
SECKILL_SCRIPT.setResultType(Long.class);
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
}
private BlockingQueue<VoucherOrder> taskQueue = new LinkedBlockingQueue<>(1024 * 1024);
private static final ExecutorService SCKILL_SERVICE = Executors.newSingleThreadExecutor();
// 当前类初始化完毕执行
@PostConstruct
private void init() {
// 提交线程任务
SCKILL_SERVICE.submit(new VoucherOrderHandler());
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int resultValue = result.intValue();
if (resultValue == 1) {
return Result.fail("已售空");
}
if (resultValue == 2) {
return Result.fail("您已购买过此卷");
}
if (resultValue != 0) {
return Result.fail("服务器错误");
}
// 将订单加入阻塞队列 异步执行
long order = idWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(order);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
voucherOrder.setStatus(1);
voucherOrder.setUpdateTime(LocalDateTime.now());
voucherOrder.setCreateTime(LocalDateTime.now());
voucherOrder.setPayType(1);
// 将任务加入任务队列
taskQueue.add(voucherOrder);
return Result.ok("订单号\t" + order);
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// take 会阻塞
VoucherOrder voucherOrder = taskQueue.take();
// 创建订单
createVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
e.printStackTrace();
log.error("error", e);
}
}
}
private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
RLock redisLock = redissonClient.getLock("lock:order" + userId);
// 注意 这里锁的对象还是用户id解决一人一单问题
boolean isLock = redisLock.tryLock();
if (!isLock) {
// TODO 注意 这里的锁对象是不同 进程 同一个 用户 的id 不同用户不会产生争抢锁现象 只会判断 where stock > 0
log.error("不允许重复下单");
return;
}
try {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("不允许重复下单");
return;
}
boolean update = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!update) {
log.error("库存不足");
return;
}
// 保存订单
save(voucherOrder);
} finally {
redisLock.unlock();
}
}
}
消息队列优化
1 基于List
2 PubSub
3 Stream
# 创建一个队列
XADD s1 * k1 v1 k2 v2
# 在这个队列读取消息 从0 开始 4 条
XREAD COUNT 4 BLOCK 2000 STREAMS s1 0
XREAD COUNT 4 BLOCK 0 STREAMS s1 $
# 根据队列创建一个消费者组
XGROUP CREATE s1 wjl 0
# 读取未被消费过的消息
XREADGROUP GROUP wjl c1 COUNT 10 BLOCK 0 STREAMS s1 >
# 从0开始 读取消费过但是未确认的消息
XREADGROUP GROUP wjl c1 COUNT 10 BLOCK 0 STREAMS s1 0
# 确认消息
XACK s1 wjl 消息id ..
# s1 队列名 wjl组名 IDEL 空闲时间毫秒 - + id所有范围 10 count c1 指定消费者 每个消费者都有XPENDING
127.0.0.1:6379> XPENDING s1 wjl IDLE 1000 - + 10 c1
代码实现
-- 手动创建一个队列和消费组
127.0.0.1:6379> XGROUP CREATE stream.orders g1 0 MKSTREAM
OK
XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 0 STREAMS stream.orders >
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 发送消息到队列中 [stream.orders]
redis.call('xadd','stream.orders','*','userId',userId,'id',voucherId,'orderId',orderId)
return 0
-- redis中声明一个队列和消费组
private final IVoucherService voucherService;
private final ISeckillVoucherService seckillVoucherService;
private final RedisIdWorker idWorker;
private final StringRedisTemplate stringRedisTemplate;
private final RedissonClient redissonClient;
private final static DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript();
SECKILL_SCRIPT.setResultType(Long.class);
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
}
private static final ExecutorService SCKILL_SERVICE = Executors.newSingleThreadExecutor();
// 当前类初始化完毕执行
@PostConstruct
private void init() {
// 提交线程任务
SCKILL_SERVICE.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
String streamQueue = "stream.orders";
while (true) {
try {
// take 会阻塞
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders >
List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
// ReadOffset.lastConsumed() == '>'
StreamOffset.create(streamQueue, ReadOffset.lastConsumed())
);
if (mapRecords == null || mapRecords.isEmpty()) {
continue;
}
MapRecord<String, Object, Object> entries = mapRecords.get(0);
Map<Object, Object> entriesValue = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(entriesValue, new VoucherOrder(), true);
// 消息确认
stringRedisTemplate.opsForStream().acknowledge(streamQueue,"g1",entries.getId());
// 创建订单
createVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("error", e);
handlePendingList();
}
}
}
private void handlePendingList() {
String streamQueue = "stream.orders";
while (true) {
try {
// take 会阻塞
// XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.orders 0
List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),// 处理出异常集合不需要阻塞
// ReadOffset.lastConsumed() == '>'
StreamOffset.create(streamQueue, ReadOffset.from("0"))
);
if (mapRecords == null || mapRecords.isEmpty()) {
// 如果为null 说明pending-list没有异常消息 结束循环
break;
}
MapRecord<String, Object, Object> entries = mapRecords.get(0);
Map<Object, Object> entriesValue = entries.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(entriesValue, new VoucherOrder(), true);
// 消息确认
stringRedisTemplate.opsForStream().acknowledge(streamQueue,"g1",entries.getId());
// 创建订单
createVoucherOrder(voucherOrder);
} catch (Exception e) {
log.error("处理pending-list订单异常", e);
}
}
}
private void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
RLock redisLock = redissonClient.getLock("lock:order" + userId);
// 注意 这里锁的对象还是用户id解决一人一单问题
boolean isLock = redisLock.tryLock();
if (!isLock) {
// TODO 注意 这里的锁对象是不同 进程 同一个 用户 的id 不同用户不会产生争抢锁现象 只会判断 where stock > 0
log.error("不允许重复下单");
return;
}
try {
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0) {
log.error("不允许重复下单");
return;
}
boolean update = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!update) {
log.error("库存不足");
return;
}
// 保存订单
save(voucherOrder);
} finally {
redisLock.unlock();
}
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 订单id
long orderId = idWorker.nextId("order");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
orderId
);
int resultValue = result.intValue();
if (resultValue == 1) {
return Result.fail("已售空");
}
if (resultValue == 2) {
return Result.fail("您已购买过此卷");
}
if (resultValue != 0) {
return Result.fail("服务器错误");
}
return Result.ok("订单号\t" + orderId);
}