使用延时队列完成
订单实体
package com.ctgu.sheep.mq.model;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import org.springframework.format.annotation.DateTimeFormat;
import javax.persistence.*;
import java.time.LocalDateTime;
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.model
* @datetime 2022/9/18 星期日
*/
@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "t_orders")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String goodsName;
private String buyerName;
@ManyToOne
@JoinColumn(name = "buyer_id", referencedColumnName = "id")
private User buyer;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime payTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime finishTime;
/**
* orderStatus: 0:未支付 1:已支付 2:已完成 3:已取消
*/
private Integer orderStatus;
private String cancelReason;
}
用户实体
package com.ctgu.sheep.mq.model;
import lombok.*;
import javax.persistence.*;
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.model
* @datetime 2022/9/18 星期日
*/
@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Entity
@Table(name = "t_users")
public class User {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
private String username;
}
延时队列配置
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.config
* @datetime 2022/9/18 星期日
*/
@Configuration
public class OrderQueueConfig {
public static final String ORDER_QUEUE_NAME = "order.queue";
public static final String ORDER_EXCHANGE_NAME = "order.exchange";
public static final String ORDER_ROUTING_KEY = "order.routingkey";
@Bean("orderQueue")
public Queue orderQueue() {
return new Queue(ORDER_QUEUE_NAME);
}
@Bean("orderExchange")
public CustomExchange orderExchange() {
HashMap<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(ORDER_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingOrderQueue(@Qualifier("orderQueue") Queue queue,
@Qualifier("orderExchange") CustomExchange customExchange) {
return BindingBuilder.bind(queue).to(customExchange).with(ORDER_ROUTING_KEY).noargs();
}
}
订单业务控制器
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.controller
* @datetime 2022/9/18 星期日
*/
@RestController
@RequestMapping("/order")
@Slf4j
public class OrderController {
public static final String ORDER_EXCHANGE_NAME = "order.exchange";
public static final String ORDER_ROUTING_KEY = "order.routingkey";
@Autowired
private OrderRepository orderRepository;
@Autowired
private UserRepository userRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/create")
@Transactional(rollbackFor = Exception.class)
public String createOrder(@RequestBody CreateOrderParam param) {
User user = userRepository.findById(param.getBuyerId()).get();
Order order = new Order();
order.setGoodsName(param.getGoodsName());
order.setBuyerName(user.getUsername());
order.setBuyer(user);
order.setCreateTime(LocalDateTime.now());
order.setOrderStatus(0);
Order entity = orderRepository.save(order);
log.info("订单创建成功,订单id:{}", entity.getId());
rabbitTemplate.convertAndSend(ORDER_EXCHANGE_NAME, ORDER_ROUTING_KEY, String.valueOf(entity.getId()), message -> {
message.getMessageProperties().setDelay(30000);
return message;
});
return "ok";
}
@PostMapping("/pay")
public String payOrder(@RequestParam Integer orderId) {
Order order = orderRepository.findById(orderId).get();
order.setPayTime(LocalDateTime.now());
order.setOrderStatus(1);
orderRepository.save(order);
return "支付成功";
}
}
超期订单消费者
/**
* Created By Intellij IDEA
*
* @author ssssheep
* @package com.ctgu.sheep.mq.mq
* @datetime 2022/9/18 星期日
*/
@Slf4j
@Component
public class DeadOrderQueueConsumer {
@Autowired
private OrderRepository orderRepository;
public static final String ORDER_QUEUE_NAME = "order.queue";
@RabbitListener(queues = ORDER_QUEUE_NAME)
@Transactional(rollbackFor = Exception.class)
public void orderOverTime(Message message) {
String oid = new String(message.getBody());
log.info("订单延时队列收到消息,订单id:{}", oid);
Order order = orderRepository.findById(Integer.valueOf(oid)).get();
if (order.getOrderStatus() == 0) {
order.setOrderStatus(3);
order.setCancelReason("订单超时未支付");
orderRepository.save(order);
}
}
}
测试
请求下单的接口
请求成功,订单数据存入到数据库中
30s后,消息进入延时队列中,查询到订单还未支付,因此就将订单的状态改为超时
当我们创建订单后,在30s内进行了支付,那么后续消息队列就不会再修改订单的状态