支付订单的核心流程(未接入MQ)

image.png
每一次的订单支付成功后,都需要执行一系列的动作,包括

  • 更新订单状态。
  • 扣减库存。
  • 增加积分。
  • 发送优惠劵,发送红包。
  • 发短信。
  • 通知发货。

如果上述的所有操作都是同步执行的话,那可能就会导致处理的时间过长。
比如:

  • 更新订单状态需要30ms;
  • 调用库存服务的接口进行扣减库存需要耗费80ms;
  • 增加积分需要耗费50ms;
  • 发送优惠券需要耗时60ms;
  • 发送短信需要耗费100ms(涉及与第三方短信系统交互,可能性能抖动会达到1秒+);
  • 通知发货需要耗时500ms(因为涉及到跟第三方物流系统交互以及与仓库管理系统交互,比较耗费时间,可能会性能抖动达到1秒+)。

总共耗时时间就有 30 + 80 + 50 + 60 + 100 + 500 = 820ms。

伪代码逻辑实现

  1. /**
  2. * 收到订单支付成功的通知
  3. */
  4. public void payOrderSuccess(Order order) {
  5. updateOrderStatus(order); // 更新本地订单数据库里的订单状态
  6. stockService.updateProductStock(order); // 调用库存服务的接口,扣减库存
  7. creditService.updateCredit(order); // 调用积分库存的接口,增加积分
  8. marketingService.addVoucher(order); // 调用营销服务的接口,增加优惠券
  9. pushService.sendMessage(order); // 调用推送服务的接口,发送短信
  10. warehouseService.deliveryGoods(order); // 调用仓储服务的接口,通知发货
  11. }

支付订单的核心流程(接入MQ)

image.png
在用户支付完毕后,只要执行最核心的更新订单状态以及扣减库存就可以了。然后诸如增加积分、发送优惠券、发送短信、通知发货等操作,都可以通过MQ实现异步化执行。
具体就是,订单系统仅会同步执行更新订单状态和扣减库存这两个关键操作,因为一旦支付成功,只要保证了订单状态变为【已支付】,库存扣减掉,就可以保证核心数据不错乱。然后订单系统接着就会发送一个订单支付的消息给RocketMQ,后面相关的积分系统,营销系统,第三方短信系统,仓储系统,都会从RocketMQ里获取订单支付的消息,然后进行后续的业务逻辑处理。
改造后的执行流程时长:

  • 更新订单状态30ms。
  • 扣减库存80ms。
  • 发送订单支付消息到RocketMQ 10ms。

总耗时时间是 30 + 80 + 10 = 120ms。

对于其他系统(积分系统、营销系统等),都会自己从RocketMQ里去获取订单支付消息执行自己要处理的业务逻辑,不会再影响订单核心链路的性能。

伪代码逻辑实现

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.3.0</version>
  5. </dependency>
  1. /**
  2. * 生产者发送消息到RocketMQ里的一个Topic中
  3. */
  4. public class RocketMQProducer {
  5. // 这个是RocketMQ的生产者类,用这个就可以发送消息到RocketMQ
  6. private static DefaultMQProducer producer;
  7. static {
  8. // 这里就是构建一个Producer实例对象
  9. producer = new DefaultMQProducer("order_producer_group");
  10. // 这个是为Producer设置NameServer的地址,让他可以拉取路由信息
  11. // 这样才能知道每个Topic的数据分散在哪些Broker机器上
  12. // 然后才可以把消息发送到Broker上去
  13. producer.setNamesrvAddr("localhost:9876");
  14. // 这里启动一个Producer
  15. producer.start();
  16. }
  17. public static void send(String topic, String message) throws Exception {
  18. // 这里是构建一条消息对象
  19. Message msgObj = new Message(
  20. topic, // 指定发送消息到哪一个Topic上去
  21. "", // 消息的Tag
  22. message.getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息
  23. );
  24. // 利用Producer发送消息
  25. SendResult sendResult = producer.send(msgObj);
  26. System.out.printf("%s%n", sendResult);
  27. }
  28. }

Topic是一个逻辑的概念,实际上Topic的数据时分布式存储在Master Broker中的。生产者系统发送一个订单消息过去的时候,会根据一定的负载均衡算法和容错算法把消息发送到一个Broker中去。

  1. /**
  2. * 消费者系统(积分系统、营销系统、推送系统、仓储系统)
  3. * 从RocketMQ里消费 “TopicOrderPaySuccess”中的订单消息,然后执行
  4. * 增加积分、发送优惠券、发送短信、通知发货之类的业务逻辑
  5. */
  6. public class RocketMQConsumer {
  7. public static void start() {
  8. new Thread() {
  9. public void run() {
  10. try {
  11. // RocketMQ消费者示例对象
  12. // "consumer_group" 之类的就是消费者分组
  13. // 一般来说比如积分系统就是 "credit_consumer_group"
  14. // 营销系统就是 "marketing_consumer_group"
  15. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("credit_consumer_group");
  16. // 这里给消费者设置NameServer的地址
  17. // 这样就可以拉取到路由信息,知道Topic的数据在哪些broker上
  18. // 然后就可以从对应的broker上拉取数据
  19. consumer.setNamesrvAddr("localhost:9876");
  20. // 选择订阅 "TopicOrderPaySuccess"的消息
  21. // 这样就从这个Topic的broker机器上拉取订单消息过来
  22. consumer.subscribe("TopicOrderPaySuccess", "*");
  23. // 注册消息监听器来处理拉取到的订单消息
  24. // 如果consumer拉取到订单消息,就会回调这个方法交给你处理
  25. consumer.registerMessageListener(new MessageListenerConcurrently() {
  26. public ConsumeConcurrentlyStatus consumeMessage(
  27. List<MesageExt> msgs, ConsumeConcurrentlyContext context
  28. ) {
  29. // 在这里对获取到的msgs订单消息进行处理
  30. // 比如增加积分、发送优惠券、通知发货等等
  31. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  32. }
  33. });
  34. // 启动消费者实例
  35. consumer.start();
  36. System.out.printf("Consumer Started. %n");
  37. while(true) { // 别让线程退出,就让创建好的consumer不停消费数据
  38. Thread.sleep(1000);
  39. }
  40. } catch(Exception e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }.start();
  45. }
  46. }