一、最终一致性库存解锁逻辑

1、可靠消息+最终一致性

1)下订单成功,但锁库存失败;或者锁库存成功,但订单后续出现异常导致订单回滚,库存不回滚。为了让库存也回滚,引入Seata,但效率极低。Seata只适用于内部管理系统,不适应高并发前台系统
2)为了保证高并发,一旦订单回滚,库存服务一并回滚。可以在订单出现异常时,发消息给库存服务,告诉库存服务出现异常需要回滚。
3)库存服务本身也可以使用自动解锁模式,此时订单服务不需要给库存服务发送消息。无论订单成功或失败库存服务都会自动解锁。使用消息队列完成这一功能

2、增强版逻辑锁库存

image.png
1、每锁定一次库存,将信息保存至库存工作单和库存工作单详情表中
image.png
2、锁库存本身失败,库存会全部回滚
3、库存锁定成功,但订单服务后续出现异常,需要回滚解锁库存。此时数据库(库存工作单和库存工作单详情表)中有当时锁库存的记录,此时可以使用定时任务,每隔十分钟扫描数据库,检查哪些订单已经被取消或者已经被回滚,但锁库存消息仍在。就将这些消息拿出,重新解锁库存。
但是定时任务扫描全库很麻烦,所以引入延时队列。
4、延迟队列做的是定时功能,库存已锁定成功,防止订单失败,库存要自己实现解锁。将锁定成功的消息发送给消息队列,让消息队列先别往外发,在队列中暂存三十分钟,此时订单有可能是成功,也有可能是失败了。无论订单成功或失败,三十分钟后订单只要没支付就关闭订单。三十分钟后将锁库存消息发送给解锁库存服务。解锁库存服务检查订单是否已取消,若已取消,则自动解锁当时锁定的库存。库存的解锁消息在订单三十分钟失效后才开始到达解锁库存服务
image.png

二、RabbitMQ延时队列

1、消息队列流程

消息队列流程.jpg

2、延时队列使用场景

image.png

2.1 下订单操作

下订单成功后,30分钟未支付,系统会关闭订单。使用定时任务,每隔30分钟扫描数据库,检查未支付的到期订单,并关闭此类订单

2.2 锁库存操作

下订单会调用锁库存方法,若下订单和锁库存均成功,但订单调用其他方法失败,此时订单会自动回滚(此时就不存在关闭订单问题),库存若使用seata也会回滚,但高并发不适应seata。
若采用定时任务,40分钟后检查订单的库存状态,若订单不存在(订单服务存在异常已自动回滚)或被取消(30分钟未支付被关闭订单),此时需要将先前锁定的库存解锁回来

2.3 定时任务的时效性问题

定时任务存在的缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差。
image.png

2.4 使用延时队列

下订单成功后,给消息队列发送消息表明哪个单已下成功,队列中的消息30分钟后才会被监听者收到,若存在一个服务来监听这个队列,若订单30分钟后仍未支付,将关闭订单。整个过程无需定时任务。相当于把消息暂缓存一段时间。
锁库存成功后,给消息队列发送消息,MQ将消息保存一段时间,先别着急往外发,然后时间到后,MQ自动将消息发出去,解锁库存服务拿到这个消息后,检查订单,若订单未支付或订单不存在,此时就解锁库存

使用延时队列,基本上就能解决定时任务大面积的时效性问题,延时队列可能时效性为秒级,而定时任务可能需要几十分钟,所以我们基于以上考虑采用延时队列处理下订单、关闭订单和锁库存、解锁库存操作,最终保证事务一致性。
引入MQ的第一个目的就是解决事务的最终一致性问题,因为订单最终还是要关闭的,所有使用MQ暂缓存一段时间消息,不占用系统的任何资源,只是多架设一个MQ服务器,等时间到了以后,能够保证数据的最终一致。
RabbitMQ的消息TTL和死信Exchange结合

3、消息的TTL(Time To Live)

3.1 消息的TTL就是消息的存活时间。消息在指定的时间没有被消费,称为死信。服务器默认会将它丢弃
3.2 RabbitMQ可以对队列和消息分别设置TTL。

  1. 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。 超过了这个时间,我们认为这个消息就死了,称之为死信。
  2. 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。

    4、死信路由Dead Letter Exchanges(DLX)

    4.1 一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列

  3. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用(basic.reject/ basic.nack)requeue=false

  4. 上面的消息的TTL到了,消息过期了。
  5. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上

4.2 Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去
4.3 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
4.4 手动ack&异常消息统一放在一个队列处理建议的两种方式

  1. catch异常后,手动发送到指定队列 , 然后使用channel给rabbitmq确认消息已消费
  2. 给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败

image.png

5、延时队列实现-设置队列过期时间

image.png

6、延时队列实现-设置消息过期时间

image.png
RabbitMQ采用惰性检查机制,首先检查先进队列的消息1是否过期,若未过期就将消息扔回队列,5分钟后再来检查
而此时消息2和消息3可能早已过期,但都需要5分钟后才会存到死信路由中。而我们预期想要消息3首先弹出队列
所以推荐使用设置队列过期时间

三、延时队列模拟定时关单

image.png
image.png

1、订单服务创建队列、交换机、绑定关系

  1. package com.atguigu.gulimall.order.config;
  2. @Configuration
  3. public class MyMQConfig {
  4. /**
  5. * 注解@Bean注解:可以在容器中自动创建Binding、Queue、Exchange(RabbitMQ没有的情况)
  6. * RabbitMQ只要存在,@Bean声明的属性发生变化,重启服务后也不会覆盖先前的数据,只有删除队列才行
  7. * 所有的消息默认会先抵达延时队列order.delay.queue,延迟一段时间后,才会抵达order.release.order.queue队列。然后才被消费掉
  8. * @return
  9. */
  10. // 创建延时队列(过了存活时间消息就会变成死信,然后将信息抛给orderReleaseOrderQueue队列,orderReleaseOrderQueue队列处理的数据都是过期的信息)
  11. @Bean
  12. public Queue orderDelayQueue() {
  13. Map<String, Object> arguments = new HashMap<>();
  14. // 指定死信路由
  15. arguments.put("x-dead-letter-exchange", "order-event-exchange");
  16. // 指定死信路由键
  17. arguments.put("x-dead-letter-routing-key", "order.release.order");
  18. // 消息过期时间(毫秒)
  19. arguments.put("x-message-ttl", 60000);
  20. // 参数:队列名称、是否持久化、是否排他、是否自动删除、自定义参数
  21. Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);
  22. return orderDelayQueue;
  23. }
  24. // 创建普通队列,死信路由将过期消息传递给普通队列
  25. @Bean
  26. public Queue orderReleaseOrderQueue() {
  27. return new Queue("order.release.order.queue", true, false, false);
  28. }
  29. // 创建topic类型交换机,一个微服务最好只创建一个交换机
  30. @Bean
  31. public Exchange orderEventExchange() {
  32. // 参数:交换机名称、是否持久化、是否自动删除
  33. return new TopicExchange("order-event-exchange", true, false);
  34. }
  35. // 创建交换机与延时队列order.delay.queue的绑定关系。
  36. @Bean
  37. public Binding orderCreateBingding() {
  38. // 参数:目的地(队列名称)、目的地类型、交换机名称、路由键、自定义参数
  39. return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,
  40. "order-event-exchange", "order.create.order", null);
  41. }
  42. // 创建交换机与普通队列order.release.order.queue的绑定关系
  43. @Bean
  44. public Binding orderReleaseBingding() {
  45. return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,
  46. "order-event-exchange", "order.release.order", null);
  47. }
  48. // 监听队列,接收过期消息
  49. @RabbitListener(queues = "order.release.order.queue")
  50. public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
  51. System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);
  52. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  53. }
  54. @Bean
  55. public Binding orderReleaseOtherBingding() {
  56. return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null);
  57. }
  58. @Bean
  59. public MessageConverter messageConverter() {
  60. return new Jackson2JsonMessageConverter();
  61. }
  62. }

image.png

2、模拟下单成功

  1. package com.atguigu.gulimall.order.web;
  2. @Controller
  3. public class HelloController {
  4. @Autowired
  5. RabbitTemplate rabbitTemplate;
  6. // 测试创建订单,Rabbit监听订单
  7. @ResponseBody
  8. @GetMapping("/test/createOrder")
  9. public String createOrderTest(){
  10. // 模拟订单下单成功
  11. OrderEntity entity = new OrderEntity();
  12. entity.setOrderSn(UUID.randomUUID().toString());
  13. entity.setModifyTime(new Date());
  14. // 给MQ发送消息。订单创建成功后,首先是通过路由键order.create.order给延时队列order.delay.queue发送消息
  15. // 经过一段延迟时间后,消费者的队列order.release.order.queue里面就能收到延迟以后的消息
  16. rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity);
  17. return "ok";
  18. }
  19. }

3、RabbitMQ监听消息

  1. package com.atguigu.gulimall.order.config;
  2. @Configuration
  3. public class MyMQConfig {
  4. // 监听普通队列,接收过期消息
  5. @RabbitListener(queues = "order.release.order.queue")
  6. public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
  7. System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);
  8. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  9. }
  10. }

4、测试收发消息图片.png

5、连续发送5个消息图片.png

6、1分钟后收到5个消息图片.png

四、库存自动解锁

image.png

1、库存服务创建交换机、队列、绑定关系

  1. package com.atguigu.gulimall.ware.config;
  2. @Configuration
  3. public class MyRabbitConfig {
  4. // 使用JSON序列化机制,进行消息转换
  5. @Bean
  6. public MessageConverter messageConverter() {
  7. return new Jackson2JsonMessageConverter();
  8. }
  9. // 创建topic类型交换机。路由键采用模糊匹配。TopicExchange(交换机名称,是否持久化,是否自动删除)
  10. @Bean
  11. public Exchange stockEventExchange() {
  12. return new TopicExchange("stock-event-exchange", true, false);
  13. }
  14. // 创建普通队列。
  15. // Queue(队列名称,是否持久化,是否排他(只允许单个连接它,若支持多个连接,则谁抢到消息算谁的),是否自动删除)
  16. @Bean
  17. public Queue stockReleaseStockQueue() {
  18. return new Queue("stock.release.stock.queue", true, false, false);
  19. }
  20. // 创建延迟队列。库存锁定成功后,消息先发给延迟队列,等待消息过期后,再发给普通队列
  21. @Bean
  22. public Queue stockDelayQueue() {
  23. Map<String, Object> arguments = new HashMap<>();
  24. // 设置死信路由,表示消息过期后交给哪个交换机
  25. arguments.put("x-dead-letter-exchange", "stock-event-exchange");
  26. // 设置死信路由键,表示消息过期后交给哪个路由键
  27. arguments.put("x-dead-letter-routing-key", "stock.release");
  28. // 设置消息过期时间
  29. arguments.put("x-message-ttl", 120000);
  30. return new Queue("stock.delay.queue", true, false, false, arguments);
  31. }
  32. // 创建交换机与普通队列的绑定关系
  33. @Bean
  34. public Binding stockReleaseBinding() {
  35. return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,
  36. "stock-event-exchange", "stock.release.#", null);
  37. }
  38. // 创建交换机与延时队列的绑定关系
  39. @Bean
  40. public Binding stockLockedBinding() {
  41. return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE,
  42. "stock-event-exchange", "stock.locked", null);
  43. }
  44. // 第一次监听消息时,idea会连接RabbitMQ,此时才会创建RabbitMQ中没有的队列、交换机和绑定关系
  45. // 如果没有监听消息操作,RabbitMQ中就不会创建队列、交换机和绑定关系
  46. // 如果需要修改rabbitMQ中已存在的队列交换机,需要先删除,然后再次创建
  47. // 需要注释掉@RabbitListener,否则此时有两个在监听stock.release.stock.queue队列,导致消息消费异常
  48. @RabbitListener(queues = "stock.release.stock.queue")
  49. public void listener(WareInfoEntity entity, Channel channel, Message msg) throws IOException, IOException {
  50. System.out.println("收到过期的订单信息:准备关闭订单" + entity.getId());
  51. channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
  52. }
  53. }

image.pngimage.png
修改表wms_ware_order_task_detail
image.png

2、锁定库存

在库存锁定是添加以下逻辑

  • 由于可能订单回滚的情况,所以为了能够得到库存锁定的信息,在锁定时需要记录库存工作单,其中包括订单信息和锁定库存时的信息(仓库id,商品id,锁了几件…)
  • 在锁定成功后,向延时队列发送消息,带上库存锁定的相关信息

    1. // 库存锁定成功后,给延时队列发送消息
    2. rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTo);
    1. /** 为某个订单锁定库存
    2. * @Transactional(rollbackFor = NoStockException.class):执行要回滚NoStockException异常。
    3. * 可以不用加。因为默认只要是运行时异常都会回滚
    4. *
    5. * 库存解锁的场景:
    6. * 1、下订单成功,订单过期没有支付被系统自动取消、被用户手动取消。都要解锁库存
    7. * 2、下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚
    8. * 之前锁定的库存就要自动解锁
    9. */
    10. @Transactional(rollbackFor = NoStockException.class)
    11. @Override
    12. public Boolean orderLockStock(WareSkuLockVo vo) {
    13. // 保存库存工作单详情,方便追溯。
    14. WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
    15. taskEntity.setOrderSn(vo.getOrderSn()); //为哪个订单号锁的库存
    16. orderService.save(taskEntity);
    17. // 1、获取每个商品在每个仓库的库存详情:SkuWareHasStock
    18. List<OrderItemVo> locks = vo.getLocks();
    19. List<SkuWareHasStock> collect = locks.stream().map(item -> {
    20. SkuWareHasStock stock = new SkuWareHasStock();
    21. Long skuId = item.getSkuId();
    22. stock.setSkuId(skuId);
    23. stock.setNum(item.getCount());
    24. // 查询当前商品在哪些仓库有库存
    25. List<Long> wareIds = wareSkuDao.listWareIdHasSkuStock(skuId);
    26. stock.setWareId(wareIds);
    27. return stock;
    28. }).collect(Collectors.toList());
    29. // 2、锁定库存
    30. for (SkuWareHasStock hasStock : collect) {
    31. Boolean skuStocked = false;
    32. Long skuId = hasStock.getSkuId();
    33. List<Long> wareIds = hasStock.getWareId();
    34. Integer num = hasStock.getNum();
    35. // 2.1 若仓库为空。即没有任何仓库有这个商品的库存,直接抛异常
    36. if (wareIds == null || wareIds.size() == 0) {
    37. throw new NoStockException(skuId);
    38. }
    39. // 2.2 遍历仓库,扣库存(每个仓库依次扣库存)
    40. // 如果每个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ
    41. // 锁定失败,前面保存的工作单信息就回滚了。发送出去的消息,即使要解锁记录,由于去数据库查不到id,所以就不用解锁
    42. for (Long wareId : wareIds) {
    43. // 2.3 锁定库存。stock_locked加num
    44. // 返回1表示成功(库存表1行受影响),返回0表示失败(0行受影响)
    45. Long count = wareSkuDao.lockSkuStock(skuId, wareId, num);
    46. if (count == 1) {
    47. // 库存锁定成功
    48. skuStocked = true;
    49. // 将数据保存到数据库wms_ware_order_task_detail表
    50. WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity(
    51. null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
    52. orderDetailService.save(detailEntity);
    53. StockLockedTo stockLockedTo = new StockLockedTo();
    54. stockLockedTo.setId(taskEntity.getId());
    55. StockDetailTo detailTo = new StockDetailTo();
    56. BeanUtils.copyProperties(detailEntity, detailTo);
    57. // 防止回滚之后找不到数据,所以保存完整库存单
    58. stockLockedTo.setDetail(detailTo);
    59. // 库存锁定成功后,给延时队列发送消息
    60. rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", stockLockedTo);
    61. break;
    62. } else {
    63. // 当前仓库库存不足,尝试锁下一个仓库
    64. }
    65. }
    66. // 当前商品所有仓库都无货(没锁住库存)
    67. if (skuStocked == false) {
    68. throw new NoStockException(skuId);
    69. }
    70. }
    71. // 所有商品的库存都是锁定成功才会返回true
    72. return true;
    73. }

    3、监听队列

    1)延迟队列会将过期的消息路由至”stock.release.stock.queue”,通过监听该队列实现库存的解锁
    2)为保证消息的可靠到达,我们使用手动确认消息的模式,在解锁成功后确认消息,若出现异常则重新归队 ```java package com.atguigu.gulimall.ware.listener;

// 监听解锁库存队列 @RabbitListener(queues = “stock.release.stock.queue”) @Service public class StockReleaseListener {

  1. @Autowired
  2. WareSkuService wareSkuService;
  3. /**
  4. * 监听解锁库存功能
  5. */
  6. @RabbitHandler
  7. public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
  8. System.out.println("收到接收解锁库存的信息......");
  9. try {
  10. // 当前消息是否被第二次及以后(重新)派发过来了
  11. Boolean redelivered = message.getMessageProperties().getRedelivered();
  12. // 解锁库存
  13. wareSkuService.unlockStock(stockLockedTo);
  14. // 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)
  15. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  16. }catch (Exception e){
  17. System.out.println("rabbitMQ错误:"+e.getMessage());
  18. // 只要有任何异常,回退消息。拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
  19. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  20. }
  21. }
  22. @RabbitHandler
  23. public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
  24. try {
  25. System.out.println("订单关闭,准备解锁库存......");
  26. // 解锁库存
  27. wareSkuService.unlockStock(orderTo);
  28. // 不批量处理
  29. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  30. }catch (Exception e){
  31. System.out.println("rabbitMQ错误"+e.getMessage());
  32. // 将消息重新回队
  33. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  34. }
  35. }

}

  1. <a name="xGBwe"></a>
  2. ## 4、解锁库存
  3. 1)如果工作单详情不为空,说明该库存锁定成功<br />查询最新的订单状态,如果**订单不存在**,说明订单提交出现异常回滚,或者**订单处于已取消**的状态,需要对类已锁定的库存进行解锁<br />2)如果**工作单详情为空**,说明库存未锁定,自然**无需解锁**<br />3)为保证幂等性,我们分别对订单的状态和工作单的状态都进行了判断,只有当订单过期且工作单显示当前库存处于锁定的状态时,才进行库存的解锁
  4. ```java
  5. /**
  6. * 1、库存自动解锁
  7. * 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁
  8. * 订单失败:锁库存失败
  9. *
  10. * 只要解锁库存的消息失败,一定要告诉服务解锁失败,需要手动ACK,回复消息
  11. *
  12. * 解锁库存逻辑
  13. * 查询数据库关于这个订单的锁定库存信息wms_ware_order_task、wms_ware_order_task_detail
  14. * 若有数据,即表示库存锁定成功。此时根据订单情况判断是否需要解锁库存
  15. * 1、订单不存在,表示订单数据自身已回滚,此时必须解锁库存
  16. * 2、订单存在,根据订单状态确认是否需要解锁库存
  17. * 1)订单状态为已取消,此时需要解锁库存
  18. * 2)订单状态未取消,此时不能解锁库存
  19. * 若没有数据,表示库存锁定失败,库存已回滚,这种情况就无需解锁库存
  20. */
  21. @Override
  22. public void unlockStock(StockLockedTo stockLockedTo){
  23. System.out.println("收到解锁库存的消息......");
  24. StockDetailTo detail = stockLockedTo.getDetail();
  25. Long detailId = detail.getId();
  26. // 解锁库存
  27. // 1.查询关于这个订单的锁定库存信息
  28. WareOrderTaskDetailEntity orderTaskDetailEntity = orderDetailService.getById(detailId);
  29. if (orderTaskDetailEntity != null) {
  30. // 有锁定库存信息,即库存锁定成功,根据订单情况解锁
  31. Long id = stockLockedTo.getId(); //库存工作单wms_ware_order_task表的Id
  32. WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
  33. String orderSn = taskEntity.getOrderSn();
  34. // 远程调用订单服务,根据订单号获取订单实体
  35. R r = orderFeignService.getOrderStatus(orderSn);
  36. if (r.getCode() == 0) {
  37. OrderVo data = r.getData(new TypeReference<OrderVo>() { });
  38. if (data == null || data.getStatus() == 4) {
  39. // 订单不存在(订单数据已经回滚) 或者 有订单但订单状态是已取消,才可以解锁库存
  40. // 只有状态是1(已锁定),才能解锁
  41. if (orderTaskDetailEntity.getLockStatus() == 1) {
  42. unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);
  43. }
  44. }
  45. } else {
  46. // 其它状态(包含订单成功)不解锁
  47. // 拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)
  48. // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  49. throw new RuntimeException("远程服务失败....."); // 需要重新解锁。监听器中已实现
  50. }
  51. } else {
  52. // 若不存在锁定库存信息,即库存锁定失败,库存回滚,这种情况无需解锁
  53. }
  54. }
  55. public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
  56. // 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存
  57. wareSkuDao.unlockStock(skuId, wareId, num);
  58. // 更新库存工作单状态
  59. WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
  60. entity.setId(taskDetailId);
  61. entity.setLockStatus(2); // 2-已解锁
  62. orderDetailService.updateById(entity);
  63. }

五、定时关单

image.png

1、提交订单

  1. /**
  2. * 下单操作。创建订单、验令牌、验价格、锁库存
  3. * @param vo
  4. * @return
  5. */
  6. @Transactional
  7. @Override
  8. public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {
  9. // 1、验证令牌。令牌的对比和删除必须保证原子性
  10. // 2、创建订单,订单项等信息
  11. // 3、验价:根据订单计算的价格与界面提交的价格进行对比(可能因后台修改商品价格出现不一致的情况)
  12. // 4、保存订单数据至数据库
  13. // 5、锁定库存、只要有异常就回滚订单数据
  14. // 5.1 远程锁库存
  15. // 6、远程扣减积分
  16. /**
  17. * 7、订单创建成功,将每个订单信息发送给MQ
  18. * 通过路由键order.create.order发送给延迟队列order.create.order。
  19. * 30分钟后延迟队列将消息通过交换机order-event-exchange、路由键order.release.order
  20. * 发送给order.release.order.queue、order.release.coupon.queue等队列
  21. * 此时需要查询订单的最新状态,判断当前订单是正常关单还是异常关单,若是异常关单,需要回滚相关数据
  22. */
  23. rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
  24. }

图片.png

2、监听队列

创建订单的消息会进入延迟队列,最终发送至队列order.release.order.queue,因此我们对该队列进行监听,进行订单的关闭

  1. package com.atguigu.gulimall.order.listener;
  2. // 监听order.release.order.queue队列中的消息(30分钟后(订单30分钟未支付场景)才会到达这个队列)
  3. @Component
  4. @RabbitListener(queues = "order.release.order.queue")
  5. public class OrderCloseListener {
  6. @Autowired
  7. OrderService orderService;
  8. @RabbitHandler
  9. public void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {
  10. try {
  11. System.out.println("收到过期的订单信息,准备关闭订单:" + entity.getOrderSn());
  12. // 关闭订单
  13. orderService.closeOrder(entity);
  14. channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
  15. } catch (Exception e) {
  16. System.out.println("订单关闭异常,库存解锁异常:" + e.getMessage());
  17. // 拒绝消息,让其重新回到消息队列
  18. channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
  19. }
  20. }
  21. }

3、关闭订单

1)由于要保证幂等性,因此要查询最新的订单状态判断是否需要关单
2)关闭订单后也需要解锁库存,因此发送消息进行库存、会员服务对应的解锁

  1. /**
  2. * 关闭订单(30分钟未支付)
  3. * @param entity
  4. */
  5. @Override
  6. public void closeOrder(OrderEntity entity) {
  7. // 查询订单的最新状态(30分钟内容订单状态可能有变化,所以要取最新状态)
  8. OrderEntity orderEntity = this.getById(entity.getId());
  9. if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) { // 订单状态为待付款
  10. // 关闭订单,将订单状态修改为已取消
  11. OrderEntity update = new OrderEntity();
  12. update.setId(entity.getId());
  13. update.setStatus(OrderStatusEnum.CANCLED.getCode());
  14. this.updateById(update);
  15. OrderTo orderTo = new OrderTo();
  16. BeanUtils.copyProperties(orderEntity, orderTo);
  17. // 保证消息一定会发送出去,每个消息都可以做好日志记录(给数据库保存每个消息的详细信息)
  18. // 定期扫描数据库将失败的消息再发送一遍
  19. try {
  20. // 订单关闭后,将消息通过订单交换机order-event-exchange、路由键order.release.other发送给库存队列stock.release.stock.queue
  21. // 监听库存队列,收到消息后判断该订单的状态,若为已取消,则主动释放库存
  22. rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
  23. } catch (Exception e) {
  24. // 将没法送成功的消息进行重试发送
  25. }
  26. }
  27. }

4、解锁库存

4.1 建立订单交换机与解锁库存队列的绑定关系

  1. package com.atguigu.gulimall.order.config;
  2. @Configuration
  3. public class MyMQConfig {
  4. // 创建订单交换机与订单关闭队列order.release.order.queue的绑定关系
  5. @Bean
  6. public Binding orderReleaseBingding() {
  7. return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,
  8. "order-event-exchange", "order.release.order", null);
  9. }
  10. /**
  11. * 订单释放直接和库存释放进行绑定。
  12. * 场景:订单创建成功后,由于服务卡顿、消息延迟或订单解锁功能异常导致30分钟仍未取消订单。
  13. * 而此时库存处理解锁流程时,获取到的订单状态仍为新建,此时库存解锁流程就直接消费掉消息,而不去释放库存。
  14. * 订单服务正常后,取消订单,导致订单(已取消)与库存(未释放)不一致的情况
  15. * 所以需要订单释放直接与库存释放进行绑定,订单释放后主动发一条消息给库存释放队列,
  16. * 然后监听库存释放队列,有消息就根据订单状态判断是否需要解锁库存
  17. *
  18. * 创建订单交换机与库存释放队列stock.release.stock.queue的绑定关系
  19. */
  20. @Bean
  21. public Binding orderReleaseOtherBingding() {
  22. return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE,
  23. "order-event-exchange", "order.release.other.#", null);
  24. }
  25. }

4.2 监听解锁库存队列

  1. package com.atguigu.gulimall.ware.listener;
  2. @Service
  3. @RabbitListener(queues = "stock.release.stock.queue")
  4. public class StockReleaseListener {
  5. @Autowired
  6. WareSkuService wareSkuService;
  7. /**
  8. * 监听解锁库存功能
  9. */
  10. @RabbitHandler
  11. public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
  12. try {
  13. System.out.println("订单关闭,准备解锁库存......");
  14. // 解锁库存(重载方法)
  15. wareSkuService.unlockStock(orderTo);
  16. // 不批量处理
  17. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  18. }catch (Exception e){
  19. System.out.println("rabbitMQ错误"+e.getMessage());
  20. // 将消息重新回队
  21. channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
  22. }
  23. }
  24. }

4.3 解锁库存

  1. /**
  2. * 防止因为订单服务故障,导致订单状态消息一直未消费,
  3. * 此时库存消息优先到期。查询订单状态仍为新建,所以解锁库存消息就直接消费掉,不会解锁库存,
  4. * 导致因故障受影响的订单,永远也不能解锁库存
  5. */
  6. @Transactional
  7. @Override
  8. public void unlockStock(OrderTo orderTo) {
  9. String orderSn = orderTo.getOrderSn();
  10. // 查询最新库存状态,防止重复解锁库存
  11. WareOrderTaskEntity task = wareOrderTaskService.getOrderTaskByOrderSn(orderSn);
  12. Long id = task.getId();
  13. // 按照工作单找到所有 没有解锁的库存,进行解锁
  14. List<WareOrderTaskDetailEntity> list = orderDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
  15. .eq("task_id", id)
  16. .eq("lock_status", 1));
  17. for (WareOrderTaskDetailEntity entity : list) {
  18. unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
  19. }
  20. }
  21. public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {
  22. // 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存
  23. wareSkuDao.unlockStock(skuId, wareId, num);
  24. // 更新库存工作单状态
  25. WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
  26. entity.setId(taskDetailId);
  27. entity.setLockStatus(2); // 2-已解锁
  28. orderDetailService.updateById(entity);
  29. }