image.png

使用 RabbitMQ 延时队列(x-delayed-message 延时交换机)完成订单超时未支付自动取消

订单实体

  1. package com.ctgu.sheep.mq.model;
  2. import com.fasterxml.jackson.annotation.JsonFormat;
  3. import lombok.*;
  4. import org.springframework.format.annotation.DateTimeFormat;
  5. import javax.persistence.*;
  6. import java.time.LocalDateTime;
  7. /**
  8. * Created By Intellij IDEA
  9. *
  10. * @author ssssheep
  11. * @package com.ctgu.sheep.mq.model
  12. * @datetime 2022/9/18 星期日
  13. */
  14. @Getter
  15. @Setter
  16. @ToString
  17. @AllArgsConstructor
  18. @NoArgsConstructor
  19. @Entity
  20. @Table(name = "t_orders")
  21. public class Order {
  22. @Id
  23. @GeneratedValue(strategy = GenerationType.IDENTITY)
  24. private Integer id;
  25. private String goodsName;
  26. private String buyerName;
  27. @ManyToOne
  28. @JoinColumn(name = "buyer_id", referencedColumnName = "id")
  29. private User buyer;
  30. @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  31. @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  32. private LocalDateTime createTime;
  33. @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  34. @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  35. private LocalDateTime payTime;
  36. @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  37. @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
  38. private LocalDateTime finishTime;
  39. /**
  40. * orderStatus: 0:未支付 1:已支付 2:已完成 3:已取消
  41. */
  42. private Integer orderStatus;
  43. private String cancelReason;
  44. }

用户实体

  1. package com.ctgu.sheep.mq.model;
  2. import lombok.*;
  3. import javax.persistence.*;
  4. /**
  5. * Created By Intellij IDEA
  6. *
  7. * @author ssssheep
  8. * @package com.ctgu.sheep.mq.model
  9. * @datetime 2022/9/18 星期日
  10. */
  11. @Getter
  12. @Setter
  13. @ToString
  14. @AllArgsConstructor
  15. @NoArgsConstructor
  16. @Entity
  17. @Table(name = "t_users")
  18. public class User {
  19. @Id
  20. @GeneratedValue(strategy = GenerationType.IDENTITY)
  21. private Integer id;
  22. private String username;
  23. }

延时队列配置

  1. /**
  2. * Created By Intellij IDEA
  3. *
  4. * @author ssssheep
  5. * @package com.ctgu.sheep.mq.config
  6. * @datetime 2022/9/18 星期日
  7. */
  8. @Configuration
  9. public class OrderQueueConfig {
  10. public static final String ORDER_QUEUE_NAME = "order.queue";
  11. public static final String ORDER_EXCHANGE_NAME = "order.exchange";
  12. public static final String ORDER_ROUTING_KEY = "order.routingkey";
  13. @Bean("orderQueue")
  14. public Queue orderQueue() {
  15. return new Queue(ORDER_QUEUE_NAME);
  16. }
  17. @Bean("orderExchange")
  18. public CustomExchange orderExchange() {
  19. HashMap<String, Object> args = new HashMap<>();
  20. args.put("x-delayed-type", "direct");
  21. return new CustomExchange(ORDER_EXCHANGE_NAME, "x-delayed-message", true, false, args);
  22. }
  23. @Bean
  24. public Binding bindingOrderQueue(@Qualifier("orderQueue") Queue queue,
  25. @Qualifier("orderExchange") CustomExchange customExchange) {
  26. return BindingBuilder.bind(queue).to(customExchange).with(ORDER_ROUTING_KEY).noargs();
  27. }
  28. }

订单业务控制器

  1. /**
  2. * Created By Intellij IDEA
  3. *
  4. * @author ssssheep
  5. * @package com.ctgu.sheep.mq.controller
  6. * @datetime 2022/9/18 星期日
  7. */
  8. @RestController
  9. @RequestMapping("/order")
  10. @Slf4j
  11. public class OrderController {
  12. public static final String ORDER_EXCHANGE_NAME = "order.exchange";
  13. public static final String ORDER_ROUTING_KEY = "order.routingkey";
  14. @Autowired
  15. private OrderRepository orderRepository;
  16. @Autowired
  17. private UserRepository userRepository;
  18. @Autowired
  19. private RabbitTemplate rabbitTemplate;
  20. @PostMapping("/create")
  21. @Transactional(rollbackFor = Exception.class)
  22. public String createOrder(@RequestBody CreateOrderParam param) {
  23. User user = userRepository.findById(param.getBuyerId()).get();
  24. Order order = new Order();
  25. order.setGoodsName(param.getGoodsName());
  26. order.setBuyerName(user.getUsername());
  27. order.setBuyer(user);
  28. order.setCreateTime(LocalDateTime.now());
  29. order.setOrderStatus(0);
  30. Order entity = orderRepository.save(order);
  31. log.info("订单创建成功,订单id:{}", entity.getId());
  32. rabbitTemplate.convertAndSend(ORDER_EXCHANGE_NAME, ORDER_ROUTING_KEY, String.valueOf(entity.getId()), message -> {
  33. message.getMessageProperties().setDelay(30000);
  34. return message;
  35. });
  36. return "ok";
  37. }
  38. @PostMapping("/pay")
  39. public String payOrder(@RequestParam Integer orderId) {
  40. Order order = orderRepository.findById(orderId).get();
  41. order.setPayTime(LocalDateTime.now());
  42. order.setOrderStatus(1);
  43. orderRepository.save(order);
  44. return "支付成功";
  45. }
  46. }

超期订单消费者

  1. /**
  2. * Created By Intellij IDEA
  3. *
  4. * @author ssssheep
  5. * @package com.ctgu.sheep.mq.mq
  6. * @datetime 2022/9/18 星期日
  7. */
  8. @Slf4j
  9. @Component
  10. public class DeadOrderQueueConsumer {
  11. @Autowired
  12. private OrderRepository orderRepository;
  13. public static final String ORDER_QUEUE_NAME = "order.queue";
  14. @RabbitListener(queues = ORDER_QUEUE_NAME)
  15. @Transactional(rollbackFor = Exception.class)
  16. public void orderOverTime(Message message) {
  17. String oid = new String(message.getBody());
  18. log.info("订单延时队列收到消息,订单id:{}", oid);
  19. Order order = orderRepository.findById(Integer.valueOf(oid)).get();
  20. if (order.getOrderStatus() == 0) {
  21. order.setOrderStatus(3);
  22. order.setCancelReason("订单超时未支付");
  23. orderRepository.save(order);
  24. }
  25. }
  26. }

测试

请求下单的接口
image.png
请求成功,订单数据存入到数据库中
image.png
30s后,延时交换机将消息投递到订单队列,消费者查询到订单仍未支付,因此将订单状态改为已取消(取消原因:订单超时未支付)
image.png
当我们创建订单后,在30s内完成了支付,那么消费者之后收到延时消息时就不会再修改订单的状态
image.png