一、最终一致性库存解锁逻辑
1、可靠消息+最终一致性
1)下订单成功,但锁库存失败;或者锁库存成功,但订单后续出现异常导致订单回滚,库存不回滚。为了让库存也回滚,引入Seata,但效率极低。Seata只适用于内部管理系统,不适应高并发前台系统
2)为了保证高并发,一旦订单回滚,库存服务一并回滚。可以在订单出现异常时,发消息给库存服务,告诉库存服务出现异常需要回滚。
3)库存服务本身也可以使用自动解锁模式,此时订单服务不需要给库存服务发送消息。无论订单成功或失败库存服务都会自动解锁。使用消息队列完成这一功能
2、增强版逻辑锁库存
1、每锁定一次库存,将信息保存至库存工作单和库存工作单详情表中
2、锁库存本身失败,库存会全部回滚
3、库存锁定成功,但订单服务后续出现异常,需要回滚解锁库存。此时数据库(库存工作单和库存工作单详情表)中有当时锁库存的记录,此时可以使用定时任务,每隔十分钟扫描数据库,检查哪些订单已经被取消或者已经被回滚,但锁库存消息仍在。就将这些消息拿出,重新解锁库存。
但是定时任务扫描全库很麻烦,所以引入延时队列。
4、延迟队列做的是定时功能,库存已锁定成功,防止订单失败,库存要自己实现解锁。将锁定成功的消息发送给消息队列,让消息队列先别往外发,在队列中暂存三十分钟,此时订单有可能是成功,也有可能是失败了。无论订单成功或失败,三十分钟后订单只要没支付就关闭订单。三十分钟后将锁库存消息发送给解锁库存服务。解锁库存服务检查订单是否已取消,若已取消,则自动解锁当时锁定的库存。库存的解锁消息在订单三十分钟失效后才开始到达解锁库存服务
二、RabbitMQ延时队列
1、消息队列流程
2、延时队列使用场景
2.1 下订单操作
下订单成功后,30分钟未支付,系统会关闭订单。使用定时任务,每隔30分钟扫描数据库,检查未支付的到期订单,并关闭此类订单
2.2 锁库存操作
下订单会调用锁库存方法,若下订单和锁库存均成功,但订单调用其他方法失败,此时订单会自动回滚(此时就不存在关闭订单问题),库存若使用seata也会回滚,但高并发不适应seata。
若采用定时任务,40分钟后检查订单的库存状态,若订单不存在(订单服务存在异常已自动回滚)或被取消(30分钟未支付被关闭订单),此时需要将先前锁定的库存解锁回来
2.3 定时任务的时效性问题
定时任务存在的缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差。
2.4 使用延时队列
下订单成功后,给消息队列发送消息表明哪个单已下成功,队列中的消息30分钟后才会被监听者收到,若存在一个服务来监听这个队列,若订单30分钟后仍未支付,将关闭订单。整个过程无需定时任务。相当于把消息暂缓存一段时间。
锁库存成功后,给消息队列发送消息,MQ将消息保存一段时间,先别着急往外发,然后时间到后,MQ自动将消息发出去,解锁库存服务拿到这个消息后,检查订单,若订单未支付或订单不存在,此时就解锁库存
使用延时队列,基本上就能解决定时任务大面积的时效性问题,延时队列可能时效性为秒级,而定时任务可能需要几十分钟,所以我们基于以上考虑采用延时队列处理下订单、关闭订单和锁库存、解锁库存操作,最终保证事务一致性。
引入MQ的第一个目的就是解决事务的最终一致性问题,因为订单最终还是要关闭的,所有使用MQ暂缓存一段时间消息,不占用系统的任何资源,只是多架设一个MQ服务器,等时间到了以后,能够保证数据的最终一致。
RabbitMQ的消息TTL和死信Exchange结合
3、消息的TTL(Time To Live)
3.1 消息的TTL就是消息的存活时间。消息在指定的时间没有被消费,称为死信。服务器默认会将它丢弃
3.2 RabbitMQ可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。 超过了这个时间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。
4、死信路由Dead Letter Exchanges(DLX)
4.1 一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用(basic.reject/ basic.nack)requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
4.2 Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去
4.3 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
4.4 手动ack&异常消息统一放在一个队列处理建议的两种方式
- catch异常后,手动发送到指定队列 , 然后使用channel给rabbitmq确认消息已消费
- 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
5、延时队列实现-设置队列过期时间
6、延时队列实现-设置消息过期时间
RabbitMQ采用惰性检查机制,首先检查先进队列的消息1是否过期,若未过期就将消息扔回队列,5分钟后再来检查
而此时消息2和消息3可能早已过期,但都需要5分钟后才会存到死信路由中。而我们预期想要消息3首先弹出队列
所以推荐使用设置队列过期时间
三、延时队列模拟定时关单
1、订单服务创建队列、交换机、绑定关系
package com.atguigu.gulimall.order.config;
@Configuration
public class MyMQConfig {
/**
* 注解@Bean注解:可以在容器中自动创建Binding、Queue、Exchange(RabbitMQ没有的情况)
* RabbitMQ只要存在,@Bean声明的属性发生变化,重启服务后也不会覆盖先前的数据,只有删除队列才行
* 所有的消息默认会先抵达延时队列order.delay.queue,延迟一段时间后,才会抵达order.release.order.queue队列。然后才被消费掉
* @return
*/
// 创建延时队列(过了存活时间消息就会变成死信,然后将信息抛给orderReleaseOrderQueue队列,orderReleaseOrderQueue队列处理的数据都是过期的信息)
@Bean
public Queue orderDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 指定死信路由
arguments.put("x-dead-letter-exchange", "order-event-exchange");
// 指定死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
// 消息过期时间(毫秒)
arguments.put("x-message-ttl", 60000);
// 参数:队列名称、是否持久化、是否排他、是否自动删除、自定义参数
Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
return orderDelayQueue;
}
// 创建普通队列,死信路由将过期消息传递给普通队列
@Bean
public Queue orderReleaseOrderQueue() {
return new Queue("order.release.order.queue", true, false, false);
}
// 创建topic类型交换机,一个微服务最好只创建一个交换机
@Bean
public Exchange orderEventExchange() {
// 参数:交换机名称、是否持久化、是否自动删除
return new TopicExchange("order-event-exchange", true, false);
}
// 创建交换机与延时队列order.delay.queue的绑定关系。
@Bean
public Binding orderCreateBingding() {
// 参数:目的地(队列名称)、目的地类型、交换机名称、路由键、自定义参数
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,
"order-event-exchange", "order.create.order", null);
}
// 创建交换机与普通队列order.release.order.queue的绑定关系
@Bean
public Binding orderReleaseBingding() {
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,
"order-event-exchange", "order.release.order", null);
}
// 监听队列,接收过期消息
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@Bean
public Binding orderReleaseOtherBingding() {
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null);
}
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
2、模拟下单成功
package com.atguigu.gulimall.order.web;
@Controller
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
// 测试创建订单,Rabbit监听订单
@ResponseBody
@GetMapping("/test/createOrder")
public String createOrderTest(){
// 模拟订单下单成功
OrderEntity entity = new OrderEntity();
entity.setOrderSn(UUID.randomUUID().toString());
entity.setModifyTime(new Date());
// 给MQ发送消息。订单创建成功后,首先是通过路由键order.create.order给延时队列order.delay.queue发送消息
// 经过一段延迟时间后,消费者的队列order.release.order.queue里面就能收到延迟以后的消息
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity);
return "ok";
}
}
3、RabbitMQ监听消息
package com.atguigu.gulimall.order.config;
@Configuration
public class MyMQConfig {
// 监听普通队列,接收过期消息
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
4、测试收发消息
5、连续发送5个消息
6、1分钟后收到5个消息
四、库存自动解锁
1、库存服务创建交换机、队列、绑定关系
package com.atguigu.gulimall.ware.config;
@Configuration
public class MyRabbitConfig {
// 使用JSON序列化机制,进行消息转换
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
// 创建topic类型交换机。路由键采用模糊匹配。TopicExchange(交换机名称,是否持久化,是否自动删除)
@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}
// 创建普通队列。
// Queue(队列名称,是否持久化,是否排他(只允许单个连接它,若支持多个连接,则谁抢到消息算谁的),是否自动删除)
@Bean
public Queue stockReleaseStockQueue() {
return new Queue("stock.release.stock.queue", true, false, false);
}
// 创建延迟队列。库存锁定成功后,消息先发给延迟队列,等待消息过期后,再发给普通队列
@Bean
public Queue stockDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 设置死信路由,表示消息过期后交给哪个交换机
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
// 设置死信路由键,表示消息过期后交给哪个路由键
arguments.put("x-dead-letter-routing-key", "stock.release");
// 设置消息过期时间
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
// 创建交换机与普通队列的绑定关系
@Bean
public Binding stockReleaseBinding() {
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,
"stock-event-exchange", "stock.release.#", null);
}
// 创建交换机与延时队列的绑定关系
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,
"stock-event-exchange", "stock.locked", null);
}
// 第一次监听消息时,idea会连接RabbitMQ,此时才会创建RabbitMQ中没有的队列、交换机和绑定关系
// 如果没有监听消息操作,RabbitMQ中就不会创建队列、交换机和绑定关系
// 如果需要修改rabbitMQ中已存在的队列交换机,需要先删除,然后再次创建
// 需要注释掉@RabbitListener,否则此时有两个在监听stock.release.stock.queue队列,导致消息消费异常
@RabbitListener(queues = "stock.release.stock.queue")
public void listener(WareInfoEntity entity, Channel channel, Message msg) throws IOException, IOException {
System.out.println("收到过期的订单信息:准备关闭订单" + entity.getId());
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
}
}
2、锁定库存
在库存锁定是添加以下逻辑
- 由于可能订单回滚的情况,所以为了能够得到库存锁定的信息,在锁定时需要记录库存工作单,其中包括订单信息和锁定库存时的信息(仓库id,商品id,锁了几件…)
在锁定成功后,向延时队列发送消息,带上库存锁定的相关信息
// 库存锁定成功后,给延时队列发送消息
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTo);
/** 为某个订单锁定库存
* @Transactional(rollbackFor = NoStockException.class):执行要回滚NoStockException异常。
* 可以不用加。因为默认只要是运行时异常都会回滚
*
* 库存解锁的场景:
* 1、下订单成功,订单过期没有支付被系统自动取消、被用户手动取消。都要解锁库存
* 2、下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚
* 之前锁定的库存就要自动解锁
*/
@Transactional(rollbackFor = NoStockException.class)
@Override
public Boolean orderLockStock(WareSkuLockVo vo) {
// 保存库存工作单详情,方便追溯。
WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
taskEntity.setOrderSn(vo.getOrderSn()); //为哪个订单号锁的库存
orderService.save(taskEntity);
// 1、获取每个商品在每个仓库的库存详情:SkuWareHasStock
List<OrderItemVo> locks = vo.getLocks();
List<SkuWareHasStock> collect = locks.stream().map(item -> {
SkuWareHasStock stock = new SkuWareHasStock();
Long skuId = item.getSkuId();
stock.setSkuId(skuId);
stock.setNum(item.getCount());
// 查询当前商品在哪些仓库有库存
List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
stock.setWareId(wareIds);
return stock;
}).collect(Collectors.toList());
// 2、锁定库存
for (SkuWareHasStock hasStock : collect) {
Boolean skuStocked = false;
Long skuId = hasStock.getSkuId();
List<Long> wareIds = hasStock.getWareId();
Integer num = hasStock.getNum();
// 2.1 若仓库为空。即没有任何仓库有这个商品的库存,直接抛异常
if (wareIds == null || wareIds.size() == 0) {
throw new NoStockException(skuId);
}
// 2.2 遍历仓库,扣库存(每个仓库依次扣库存)
// 如果每个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ
// 锁定失败,前面保存的工作单信息就回滚了。发送出去的消息,即使要解锁记录,由于去数据库查不到id,所以就不用解锁
for (Long wareId : wareIds) {
// 2.3 锁定库存。stock_locked加num
// 返回1表示成功(库存表1行受影响),返回0表示失败(0行受影响)
Long count = wareSkuDao.lockSkuStock(skuId, wareId, num);
if (count == 1) {
// 库存锁定成功
skuStocked = true;
// 将数据保存到数据库wms_ware_order_task_detail表
WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity(
null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
orderDetailService.save(detailEntity);
StockLockedTo stockLockedTo = new StockLockedTo();
stockLockedTo.setId(taskEntity.getId());
StockDetailTo detailTo = new StockDetailTo();
BeanUtils.copyProperties(detailEntity, detailTo);
// 防止回滚之后找不到数据,所以保存完整库存单
stockLockedTo.setDetail(detailTo);
// 库存锁定成功后,给延时队列发送消息
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTo);
break;
} else {
// 当前仓库库存不足,尝试锁下一个仓库
}
}
// 当前商品所有仓库都无货(没锁住库存)
if (skuStocked == false) {
throw new NoStockException(skuId);
}
}
// 所有商品的库存都是锁定成功才会返回true
return true;
}
3、监听队列
1)延迟队列会将过期的消息路由至”stock.release.stock.queue”,通过监听该队列实现库存的解锁
2)为保证消息的可靠到达,我们使用手动确认消息的模式,在解锁成功后确认消息,若出现异常则重新归队 ```java package com.atguigu.gulimall.ware.listener;
// 监听解锁库存队列 @RabbitListener(queues = “stock.release.stock.queue”) @Service public class StockReleaseListener {
@Autowired
WareSkuService wareSkuService;
/**
* 监听解锁库存功能
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
System.out.println("收到接收解锁库存的信息......");
try {
// 当前消息是否被第二次及以后(重新)派发过来了
Boolean redelivered = message.getMessageProperties().getRedelivered();
// 解锁库存
wareSkuService.unlockStock(stockLockedTo);
// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("rabbitMQ错误:"+e.getMessage());
// 只要有任何异常,回退消息。拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
@RabbitHandler
public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
try {
System.out.println("订单关闭,准备解锁库存......");
// 解锁库存
wareSkuService.unlockStock(orderTo);
// 不批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("rabbitMQ错误"+e.getMessage());
// 将消息重新回队
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
<a name="xGBwe"></a>
## 4、解锁库存
1)如果工作单详情不为空,说明该库存锁定成功<br />查询最新的订单状态,如果**订单不存在**,说明订单提交出现异常回滚,或者**订单处于已取消**的状态,需要对类已锁定的库存进行解锁<br />2)如果**工作单详情为空**,说明库存未锁定,自然**无需解锁**<br />3)为保证幂等性,我们分别对订单的状态和工作单的状态都进行了判断,只有当订单过期且工作单显示当前库存处于锁定的状态时,才进行库存的解锁
```java
/**
* 1、库存自动解锁
* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁
* 订单失败:锁库存失败
*
* 只要解锁库存的消息失败,一定要告诉服务解锁失败,需要手动ACK,回复消息
*
* 解锁库存逻辑
* 查询数据库关于这个订单的锁定库存信息wms_ware_order_task、wms_ware_order_task_detail
* 若有数据,即表示库存锁定成功。此时根据订单情况判断是否需要解锁库存
* 1、订单不存在,表示订单数据自身已回滚,此时必须解锁库存
* 2、订单存在,根据订单状态确认是否需要解锁库存
* 1)订单状态为已取消,此时需要解锁库存
* 2)订单状态未取消,此时不能解锁库存
* 若没有数据,表示库存锁定失败,库存已回滚,这种情况就无需解锁库存
*/
@Override
public void unlockStock(StockLockedTo stockLockedTo){
System.out.println("收到解锁库存的消息......");
StockDetailTo detail = stockLockedTo.getDetail();
Long detailId = detail.getId();
// 解锁库存
// 1.查询关于这个订单的锁定库存信息
WareOrderTaskDetailEntity orderTaskDetailEntity = orderDetailService.getById(detailId);
if (orderTaskDetailEntity != null) {
// 有锁定库存信息,即库存锁定成功,根据订单情况解锁
Long id = stockLockedTo.getId(); //库存工作单wms_ware_order_task表的Id
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
String orderSn = taskEntity.getOrderSn();
// 远程调用订单服务,根据订单号获取订单实体
R r = orderFeignService.getOrderStatus(orderSn);
if (r.getCode() == 0) {
OrderVo data = r.getData(new TypeReference<OrderVo>() { });
if (data == null || data.getStatus() == 4) {
// 订单不存在(订单数据已经回滚) 或者 有订单但订单状态是已取消,才可以解锁库存
// 只有状态是1(已锁定),才能解锁
if (orderTaskDetailEntity.getLockStatus() == 1) {
unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
}
}
} else {
// 其它状态(包含订单成功)不解锁
// 拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
throw new RuntimeException("远程服务失败....."); // 需要重新解锁。监听器中已实现
}
} else {
// 若不存在锁定库存信息,即库存锁定失败,库存回滚,这种情况无需解锁
}
}
public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
// 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存
wareSkuDao.unlockStock(skuId, wareId, num);
// 更新库存工作单状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskDetailId);
entity.setLockStatus(2); // 2-已解锁
orderDetailService.updateById(entity);
}
五、定时关单
1、提交订单
/**
* 下单操作。创建订单、验令牌、验价格、锁库存
* @param vo
* @return
*/
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {
// 1、验证令牌。令牌的对比和删除必须保证原子性
// 2、创建订单,订单项等信息
// 3、验价:根据订单计算的价格与界面提交的价格进行对比(可能因后台修改商品价格出现不一致的情况)
// 4、保存订单数据至数据库
// 5、锁定库存、只要有异常就回滚订单数据
// 5.1 远程锁库存
// 6、远程扣减积分
/**
* 7、订单创建成功,将每个订单信息发送给MQ
* 通过路由键order.create.order发送给延迟队列order.create.order。
* 30分钟后延迟队列将消息通过交换机order-event-exchange、路由键order.release.order
* 发送给order.release.order.queue、order.release.coupon.queue等队列
* 此时需要查询订单的最新状态,判断当前订单是正常关单还是异常关单,若是异常关单,需要回滚相关数据
*/
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
}
2、监听队列
创建订单的消息会进入延迟队列,最终发送至队列order.release.order.queue,因此我们对该队列进行监听,进行订单的关闭
package com.atguigu.gulimall.order.listener;
// 监听order.release.order.queue队列中的消息(30分钟后(订单30分钟未支付场景)才会到达这个队列)
@Component
@RabbitListener(queues = "order.release.order.queue")
public class OrderCloseListener {
@Autowired
OrderService orderService;
@RabbitHandler
public void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {
try {
System.out.println("收到过期的订单信息,准备关闭订单:" + entity.getOrderSn());
// 关闭订单
orderService.closeOrder(entity);
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println("订单关闭异常,库存解锁异常:" + e.getMessage());
// 拒绝消息,让其重新回到消息队列
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
}
}
}
3、关闭订单
1)由于要保证幂等性,因此要查询最新的订单状态判断是否需要关单
2)关闭订单后也需要解锁库存,因此发送消息进行库存、会员服务对应的解锁
/**
* 关闭订单(30分钟未支付)
* @param entity
*/
@Override
public void closeOrder(OrderEntity entity) {
// 查询订单的最新状态(30分钟内容订单状态可能有变化,所以要取最新状态)
OrderEntity orderEntity = this.getById(entity.getId());
if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) { // 订单状态为待付款
// 关闭订单,将订单状态修改为已取消
OrderEntity update = new OrderEntity();
update.setId(entity.getId());
update.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(update);
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderEntity, orderTo);
// 保证消息一定会发送出去,每个消息都可以做好日志记录(给数据库保存每个消息的详细信息)
// 定期扫描数据库将失败的消息再发送一遍
try {
// 订单关闭后,将消息通过订单交换机order-event-exchange、路由键order.release.other发送给库存队列stock.release.stock.queue
// 监听库存队列,收到消息后判断该订单的状态,若为已取消,则主动释放库存
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
} catch (Exception e) {
// 将没法送成功的消息进行重试发送
}
}
}
4、解锁库存
4.1 建立订单交换机与解锁库存队列的绑定关系
package com.atguigu.gulimall.order.config;
@Configuration
public class MyMQConfig {
// 创建订单交换机与订单关闭队列order.release.order.queue的绑定关系
@Bean
public Binding orderReleaseBingding() {
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,
"order-event-exchange", "order.release.order", null);
}
/**
* 订单释放直接和库存释放进行绑定。
* 场景:订单创建成功后,由于服务卡顿、消息延迟或订单解锁功能异常导致30分钟仍未取消订单。
* 而此时库存处理解锁流程时,获取到的订单状态仍为新建,此时库存解锁流程就直接消费掉消息,而不去释放库存。
* 订单服务正常后,取消订单,导致订单(已取消)与库存(未释放)不一致的情况
* 所以需要订单释放直接与库存释放进行绑定,订单释放后主动发一条消息给库存释放队列,
* 然后监听库存释放队列,有消息就根据订单状态判断是否需要解锁库存
*
* 创建订单交换机与库存释放队列stock.release.stock.queue的绑定关系
*/
@Bean
public Binding orderReleaseOtherBingding() {
return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,
"order-event-exchange", "order.release.other.#", null);
}
}
4.2 监听解锁库存队列
package com.atguigu.gulimall.ware.listener;
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
@Autowired
WareSkuService wareSkuService;
/**
* 监听解锁库存功能
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
try {
System.out.println("订单关闭,准备解锁库存......");
// 解锁库存(重载方法)
wareSkuService.unlockStock(orderTo);
// 不批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("rabbitMQ错误"+e.getMessage());
// 将消息重新回队
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
4.3 解锁库存
/**
* 防止因为订单服务故障,导致订单状态消息一直未消费,
* 此时库存消息优先到期。查询订单状态仍为新建,所以解锁库存消息就直接消费掉,不会解锁库存,
* 导致因故障受影响的订单,永远也不能解锁库存
*/
@Transactional
@Override
public void unlockStock(OrderTo orderTo) {
String orderSn = orderTo.getOrderSn();
// 查询最新库存状态,防止重复解锁库存
WareOrderTaskEntity task = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
Long id = task.getId();
// 按照工作单找到所有 没有解锁的库存,进行解锁
List<WareOrderTaskDetailEntity> list = orderDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
.eq("task_id", id)
.eq("lock_status", 1));
for (WareOrderTaskDetailEntity entity : list) {
unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
}
}
public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
// 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存
wareSkuDao.unlockStock(skuId, wareId, num);
// 更新库存工作单状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskDetailId);
entity.setLockStatus(2); // 2-已解锁
orderDetailService.updateById(entity);
}