支付订单的核心流程(未接入MQ)
每一次的订单支付成功后,都需要执行一系列的动作,包括
- 更新订单状态。
- 扣减库存。
- 增加积分。
- 发送优惠劵,发送红包。
- 发短信。
- 通知发货。
如果上述的所有操作都是同步执行的话,那可能就会导致处理的时间过长。
比如:
- 更新订单状态需要30ms;
- 调用库存服务的接口进行扣减库存需要耗费80ms;
- 增加积分需要耗费50ms;
- 发送优惠券需要耗时60ms;
- 发送短信需要耗费100ms(涉及与第三方短信系统交互,可能性能抖动会达到1秒+);
- 通知发货需要耗时500ms(因为涉及到跟第三方物流系统交互以及与仓库管理系统交互,比较耗费时间,可能会性能抖动达到1秒+)。
总共耗时时间就有 30 + 80 + 50 + 60 + 100 + 500 = 820ms。
伪代码逻辑实现
/**
* 收到订单支付成功的通知
*/
public void payOrderSuccess(Order order) {
updateOrderStatus(order); // 更新本地订单数据库里的订单状态
stockService.updateProductStock(order); // 调用库存服务的接口,扣减库存
creditService.updateCredit(order); // 调用积分库存的接口,增加积分
marketingService.addVoucher(order); // 调用营销服务的接口,增加优惠券
pushService.sendMessage(order); // 调用推送服务的接口,发送短信
warehouseService.deliveryGoods(order); // 调用仓储服务的接口,通知发货
}
支付订单的核心流程(接入MQ)
在用户支付完毕后,只要执行最核心的更新订单状态以及扣减库存就可以了。然后诸如增加积分、发送优惠券、发送短信、通知发货等操作,都可以通过MQ实现异步化执行。
具体就是,订单系统仅会同步执行更新订单状态和扣减库存这两个关键操作,因为一旦支付成功,只要保证了订单状态变为【已支付】,库存扣减掉,就可以保证核心数据不错乱。然后订单系统接着就会发送一个订单支付的消息给RocketMQ,后面相关的积分系统,营销系统,第三方短信系统,仓储系统,都会从RocketMQ里获取订单支付的消息,然后进行后续的业务逻辑处理。
改造后的执行流程时长:
- 更新订单状态30ms。
- 扣减库存80ms。
- 发送订单支付消息到RocketMQ 10ms。
总耗时时间是 30 + 80 + 10 = 120ms。
对于其他系统(积分系统、营销系统等),都会自己从RocketMQ里去获取订单支付消息执行自己要处理的业务逻辑,不会再影响订单核心链路的性能。
伪代码逻辑实现
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
/**
* 生产者发送消息到RocketMQ里的一个Topic中
*/
public class RocketMQProducer {
// 这个是RocketMQ的生产者类,用这个就可以发送消息到RocketMQ
private static DefaultMQProducer producer;
static {
// 这里就是构建一个Producer实例对象
producer = new DefaultMQProducer("order_producer_group");
// 这个是为Producer设置NameServer的地址,让他可以拉取路由信息
// 这样才能知道每个Topic的数据分散在哪些Broker机器上
// 然后才可以把消息发送到Broker上去
producer.setNamesrvAddr("localhost:9876");
// 这里启动一个Producer
producer.start();
}
public static void send(String topic, String message) throws Exception {
// 这里是构建一条消息对象
Message msgObj = new Message(
topic, // 指定发送消息到哪一个Topic上去
"", // 消息的Tag
message.getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息
);
// 利用Producer发送消息
SendResult sendResult = producer.send(msgObj);
System.out.printf("%s%n", sendResult);
}
}
Topic是一个逻辑的概念,实际上Topic的数据时分布式存储在Master Broker中的。生产者系统发送一个订单消息过去的时候,会根据一定的负载均衡算法和容错算法把消息发送到一个Broker中去。
/**
* 消费者系统(积分系统、营销系统、推送系统、仓储系统)
* 从RocketMQ里消费 “TopicOrderPaySuccess”中的订单消息,然后执行
* 增加积分、发送优惠券、发送短信、通知发货之类的业务逻辑
*/
public class RocketMQConsumer {
public static void start() {
new Thread() {
public void run() {
try {
// RocketMQ消费者示例对象
// "consumer_group" 之类的就是消费者分组
// 一般来说比如积分系统就是 "credit_consumer_group"
// 营销系统就是 "marketing_consumer_group"
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("credit_consumer_group");
// 这里给消费者设置NameServer的地址
// 这样就可以拉取到路由信息,知道Topic的数据在哪些broker上
// 然后就可以从对应的broker上拉取数据
consumer.setNamesrvAddr("localhost:9876");
// 选择订阅 "TopicOrderPaySuccess"的消息
// 这样就从这个Topic的broker机器上拉取订单消息过来
consumer.subscribe("TopicOrderPaySuccess", "*");
// 注册消息监听器来处理拉取到的订单消息
// 如果consumer拉取到订单消息,就会回调这个方法交给你处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MesageExt> msgs, ConsumeConcurrentlyContext context
) {
// 在这里对获取到的msgs订单消息进行处理
// 比如增加积分、发送优惠券、通知发货等等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started. %n");
while(true) { // 别让线程退出,就让创建好的consumer不停消费数据
Thread.sleep(1000);
}
} catch(Exception e) {
e.printStackTrace();
}
}
}.start();
}
}