1. 案例介绍

1.1 业务分析

模拟电商网站购物场景中的【下单】和【支付】业务
###1)下单
RocketMQ-02 - 图3

  1. 用户请求订单系统下单
  2. 订单系统通过RPC调用订单服务下单
  3. 订单服务调用优惠券服务,扣减优惠券
  4. 订单服务调用调用库存服务,校验并扣减库存
  5. 订单服务调用用户服务,扣减用户余额
  6. 订单服务完成确认订单

2)支付
RocketMQ-02 - 图4

  1. 用户请求支付系统
  2. 支付系统调用第三方支付平台API进行发起支付流程
  3. 用户通过第三方支付平台支付成功后,第三方支付平台回调通知支付系统
  4. 支付系统调用订单服务修改订单状态
  5. 支付系统调用积分服务添加积分
  6. 支付系统调用日志服务记录日志

    1.2 问题分析

    问题1

    用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。
    如何保证数据的完整性?
    RocketMQ-02 - 图5

使用MQ保证在下单失败后系统数据的完整性
RocketMQ-02 - 图6

问题2
用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。
商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?
RocketMQ-02 - 图7

通过MQ进行数据分发,提高系统处理性能
RocketMQ-02 - 图8

2. 技术分析

2.1 技术选型

  • SpringBoot
  • Dubbo
  • Zookeeper
  • RocketMQ
  • Mysql

RocketMQ-02 - 图9

2.2 SpringBoot整合RocketMQ

下载rocketmq-spring项目
将rocketmq-spring安装到本地仓库
mvn install -Dmaven.skip.test=true

2.2.1 消息生产者

1)添加依赖


org.springframework.boot
spring-boot-starter-parent
2.0.1.RELEASE

2.0.3



org.apache.rocketmq
rocketmq-spring-boot-starter
${rocketmq-spring-boot-starter-version}


org.projectlombok
lombok
1.18.6


org.springframework.boot
spring-boot-starter-test
test

2)配置文件

application.properties
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=my-group

3)启动类

@SpringBootApplication
public class MQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}

4)测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQSpringBootApplication.class})
public class ProducerTest {

  1. @Autowired<br /> private RocketMQTemplate rocketMQTemplate;
  2. @Test<br /> public void test1(){<br /> rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");<br /> }<br />}

2.2.2 消息消费者

1)添加依赖

同消息生产者

2)配置文件

同消息生产者

3)启动类

@SpringBootApplication
public class MQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}

4)消息监听器

@Slf4j
@Component
@RocketMQMessageListener(topic = “springboot-mq”,consumerGroup = “springboot-mq-consumer-1”)
public class Consumer implements RocketMQListener {

  1. @Override<br /> public void onMessage(String message) {<br /> log.info("Receive message:"+message);<br /> }<br />}

2.3 SpringBoot整合Dubbo

下载dubbo-spring-boot-starter依赖包
将dubbo-spring-boot-starter安装到本地仓库
mvn install -Dmaven.skip.test=true
RocketMQ-02 - 图10

2.3.1 搭建Zookeeper集群

1)准备工作

  1. 安装JDK
  2. 将Zookeeper上传到服务器
  3. 解压Zookeeper,并创建data目录,将conf下的zoo_sample.cfg文件改名为zoo.cfg
  4. 建立/user/local/zookeeper-cluster,将解压后的Zookeeper复制到以下三个目录

/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3

  1. 配置每一个 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183
  • 修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data
修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data

2)配置集群

  1. 在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是 1、2、3 。这个文件就是记录每个服务器的 ID
  2. 在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表。
  • 集群服务器 IP 列表如下

server.1=192.168.25.140:2881:3881
server.2=192.168.25.140:2882:3882
server.3=192.168.25.140:2883:3883
解释:server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口

3)启动集群

启动集群就是分别启动每个实例。
RocketMQ-02 - 图11

2.3.2 RPC服务接口

public interface IUserService {
public String sayHello(String name);
}

2.3.3 服务提供者

1)添加依赖


org.springframework.boot
spring-boot-starter-parent
2.0.1.RELEASE




com.alibaba.spring.boot
dubbo-spring-boot-starter
2.0.0



org.springframework.boot
spring-boot-starter


log4j-to-slf4j
org.apache.logging.log4j





org.apache.zookeeper
zookeeper
3.4.10


org.slf4j
slf4j-log4j12


log4j
log4j


  1. <dependency><br /> <groupId>com.101tec</groupId><br /> <artifactId>zkclient</artifactId><br /> <version>0.9</version><br /> <exclusions><br /> <exclusion><br /> <artifactId>slf4j-log4j12</artifactId><br /> <groupId>org.slf4j</groupId><br /> </exclusion><br /> </exclusions><br /> </dependency><br /> <!--API--><br /> <dependency><br /> <groupId>com.itheima.demo</groupId><br /> <artifactId>dubbo-api</artifactId><br /> <version>1.0-SNAPSHOT</version><br /> </dependency>

2)配置文件

application.properties
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880

3)启动类

@EnableDubboConfiguration
@SpringBootApplication
public class ProviderBootstrap {

  1. public static void main(String[] args) throws IOException {<br /> SpringApplication.run(ProviderBootstrap.class,args);<br /> }

}

4)服务实现

@Component
@Service(interfaceClass = IUserService.class)
public class UserServiceImpl implements IUserService{
@Override
public String sayHello(String name) {
return “hello:”+name;
}
}

2.3.4 服务消费者

1)添加依赖


org.springframework.boot
spring-boot-starter-parent
2.0.1.RELEASE
  1. <dependency><br /> <groupId>org.springframework.boot</groupId><br /> <artifactId>spring-boot-starter-web</artifactId><br /> </dependency>
  2. <!--dubbo--><br /> <dependency><br /> <groupId>com.alibaba.spring.boot</groupId><br /> <artifactId>dubbo-spring-boot-starter</artifactId><br /> <version>2.0.0</version><br /> </dependency>
  3. <dependency><br /> <groupId>org.springframework.boot</groupId><br /> <artifactId>spring-boot-starter</artifactId><br /> <exclusions><br /> <exclusion><br /> <artifactId>log4j-to-slf4j</artifactId><br /> <groupId>org.apache.logging.log4j</groupId><br /> </exclusion><br /> </exclusions><br /> </dependency>
  4. <!--zookeeper--><br /> <dependency><br /> <groupId>org.apache.zookeeper</groupId><br /> <artifactId>zookeeper</artifactId><br /> <version>3.4.10</version><br /> <exclusions><br /> <exclusion><br /> <groupId>org.slf4j</groupId><br /> <artifactId>slf4j-log4j12</artifactId><br /> </exclusion><br /> <exclusion><br /> <groupId>log4j</groupId><br /> <artifactId>log4j</artifactId><br /> </exclusion><br /> </exclusions><br /> </dependency>
  5. <dependency><br /> <groupId>com.101tec</groupId><br /> <artifactId>zkclient</artifactId><br /> <version>0.9</version><br /> <exclusions><br /> <exclusion><br /> <artifactId>slf4j-log4j12</artifactId><br /> <groupId>org.slf4j</groupId><br /> </exclusion><br /> </exclusions><br /> </dependency>
  6. <!--API--><br /> <dependency><br /> <groupId>com.itheima.demo</groupId><br /> <artifactId>dubbo-api</artifactId><br /> <version>1.0-SNAPSHOT</version><br /> </dependency>

2)配置文件

application.properties
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183

3)启动类

@EnableDubboConfiguration
@SpringBootApplication
public class ConsumerBootstrap {
public static void main(String[] args) {
SpringApplication.run(ConsumerBootstrap.class);
}
}

4)Controller

@RestController
@RequestMapping(“/user”)
public class UserController {

  1. @Reference<br /> private IUserService userService;
  2. @RequestMapping("/sayHello")<br /> public String sayHello(String name){<br /> return userService.sayHello(name);<br /> }

}

3. 环境搭建

3.1 数据库

1)优惠券表

Field Type Comment
coupon_id bigint(50) NOT NULL 优惠券ID
coupon_price decimal(10,2) NULL 优惠券金额
user_id bigint(50) NULL 用户ID
order_id bigint(32) NULL 订单ID
is_used int(1) NULL 是否使用 0未使用 1已使用
used_time timestamp NULL 使用时间

2)商品表

Field Type Comment
goods_id bigint(50) NOT NULL 主键
goods_name varchar(255) NULL 商品名称
goods_number int(11) NULL 商品库存
goods_price decimal(10,2) NULL 商品价格
goods_desc varchar(255) NULL 商品描述
add_time timestamp NULL 添加时间

3)订单表

Field Type Comment
order_id bigint(50) NOT NULL 订单ID
user_id bigint(50) NULL 用户ID
order_status int(1) NULL 订单状态 0未确认 1已确认 2已取消 3无效 4退款
pay_status int(1) NULL 支付状态 0未支付 1支付中 2已支付
shipping_status int(1) NULL 发货状态 0未发货 1已发货 2已退货
address varchar(255) NULL 收货地址
consignee varchar(255) NULL 收货人
goods_id bigint(50) NULL 商品ID
goods_number int(11) NULL 商品数量
goods_price decimal(10,2) NULL 商品价格
goods_amount decimal(10,0) NULL 商品总价
shipping_fee decimal(10,2) NULL 运费
order_amount decimal(10,2) NULL 订单价格
coupon_id bigint(50) NULL 优惠券ID
coupon_paid decimal(10,2) NULL 优惠券
money_paid decimal(10,2) NULL 已付金额
pay_amount decimal(10,2) NULL 支付金额
add_time timestamp NULL 创建时间
confirm_time timestamp NULL 订单确认时间
pay_time timestamp NULL 支付时间

4)订单商品日志表

Field Type Comment
goods_id int(11) NOT NULL 商品ID
order_id varchar(32) NOT NULL 订单ID
goods_number int(11) NULL 库存数量
log_time datetime NULL 记录时间

5)用户表

Field Type Comment
user_id bigint(50) NOT NULL 用户ID
user_name varchar(255) NULL 用户姓名
user_password varchar(255) NULL 用户密码
user_mobile varchar(255) NULL 手机号
user_score int(11) NULL 积分
user_reg_time timestamp NULL 注册时间
user_money decimal(10,0) NULL 用户余额

6)用户余额日志表

Field Type Comment
user_id bigint(50) NOT NULL 用户ID
order_id bigint(50) NOT NULL 订单ID
money_log_type int(1) NOT NULL 日志类型 1订单付款 2 订单退款
use_money decimal(10,2) NULL 操作金额
create_time timestamp NULL 日志时间

7)订单支付表

Field Type Comment
pay_id bigint(50) NOT NULL 支付编号
order_id bigint(50) NULL 订单编号
pay_amount decimal(10,2) NULL 支付金额
is_paid int(1) NULL 是否已支付 1否 2是

8)MQ消息生产表

Field Type Comment
id varchar(100) NOT NULL 主键
group_name varchar(100) NULL 生产者组名
msg_topic varchar(100) NULL 消息主题
msg_tag varchar(100) NULL Tag
msg_key varchar(100) NULL Key
msg_body varchar(500) NULL 消息内容
msg_status int(1) NULL 0:未处理;1:已经处理
create_time timestamp NOT NULL 记录时间

9)MQ消息消费表

Field Type Comment
msg_id varchar(50) NULL 消息ID
group_name varchar(100) NOT NULL 消费者组名
msg_tag varchar(100) NOT NULL Tag
msg_key varchar(100) NOT NULL Key
msg_body varchar(500) NULL 消息体
consumer_status int(1) NULL 0:正在处理;1:处理成功;2:处理失败
consumer_times int(1) NULL 消费次数
consumer_timestamp timestamp NULL 消费时间
remark varchar(500) NULL 备注

3.2 项目初始化

shop系统基于Maven进行项目管理

3.1.1 工程浏览

RocketMQ-02 - 图12

  • 父工程:shop-parent
  • 订单系统:shop-order-web
  • 支付系统:shop-pay-web
  • 优惠券服务:shop-coupon-service
  • 订单服务:shop-order-service
  • 支付服务:shop-pay-service
  • 商品服务:shop-goods-service
  • 用户服务:shop-user-service
  • 实体类:shop-pojo
  • 持久层:shop-dao
  • 接口层:shop-api
  • 工具工程:shop-common

共12个系统

3.1.2 工程关系

RocketMQ-02 - 图13

3.3 Mybatis逆向工程使用

1)代码生成

使用Mybatis逆向工程针对数据表生成CURD持久层代码
###2)代码导入

  • 将实体类导入到shop-pojo工程
  • 在服务层工程中导入对应的Mapper类和对应配置文件

    3.4 公共类介绍

  • ID生成器

  • IDWorker:Twitter雪花算法
  • 异常处理类
  • CustomerException:自定义异常类
  • CastException:异常抛出类
  • 常量类
  • ShopCode:系统状态类
  • 响应实体类
  • Result:封装响应状态和响应信息

    4. 下单业务

    RocketMQ-02 - 图14

4.1 下单基本流程

1)接口定义

  • IOrderService

public interface IOrderService {
/*
确认订单
@param order
@return Result
*/
Result confirmOrder(TradeOrder order);
}
###2)业务类实现
@Slf4j
@Component
@Service(interfaceClass = IOrderService.class)
public class OrderServiceImpl implements IOrderService {

  1. @Override<br /> public Result confirmOrder(TradeOrder order) {<br /> //1.校验订单
  2. //2.生成预订单
  3. try {<br /> //3.扣减库存
  4. //4.扣减优惠券
  5. //5.使用余额
  6. //6.确认订单
  7. //7.返回成功状态
  8. } catch (Exception e) {<br /> //1.确认订单失败,发送消息
  9. //2.返回失败状态<br /> }
  10. }<br />}<br />###3)校验订单<br />![](https://cdn.nlark.com/yuque/0/2021/png/251474/1623685592719-3eb96616-6386-4ce7-9860-99d24f323f56.png#align=left&display=inline&height=814&margin=%5Bobject%20Object%5D&originHeight=814&originWidth=166&status=done&style=none&width=166)

private void checkOrder(TradeOrder order) {
//1.校验订单是否存在
if(order==null){
CastException.cast(ShopCode.SHOP_ORDER_INVALID);
}
//2.校验订单中的商品是否存在
TradeGoods goods = goodsService.findOne(order.getGoodsId());
if(goods==null){
CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
}
//3.校验下单用户是否存在
TradeUser user = userService.findOne(order.getUserId());
if(user==null){
CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
}
//4.校验商品单价是否合法
if(order.getGoodsPrice().compareTo(goods.getGoodsPrice())!=0){
CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
}
//5.校验订单商品数量是否合法
if(order.getGoodsNumber()>=goods.getGoodsNumber()){
CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
}

  1. log.info("校验订单通过");<br />}<br />###4)生成预订单<br />![](https://cdn.nlark.com/yuque/0/2021/png/251474/1623685593391-a7202e59-b41f-4aea-8e41-c0ab2226ba96.png#align=left&display=inline&height=1668&margin=%5Bobject%20Object%5D&originHeight=1668&originWidth=448&status=done&style=none&width=448)

private Long savePreOrder(TradeOrder order) {
//1.设置订单状态为不可见
order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
//2.订单ID
order.setOrderId(idWorker.nextId());
//核算运费是否正确
BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
if (order.getShippingFee().compareTo(shippingFee) != 0) {
CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
}
//3.计算订单总价格是否正确
BigDecimal orderAmount = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
orderAmount.add(shippingFee);
if (orderAmount.compareTo(order.getOrderAmount()) != 0) {
CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
}

  1. //4.判断优惠券信息是否合法<br /> Long couponId = order.getCouponId();<br /> if (couponId != null) {<br /> TradeCoupon coupon = couponService.findOne(couponId);<br /> //优惠券不存在<br /> if (coupon == null) {<br /> CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);<br /> }<br /> //优惠券已经使用<br /> if ((ShopCode.SHOP_COUPON_ISUSED.getCode().toString())<br /> .equals(coupon.getIsUsed().toString())) {<br /> CastException.cast(ShopCode.SHOP_COUPON_INVALIED);<br /> }<br /> order.setCouponPaid(coupon.getCouponPrice());<br /> } else {<br /> order.setCouponPaid(BigDecimal.ZERO);<br /> }
  2. //5.判断余额是否正确<br /> BigDecimal moneyPaid = order.getMoneyPaid();<br /> if (moneyPaid != null) {<br /> //比较余额是否大于0<br /> int r = order.getMoneyPaid().compareTo(BigDecimal.ZERO);<br /> //余额小于0<br /> if (r == -1) {<br /> CastException.cast(ShopCode.SHOP_MONEY_PAID_LESS_ZERO);<br /> }<br /> //余额大于0<br /> if (r == 1) {<br /> //查询用户信息<br /> TradeUser user = userService.findOne(order.getUserId());<br /> if (user == null) {<br /> CastException.cast(ShopCode.SHOP_USER_NO_EXIST);<br /> }<br /> //比较余额是否大于用户账户余额<br /> if (user.getUserMoney().compareTo(order.getMoneyPaid().longValue()) == -1) {<br /> CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);<br /> }<br /> order.setMoneyPaid(order.getMoneyPaid());<br /> }<br /> } else {<br /> order.setMoneyPaid(BigDecimal.ZERO);<br /> }<br /> //计算订单支付总价<br /> order.setPayAmount(orderAmount.subtract(order.getCouponPaid())<br /> .subtract(order.getMoneyPaid()));<br /> //设置订单添加时间<br /> order.setAddTime(new Date());
  3. //保存预订单<br /> int r = orderMapper.insert(order);<br /> if (ShopCode.SHOP_SUCCESS.getCode() != r) {<br /> CastException.cast(ShopCode.SHOP_ORDER_SAVE_ERROR);<br /> }<br /> log.info("订单:["+order.getOrderId()+"]预订单生成成功");<br /> return order.getOrderId();<br />}<br />###5)扣减库存
  • 通过dubbo调用商品服务完成扣减库存

private void reduceGoodsNum(TradeOrder order) {
TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
goodsNumberLog.setGoodsId(order.getGoodsId());
goodsNumberLog.setOrderId(order.getOrderId());
goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
Result result = goodsService.reduceGoodsNum(goodsNumberLog);
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
}
log.info(“订单:[“+order.getOrderId()+”]扣减库存[“+order.getGoodsNumber()+”个]成功”);
}

  • 商品服务GoodsService扣减库存

@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
if (goodsNumberLog == null ||
goodsNumberLog.getGoodsNumber() == null ||
goodsNumberLog.getOrderId() == null ||
goodsNumberLog.getGoodsNumber() == null ||
goodsNumberLog.getGoodsNumber().intValue() <= 0) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
if(goods.getGoodsNumber() //库存不足
CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
}
//减库存
goods.setGoodsNumber(goods.getGoodsNumber()-goodsNumberLog.getGoodsNumber());
goodsMapper.updateByPrimaryKey(goods);

  1. //记录库存操作日志<br /> goodsNumberLog.setGoodsNumber(-(goodsNumberLog.getGoodsNumber()));<br /> goodsNumberLog.setLogTime(new Date());<br /> goodsNumberLogMapper.insert(goodsNumberLog);
  2. return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());<br />}<br />###6)扣减优惠券
  • 通过dubbo完成扣减优惠券

private void changeCoponStatus(TradeOrder order) {
//判断用户是否使用优惠券
if (!StringUtils.isEmpty(order.getCouponId())) {
//封装优惠券对象
TradeCoupon coupon = couponService.findOne(order.getCouponId());
coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
coupon.setUsedTime(new Date());
coupon.setOrderId(order.getOrderId());
Result result = couponService.changeCouponStatus(coupon);
//判断执行结果
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
//优惠券使用失败
CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
}
log.info(“订单:[“+order.getOrderId()+”]使用扣减优惠券[“+coupon.getCouponPrice()+”元]成功”);
}

}

  • 优惠券服务CouponService更改优惠券状态

@Override
public Result changeCouponStatus(TradeCoupon coupon) {
try {
//判断请求参数是否合法
if (coupon == null || StringUtils.isEmpty(coupon.getCouponId())) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
//更新优惠券状态为已使用
couponMapper.updateByPrimaryKey(coupon);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
###7)扣减用户余额

  • 通过用户服务完成扣减余额

private void reduceMoneyPaid(TradeOrder order) {
//判断订单中使用的余额是否合法
if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) == 1) {
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setOrderId(order.getOrderId());
userMoneyLog.setUserId(order.getUserId());
userMoneyLog.setUseMoney(order.getMoneyPaid());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
//扣减余额
Result result = userService.changeUserMoney(userMoneyLog);
if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
}
log.info(“订单:[“+order.getOrderId()+”扣减余额[“+order.getMoneyPaid()+”元]成功]”);
}
}

  • 用户服务UserService,更新余额

RocketMQ-02 - 图15

@Override
public Result changeUserMoney(TradeUserMoneyLog userMoneyLog) {
//判断请求参数是否合法
if (userMoneyLog == null
|| userMoneyLog.getUserId() == null
|| userMoneyLog.getUseMoney() == null
|| userMoneyLog.getOrderId() == null
|| userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}

  1. //查询该订单是否存在付款记录<br /> TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();<br /> userMoneyLogExample.createCriteria()<br /> .andUserIdEqualTo(userMoneyLog.getUserId())<br /> .andOrderIdEqualTo(userMoneyLog.getOrderId());<br /> int count = userMoneyLogMapper.countByExample(userMoneyLogExample);<br /> TradeUser tradeUser = new TradeUser();<br /> tradeUser.setUserId(userMoneyLog.getUserId());<br /> tradeUser.setUserMoney(userMoneyLog.getUseMoney().longValue());<br /> //判断余额操作行为<br /> //【付款操作】<br /> if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_PAID.getCode())) {<br /> //订单已经付款,则抛异常<br /> if (count > 0) {<br /> CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);<br /> }<br /> //用户账户扣减余额<br /> userMapper.reduceUserMoney(tradeUser);<br /> }<br /> //【退款操作】<br /> if (userMoneyLog.getMoneyLogType().equals(ShopCode.SHOP_USER_MONEY_REFUND.getCode())) {<br /> //如果订单未付款,则不能退款,抛异常<br /> if (count == 0) {<br /> CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);<br /> }<br /> //防止多次退款<br /> userMoneyLogExample = new TradeUserMoneyLogExample();<br /> userMoneyLogExample.createCriteria()<br /> .andUserIdEqualTo(userMoneyLog.getUserId())<br /> .andOrderIdEqualTo(userMoneyLog.getOrderId())<br /> .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());<br /> count = userMoneyLogMapper.countByExample(userMoneyLogExample);<br /> if (count > 0) {<br /> CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);<br /> }<br /> //用户账户添加余额<br /> userMapper.addUserMoney(tradeUser);<br /> }
  2. //记录用户使用余额日志<br /> userMoneyLog.setCreateTime(new Date());<br /> userMoneyLogMapper.insert(userMoneyLog);<br /> return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());<br />}<br />###8)确认订单 <br />private void updateOrderStatus(TradeOrder order) {<br /> order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());<br /> order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());<br /> order.setConfirmTime(new Date());<br /> int r = orderMapper.updateByPrimaryKey(order);<br /> if (r <= 0) {<br /> CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);<br /> }<br /> log.info("订单:["+order.getOrderId()+"]状态修改成功");<br />}

9)小结

@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单
checkOrder(order);
//2.生成预订单
Long orderId = savePreOrder(order);
order.setOrderId(orderId);
try {
//3.扣减库存
reduceGoodsNum(order);
//4.扣减优惠券
changeCoponStatus(order);
//5.使用余额
reduceMoneyPaid(order);
//6.确认订单
updateOrderStatus(order);
log.info(“订单:[“+orderId+”]确认成功”);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
//确认订单失败,发送消息

return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}

4.2 失败补偿机制

4.2.1 消息发送方

  • 配置RocketMQ属性值

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup

mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel

  • 注入模板类和属性值信息

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value(“${mq.order.topic}”)
    private String topic;

    @Value(“${mq.order.tag.cancel}”)
    private String cancelTag;

  • 发送下单失败消息

@Override
public Result confirmOrder(TradeOrder order) {
//1.校验订单
//2.生成预订
try {
//3.扣减库存
//4.扣减优惠券
//5.使用余额
//6.确认订单
} catch (Exception e) {
//确认订单失败,发送消息
CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();
cancelOrderMQ.setOrderId(order.getOrderId());
cancelOrderMQ.setCouponId(order.getCouponId());
cancelOrderMQ.setGoodsId(order.getGoodsId());
cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());
cancelOrderMQ.setUserId(order.getUserId());
cancelOrderMQ.setUserMoney(order.getMoneyPaid());
try {
sendMessage(topic,
cancelTag,
cancelOrderMQ.getOrderId().toString(),
JSON.toJSONString(cancelOrderMQ));
} catch (Exception e1) {
e1.printStackTrace();
CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);
}
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
private void sendMessage(String topic, String tags, String keys, String body) throws Exception {
//判断Topic是否为空
if (StringUtils.isEmpty(topic)) {
CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
}
//判断消息内容是否为空
if (StringUtils.isEmpty(body)) {
CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
}
//消息体
Message message = new Message(topic, tags, keys, body.getBytes());
//发送消息
rocketMQTemplate.getProducer().send(message);
}

4.2.2 消费接收方

  • 配置RocketMQ属性值

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic

  • 创建监听类,消费消息

@Slf4j
@Component
@RocketMQMessageListener(topic = “${mq.order.topic}”,
consumerGroup = “${mq.order.consumer.group.name}”,
messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener{

  1. @Override<br /> public void onMessage(MessageExt messageExt) {<br /> ...<br /> }<br />}

1)回退库存

  • 流程分析

RocketMQ-02 - 图16

  • 消息消费者

@Slf4j
@Component
@RocketMQMessageListener(topic = “${mq.order.topic}”,consumerGroup = “${mq.order.consumer.group.name}”,messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener{

  1. @Value("${mq.order.consumer.group.name}")<br /> private String groupName;
  2. @Autowired<br /> private TradeGoodsMapper goodsMapper;
  3. @Autowired<br /> private TradeMqConsumerLogMapper mqConsumerLogMapper;
  4. @Autowired<br /> private TradeGoodsNumberLogMapper goodsNumberLogMapper;
  5. @Override<br /> public void onMessage(MessageExt messageExt) {<br /> String msgId=null;<br /> String tags=null;<br /> String keys=null;<br /> String body=null;<br /> try {<br /> //1. 解析消息内容<br /> msgId = messageExt.getMsgId();<br /> tags= messageExt.getTags();<br /> keys= messageExt.getKeys();<br /> body= new String(messageExt.getBody(),"UTF-8");
  6. log.info("接受消息成功");
  7. //2. 查询消息消费记录<br /> TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();<br /> primaryKey.setMsgTag(tags);<br /> primaryKey.setMsgKey(keys);<br /> primaryKey.setGroupName(groupName);<br /> TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
  8. if(mqConsumerLog!=null){<br /> //3. 判断如果消费过...<br /> //3.1 获得消息处理状态<br /> Integer status = mqConsumerLog.getConsumerStatus();<br /> //处理过...返回<br /> if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){<br /> log.info("消息:"+msgId+",已经处理过");<br /> return;<br /> }
  9. //正在处理...返回<br /> if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){<br /> log.info("消息:"+msgId+",正在处理");<br /> return;<br /> }
  10. //处理失败<br /> if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){<br /> //获得消息处理次数<br /> Integer times = mqConsumerLog.getConsumerTimes();<br /> if(times>3){<br /> log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");<br /> return;<br /> }<br /> mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
  11. //使用数据库乐观锁更新<br /> TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();<br /> TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();<br /> criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());<br /> criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());<br /> criteria.andGroupNameEqualTo(groupName);<br /> criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());<br /> int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);<br /> if(r<=0){<br /> //未修改成功,其他线程并发修改<br /> log.info("并发修改,稍后处理");<br /> }<br /> }
  12. }else{<br /> //4. 判断如果没有消费过...<br /> mqConsumerLog = new TradeMqConsumerLog();<br /> mqConsumerLog.setMsgTag(tags);<br /> mqConsumerLog.setMsgKey(keys);<br /> mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());<br /> mqConsumerLog.setMsgBody(body);<br /> mqConsumerLog.setMsgId(msgId);<br /> mqConsumerLog.setConsumerTimes(0);
  13. //将消息处理信息添加到数据库<br /> mqConsumerLogMapper.insert(mqConsumerLog);<br /> }<br /> //5. 回退库存<br /> MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);<br /> Long goodsId = mqEntity.getGoodsId();<br /> TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);<br /> goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());<br /> goodsMapper.updateByPrimaryKey(goods);
  14. //记录库存操作日志<br /> TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();<br /> goodsNumberLog.setOrderId(mqEntity.getOrderId());<br /> goodsNumberLog.setGoodsId(goodsId);<br /> goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());<br /> goodsNumberLog.setLogTime(new Date());<br /> goodsNumberLogMapper.insert(goodsNumberLog);
  15. //6. 将消息的处理状态改为成功<br /> mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());<br /> mqConsumerLog.setConsumerTimestamp(new Date());<br /> mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);<br /> log.info("回退库存成功");<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();<br /> primaryKey.setMsgTag(tags);<br /> primaryKey.setMsgKey(keys);<br /> primaryKey.setGroupName(groupName);<br /> TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);<br /> if(mqConsumerLog==null){<br /> //数据库未有记录<br /> mqConsumerLog = new TradeMqConsumerLog();<br /> mqConsumerLog.setMsgTag(tags);<br /> mqConsumerLog.setMsgKey(keys);<br /> mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());<br /> mqConsumerLog.setMsgBody(body);<br /> mqConsumerLog.setMsgId(msgId);<br /> mqConsumerLog.setConsumerTimes(1);<br /> mqConsumerLogMapper.insert(mqConsumerLog);<br /> }else{<br /> mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);<br /> mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);<br /> }<br /> }
  16. }<br />}

2)回退优惠券

@Slf4j
@Component
@RocketMQMessageListener(topic = “${mq.order.topic}”,consumerGroup = “${mq.order.consumer.group.name}”,messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener{

  1. @Autowired<br /> private TradeCouponMapper couponMapper;
  2. @Override<br /> public void onMessage(MessageExt message) {
  3. try {<br /> //1. 解析消息内容<br /> String body = new String(message.getBody(), "UTF-8");<br /> MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);<br /> log.info("接收到消息");<br /> //2. 查询优惠券信息<br /> TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());<br /> //3.更改优惠券状态<br /> coupon.setUsedTime(null);<br /> coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());<br /> coupon.setOrderId(null);<br /> couponMapper.updateByPrimaryKey(coupon);<br /> log.info("回退优惠券成功");<br /> } catch (UnsupportedEncodingException e) {<br /> e.printStackTrace();<br /> log.error("回退优惠券失败");<br /> }
  4. }<br />}

3)回退余额

@Slf4j
@Component
@RocketMQMessageListener(topic = “${mq.order.topic}”,consumerGroup = “${mq.order.consumer.group.name}”,messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener{

  1. @Autowired<br /> private IUserService userService;
  2. @Override<br /> public void onMessage(MessageExt messageExt) {
  3. try {<br /> //1.解析消息<br /> String body = new String(messageExt.getBody(), "UTF-8");<br /> MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);<br /> log.info("接收到消息");<br /> if(mqEntity.getUserMoney()!=null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){<br /> //2.调用业务层,进行余额修改<br /> TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();<br /> userMoneyLog.setUseMoney(mqEntity.getUserMoney());<br /> userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());<br /> userMoneyLog.setUserId(mqEntity.getUserId());<br /> userMoneyLog.setOrderId(mqEntity.getOrderId());<br /> userService.updateMoneyPaid(userMoneyLog);<br /> log.info("余额回退成功");<br /> }<br /> } catch (UnsupportedEncodingException e) {<br /> e.printStackTrace();<br /> log.error("余额回退失败");<br /> }
  4. }<br />}

4)取消订单

@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), “UTF-8”);
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
log.info(“CancelOrderProcessor receive message:”+messageExt);
CancelOrderMQ cancelOrderMQ = JSON.parseObject(body, CancelOrderMQ.class);
TradeOrder order = orderService.findOne(cancelOrderMQ.getOrderId());
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
orderService.changeOrderStatus(order);
log.info(“订单:[“+order.getOrderId()+”]状态设置为取消”);
return order;
}

4.3 测试

1)准备测试环境

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ShopOrderServiceApplication.class)
public class OrderTest {

  1. @Autowired<br /> private IOrderService orderService;<br />}<br />###1)准备测试数据
  • 用户数据
  • 商品数据
  • 优惠券数据

2)测试下单成功流程
@Test
public void add(){
Long goodsId=XXXL;
Long userId=XXXL;
Long couponId=XXXL;

  1. TradeOrder order = new TradeOrder();<br /> order.setGoodsId(goodsId);<br /> order.setUserId(userId);<br /> order.setGoodsNumber(1);<br /> order.setAddress("北京");<br /> order.setGoodsPrice(new BigDecimal("5000"));<br /> order.setOrderAmount(new BigDecimal("5000"));<br /> order.setMoneyPaid(new BigDecimal("100"));<br /> order.setCouponId(couponId);<br /> order.setShippingFee(new BigDecimal(0));<br /> orderService.confirmOrder(order);<br />}<br />执行完毕后,查看数据库中用户的余额、优惠券数据,及订单的状态数据<br />###3)测试下单失败流程<br />代码同上。<br />执行完毕后,查看用户的余额、优惠券数据是否发生更改,订单的状态是否为取消。

5. 支付业务

5.1 创建支付订单

RocketMQ-02 - 图17

public Result createPayment(TradePay tradePay) {
//查询订单支付状态
try {
TradePayExample payExample = new TradePayExample();
TradePayExample.Criteria criteria = payExample.createCriteria();
criteria.andOrderIdEqualTo(tradePay.getOrderId());
criteria.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int count = tradePayMapper.countByExample(payExample);
if (count > 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
}

  1. long payId = idWorker.nextId();<br /> tradePay.setPayId(payId);<br /> tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());<br /> tradePayMapper.insert(tradePay);<br /> log.info("创建支付订单成功:" + payId);<br /> } catch (Exception e) {<br /> return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());<br /> }<br /> return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());<br />}

5.2 支付回调

5.2.1 流程分析

RocketMQ-02 - 图18

5.2.2 代码实现

public Result callbackPayment(TradePay tradePay) {

  1. if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {<br /> tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());<br /> if (tradePay == null) {<br /> CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);<br /> }<br /> tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());<br /> int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);<br /> //更新成功代表支付成功<br /> if (i == 1) {<br /> TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();<br /> mqProducerTemp.setId(String.valueOf(idWorker.nextId()));<br /> mqProducerTemp.setGroupName("payProducerGroup");<br /> mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));<br /> mqProducerTemp.setMsgTag(topic);<br /> mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));<br /> mqProducerTemp.setCreateTime(new Date());<br /> mqProducerTempMapper.insert(mqProducerTemp);<br /> TradePay finalTradePay = tradePay;<br /> executorService.submit(new Runnable() {<br /> @Override<br /> public void run() {<br /> try {<br /> SendResult sendResult = sendMessage(topic, <br /> tag, <br /> finalTradePay.getPayId(), <br /> JSON.toJSONString(finalTradePay));<br /> log.info(JSON.toJSONString(sendResult));<br /> if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {<br /> mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());<br /> System.out.println("删除消息表成功");<br /> }<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> });<br /> } else {<br /> CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);<br /> }<br /> }<br /> return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());<br />}

线程池优化消息发送逻辑

  • 创建线程池对象

@Bean
public ThreadPoolTaskExecutor getThreadPool() {

  1. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  2. executor.setCorePoolSize(4);
  3. executor.setMaxPoolSize(8);
  4. executor.setQueueCapacity(100);
  5. executor.setKeepAliveSeconds(60);
  6. executor.setThreadNamePrefix("Pool-A");
  7. executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  8. executor.initialize();
  9. return executor;

}

  • 使用线程池

@Autowired
private ThreadPoolTaskExecutor executorService;

executorService.submit(new Runnable() {
@Override
public void run() {
try {
SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));
log.info(JSON.toJSONString(sendResult));
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
System.out.println(“删除消息表成功”);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});

5.2.3

处理消息

支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理

  1. 订单服务修改订单状态为已支付
  2. 日志服务记录支付日志
  3. 用户服务负责给用户增加积分

以下用订单服务为例说明消息的处理情况

1)配置RocketMQ属性值

mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group

2)消费消息

  • 在订单服务中,配置公共的消息处理类

public class BaseConsumer {

  1. public TradeOrder handleMessage(IOrderService <br /> orderService, <br /> MessageExt messageExt,Integer code) throws Exception {<br /> //解析消息内容<br /> String body = new String(messageExt.getBody(), "UTF-8");<br /> String msgId = messageExt.getMsgId();<br /> String tags = messageExt.getTags();<br /> String keys = messageExt.getKeys();<br /> OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
  2. //查询<br /> TradeOrder order = orderService.findOne(orderMq.getOrderId());
  3. if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){<br /> order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());<br /> }
  4. if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){<br /> order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());<br /> }<br /> orderService.changeOrderStatus(order);<br /> return order;<br /> }

}

  • 接受订单支付成功消息

@Slf4j
@Component
@RocketMQMessageListener(topic = “${mq.pay.topic}”,
consumerGroup = “${mq.pay.consumer.group.name}”)
public class PayConsumer extends BaseConsumer implements RocketMQListener {

  1. @Autowired<br /> private IOrderService orderService;
  2. @Override<br /> public void onMessage(MessageExt messageExt) {<br /> try {<br /> log.info("CancelOrderProcessor receive message:"+messageExt);<br /> TradeOrder order = handleMessage(orderService, <br /> messageExt, <br /> ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());<br /> log.info("订单:["+order.getOrderId()+"]支付成功");<br /> } catch (Exception e) {<br /> e.printStackTrace();<br /> log.error("订单支付失败");<br /> }<br /> }<br />}

6. 整体联调

通过Rest客户端请求shop-order-web和shop-pay-web完成下单和支付操作

6.1 准备工作

1)配置RestTemplate类

@Configuration
public class RestTemplateConfig {

  1. @Bean<br /> @ConditionalOnMissingBean({ RestOperations.class, RestTemplate.class })<br /> public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
  2. RestTemplate restTemplate = new RestTemplate(factory);
  3. // 使用 utf-8 编码集的 conver 替换默认的 conver(默认的 string conver 的编码集为"ISO-8859-1")<br /> List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();<br /> Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();<br /> while (iterator.hasNext()) {<br /> HttpMessageConverter<?> converter = iterator.next();<br /> if (converter instanceof StringHttpMessageConverter) {<br /> iterator.remove();<br /> }<br /> }<br /> messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));
  4. return restTemplate;<br /> }
  5. @Bean<br /> @ConditionalOnMissingBean({ClientHttpRequestFactory.class})<br /> public ClientHttpRequestFactory simpleClientHttpRequestFactory() {<br /> SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();<br /> // ms<br /> factory.setReadTimeout(15000);<br /> // ms<br /> factory.setConnectTimeout(15000);<br /> return factory;<br /> }<br />}

2)配置请求地址

  • 订单系统

server.host=http://localhost
server.servlet.path=/order-web
server.port=8080
shop.order.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.order.confirm=/order/confirm

  • 支付系统

server.host=http://localhost
server.servlet.path=/pay-web
server.port=9090
shop.pay.baseURI=${server.host}:${server.port}${server.servlet.path}
shop.pay.createPayment=/pay/createPayment
shop.pay.callbackPayment=/pay/callbackPayment

6.2 下单测试

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopOrderWebApplication.class)
@TestPropertySource(“classpath:application.properties”)
public class OrderTest {

  1. @Autowired<br /> private RestTemplate restTemplate;
  2. @Value("${shop.order.baseURI}")<br /> private String baseURI;
  3. @Value("${shop.order.confirm}")<br /> private String confirmOrderPath;
  4. @Autowired<br /> private IDWorker idWorker;

/*
下单
*/
@Test
public void confirmOrder(){
Long goodsId=XXXL;
Long userId=XXXL;
Long couponId=XXXL;

  1. TradeOrder order = new TradeOrder();<br /> order.setGoodsId(goodsId);<br /> order.setUserId(userId);<br /> order.setGoodsNumber(1);<br /> order.setAddress("北京");<br /> order.setGoodsPrice(new BigDecimal("5000"));<br /> order.setOrderAmount(new BigDecimal("5000"));<br /> order.setMoneyPaid(new BigDecimal("100"));<br /> order.setCouponId(couponId);<br /> order.setShippingFee(new BigDecimal(0));
  2. Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, order, Result.class).getBody();<br /> System.out.println(result);<br /> }

}

6.3 支付测试

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ShopPayWebApplication.class)
@TestPropertySource(“classpath:application.properties”)
public class PayTest {

  1. @Autowired<br /> private RestTemplate restTemplate;
  2. @Value("${shop.pay.baseURI}")<br /> private String baseURI;
  3. @Value("${shop.pay.createPayment}")<br /> private String createPaymentPath;
  4. @Value("${shop.pay.callbackPayment}")<br /> private String callbackPaymentPath;
  5. @Autowired<br /> private IDWorker idWorker;

/*
创建支付订单
*/
@Test
public void createPayment(){

  1. Long orderId = 346321587315814400L;<br /> TradePay pay = new TradePay();<br /> pay.setOrderId(orderId);<br /> pay.setPayAmount(new BigDecimal(4800));
  2. Result result = restTemplate.postForEntity(baseURI + createPaymentPath, pay, Result.class).getBody();<br /> System.out.println(result);<br /> }
  3. /**<br /> * 支付回调<br /> */<br /> @Test<br /> public void callbackPayment(){<br /> Long payId = 346321891507720192L;<br /> TradePay pay = new TradePay();<br /> pay.setPayId(payId);<br /> pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());<br /> Result result = restTemplate.postForEntity(baseURI + callbackPaymentPath, pay, Result.class).getBody();<br /> System.out.println(result);
  4. }

}