简介
订单中心
电商系统涉及到3流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集合起来。订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。
订单构成
流程
拦截器
订单模块需要用户登录后操作
步骤:
- 添加拦截器
@Component
public class LoginUserInterceptor implements HandlerInterceptor {
public static ThreadLocal<MemberRespVo> threadLocal = new ThreadLocal<>();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
HttpSession session = request.getSession();
MemberRespVo memberRespVo = (MemberRespVo) session.getAttribute(AuthConstant.LOGIN_USER);
if (memberRespVo != null) {
// 保存到ThreadLocal中
threadLocal.set(memberRespVo);
return true;
}else {
// 设置提示
request.getSession().setAttribute("msg", "请先登录!");
// 重定向去登录
response.sendRedirect("http://auth.gulimalls.com/login.html");
return false;
}
}
}
- 添加配置使拦截器生效
@Configuration
public class MyWebConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginUserInterceptor()).addPathPatterns("/**");
}
}
确认订单页流程
订单模型抽取
/**
* 结算页VO(confirm.html需要的页面数据)
*
* @author: wan
*/
public class OrderConfirmVO {
/**
* 会员收获地址列表,ums_member_receive_address
**/
@Getter
@Setter
List<MemberAddressVO> memberAddressVos;
/**
* 所有选中的购物项【购物车中的选中项】
**/
@Getter
@Setter
List<OrderItemVO> items;
/**
* 优惠券(会员积分)
**/
@Getter
@Setter
private Integer integration;
/**
* TODO 防止重复提交的令牌 幂等性
**/
@Getter
@Setter
private String uniqueToken;
/**
* 库存
* 有货/无货,不放在item里面
*/
@Getter
@Setter
Map<Long, Boolean> stocks;
/**
* 总商品金额
**/
//BigDecimal total;
//计算订单总额
public BigDecimal getTotal() {
BigDecimal totalNum = BigDecimal.ZERO;
if (!CollectionUtils.isEmpty(items)) {
for (OrderItemVO item : items) {
// 计算当前商品总价格
BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount()));
// 累加全部商品总价格
totalNum = totalNum.add(itemPrice);
}
}
return totalNum;
}
/**
* 应付总额
**/
//BigDecimal payPrice;
public BigDecimal getPayPrice() {
return getTotal();
}
/**
* 商品总数
*/
public Integer getCount() {
Integer count = 0;
if (!CollectionUtils.isEmpty(items)) {
for (OrderItemVO item : items) {
count += item.getCount();
}
}
return count;
}
}
用户地址
/**
* 收获地址,结算页VO
* @author: wan
*/
@Data
public class MemberAddressVO {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 收货人姓名
*/
private String name;
/**
* 电话
*/
private String phone;
/**
* 邮政编码
*/
private String postCode;
/**
* 省份/直辖市
*/
private String province;
/**
* 城市
*/
private String city;
/**
* 区
*/
private String region;
/**
* 详细地址(街道)
*/
private String detailAddress;
/**
* 省市区代码
*/
private String areacode;
/**
* 是否默认
*/
private Integer defaultStatus;
}
勾选了的购物项
/**
* @Description: 购物车所有购物项
*/
@Data
public class OrderItemVO {
private Long skuId; // skuId
private Boolean check = true; // 是否选中
private String title; // 标题
private String image; // 图片
private List<String> skuAttrValues;// 商品销售属性 ["颜色:星河银","版本:8GB+256GB"]
private BigDecimal price; // 单价
private Integer count; // 当前商品数量
private BigDecimal totalPrice; // 总价
private BigDecimal weight = new BigDecimal("0.085");// 商品重量
}
业务逻辑
Controller
@Controller
public class OrderWebController {
@Autowired
private OrderService orderService;
@GetMapping("/toTrade")
public String toTrade(Model model) {
OrderConfirmVO orderConfirmVO = orderService.getOrderConfirm();
model.addAttribute("confirmOrderData", orderConfirmVO);
return "confirm";
}
}
接口实现类
@Override
public OrderConfirmVO getOrderConfirm() {
OrderConfirmVO orderConfirmVO = new OrderConfirmVO();
MemberRespVo member = LoginUserInterceptor.threadLocal.get();
// 远程调用查用户的地址
List<MemberAddressVO> memberAddress = memberServiceClient.getMemberAddress(member.getId());
orderConfirmVO.setMemberAddressVos(memberAddress);
// 远程调用查用户选中了的购物项
List<OrderItemVO> cartItems = cartServiceClient.getCartItems();
orderConfirmVO.setItems(cartItems);
// 优惠信息在用户信息中已经保存
orderConfirmVO.setIntegration(member.getIntegration());
// 其他数据计算
// TODO 放重令牌
return orderConfirmVO;
}
注意此时调用远程调用了购物车模块查询购物项
@Override
public List<CartItemVo> getCartItems() {
UserInfoTo userInfoTo = CartInterceptor.threadLocal.get();
if (userInfoTo.getUserId() == null) {
return null;
} else {
// 从redis中查出
List<CartItemVo> cartItems = getCartItems(CartConstant.CART_PREFIX + userInfoTo.getUserId());
// 过滤出选中的,且更新最新的价格
List<CartItemVo> collect = cartItems.stream().filter(CartItemVo::getCheck).map((item) -> {
// 远程调用查询
BigDecimal skuPrice = productFeignService.getSkuPrice(item.getSkuId());
item.setPrice(skuPrice);
return item;
}).collect(Collectors.toList());
return collect;
}
}
此时会从threadLocal中取出user-key,但是此时在cart服务的拦截器会先对feign请求进行拦截,但此时请求却没有cookie信息。这就是feign调用不携带请求头的问题。
Feign远程调用丢失请求头问题
原因
主要原因还是feign调用时,会创建一个新请求来调用。
此时配置拦截器,注入到容器中,feign创建的新请求会被拦截器拦截。
Feign底层
feign底层可以注册多个拦截器处理请求
这个方法会遍历注册的拦截器,并调用拦截器的apply方法执行
拦截器接口
所以我们要返回这个接口的实例,注入到容器中。
配置拦截器
@Configuration
public class GuliFeignConfig {
@Bean("requestInterceptor")
public RequestInterceptor requestInterceptor() {
return new RequestInterceptor() {
/**
* 重写apply方法
* @param requestTemplate 新请求
*/
@Override
public void apply(RequestTemplate requestTemplate) {
// 获取原来的请求
// 拿到刚进来的请求RequestContextHolder
// ServletRequestAttributes是spring封装的,AbstractRequestAttributes(RequestContextHolder返回类型)子类
// 如果不使用的化,也可以通过在controller中接收request,并通过ThreadLocal也可以共享,只是这里spring提供了可以直接获取的类
ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = servletRequestAttributes.getRequest();
// 同步请求cookie,老请求放到新请求中
// 所有Cookie被封装在请求头中,所以吧Cookie放到新请求即可
String cookie = request.getHeader("Cookie");
requestTemplate.header("Cookie", cookie);
}
};
}
}
异步优化
使用异步编排优化业务执行,线程池的配置在其他服务已经配置过,复制配置即可。
@Override
public OrderConfirmVO getOrderConfirm() throws ExecutionException, InterruptedException {
OrderConfirmVO orderConfirmVO = new OrderConfirmVO();
MemberRespVo member = LoginUserInterceptor.threadLocal.get();
// 先从父线程获取原请求
RequestAttributes request = RequestContextHolder.getRequestAttributes();
CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {
// 在子线程调用时先共享请求为父线程的请求
RequestContextHolder.setRequestAttributes(request);
// 远程调用查用户的地址
List<MemberAddressVO> memberAddress = memberServiceClient.getMemberAddress(member.getId());
orderConfirmVO.setMemberAddressVos(memberAddress);
}, threadPoolExecutor);
CompletableFuture<Void> getItem = CompletableFuture.runAsync(() -> {
// 在子线程调用时先共享请求为父线程的请求
RequestContextHolder.setRequestAttributes(request);
// 远程调用查用户选中了的购物项
List<OrderItemVO> cartItems = cartServiceClient.getCartItems();
orderConfirmVO.setItems(cartItems);
}, threadPoolExecutor).thenRunAsync(() -> {
// 远程调用查询库存信息
// 先收集skuId
List<OrderItemVO> items = orderConfirmVO.getItems();
List<Long> skuIdList = items.stream().map(orderItemVo -> orderItemVo.getSkuId()).collect(Collectors.toList());
R r = wmsServiceClient.getSkuHasStock(skuIdList);
List<SkuHasStockTo> data = r.getData(new TypeReference<List<SkuHasStockTo>>() {
});
// 封装库存信息
if (data != null) {
Map<Long, Boolean> collect = data.stream().collect(Collectors.toMap(SkuHasStockTo::getSkuId, SkuHasStockTo::getHasStock));
orderConfirmVO.setStocks(collect);
}
});
// 优惠信息在用户信息中已经保存
orderConfirmVO.setIntegration(member.getIntegration());
// 其他数据计算
// TODO 放重令牌
// 等待2个异步任务
CompletableFuture.allOf(getAddress, getItem).get();
return orderConfirmVO;
}
此时又会遇到问题,就是在两个线程中的请求不共享,2个线程分别调用远程服务,此时也就是没有携带cookie调用,所以会调用失败。
也就是threadLocal作用只是在父线程,而没有在子线程。
异步编排feign调用不丢失请求问题
此时解决方法就是子线程需要共享父线程的请求。此时请求就携带了cookie。
原因:
使用异步编排时,非同一线程无法取到RequestContextHolder(上下文环境保持器)
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();// 获取controller请求对象空指针异常
解决:获取主线程ServletRequestAttributes,给每个异步线程复制一份
// 先从父线程获取原请求
RequestAttributes request = RequestContextHolder.getRequestAttributes();
CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {
// 在子线程调用时先共享请求为父线程的请求
RequestContextHolder.setRequestAttributes(request);
...
}, threadPoolExecutor);
CompletableFuture<Void> getItem = CompletableFuture.runAsync(() -> {
// 在子线程调用时先共享请求为父线程的请求
RequestContextHolder.setRequestAttributes(request);
...
}, threadPoolExecutor).thenRunAsync(() -> {
...
List<SkuHasStockTo> data = r.getData(new TypeReference<List<SkuHasStockTo>>() {
});
计算运费
@Override
public FareVo getFare(Long addrId) {
FareVo fareVo = new FareVo();
// 远程调用
R r = memberFeignService.info(addrId);
MemberAddressVo memberAddressVo = r.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {
});
if (memberAddressVo != null) {
String phone = memberAddressVo.getPhone();
//截取用户手机号码最后一位作为我们的运费计算
//1558022051
String fare = phone.substring(phone.length() - 1);
BigDecimal bigDecimal = new BigDecimal(fare);
fareVo.setFare(bigDecimal);
fareVo.setAddress(memberAddressVo);
return fareVo;
}
return null;
}
创建订单流程
接口幂等性问题
具体问题见文档,使用token保证接口幂等性,注意保证查询token和删除token是原子的。
流程
加上接口幂等性校验流程图
@Override
public SubmitOrderResponseVo getSubmitOrderResponseVo(OrderSubmitVo orderSubmitVo) {
SubmitOrderResponseVo submitOrderResponseVo = new SubmitOrderResponseVo();
// 获取用户信息
MemberRespVo member = LoginUserInterceptor.threadLocal.get();
String submitToken = orderSubmitVo.getUniqueToken();
orderSubmitThreadLocal.set(orderSubmitVo);
// Lua脚本含义:
// 查找KEYS[1]的值,如果等于ARGV[1],就进行删除,删除成功返回1,删除失败返回0,如果没找到返回0
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用Lua脚本保证令牌的验证和删除是原子性的
Long flag = redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + member.getId()), submitToken);
if (flag == 1L) {
// 创建订单
OrderCreateTo order = createOrder();
// 验价
BigDecimal totalAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = orderSubmitVo.getPayPrice();
if (Math.abs(totalAmount.subtract(payPrice).doubleValue()) < 0.01) {
// 金额对比成功
// 保存订单
saveOrder(order);
// 锁定库存
WareSkuLockVo wareSkuLockVo = new WareSkuLockVo();
wareSkuLockVo.setOrderSn(order.getOrder().getOrderSn());
List<OrderItemEntity> orderItems = order.getOrderItems();
// 封装一个类
List<OrderItemVo> orderItemVoList = orderItems.stream().map(entity -> {
OrderItemVo orderItemVo = new OrderItemVo();
orderItemVo.setSkuId(entity.getSkuId());
orderItemVo.setTitle(entity.getSkuName());
orderItemVo.setCount(entity.getSkuQuantity());
return orderItemVo;
}).collect(Collectors.toList());
wareSkuLockVo.setLocks(orderItemVoList);
// 远程调用
R r = wmsServiceClient.lockOrderStock(wareSkuLockVo);
if (r.getCode() == 0) {
// 锁住库存成功
submitOrderResponseVo.setOrder(order.getOrder());
submitOrderResponseVo.setCode(0);
// TODO 远程扣减积分
return submitOrderResponseVo;
} else {
submitOrderResponseVo.setCode(3);
throw new NoStockException((String)r.get("msg"));
}
} else {
// 金额对比失败
submitOrderResponseVo.setCode(2);
return submitOrderResponseVo;
}
} else {
// 令牌校验失败
submitOrderResponseVo.setCode(1);
return submitOrderResponseVo;
}
}
创建订单流程图
创建订单TO对象方法
private OrderCreateTo createOrder() {
OrderCreateTo orderCreateTo = new OrderCreateTo();
// 创建订单
String orderSn = IdWorker.getTimeId();
OrderEntity orderEntity = buildOrder(orderSn);
// 生成订单项实体对象
List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);
// 计算价格
summaryFillOrder(orderEntity, orderItemEntities);
orderCreateTo.setOrder(orderEntity);
orderCreateTo.setOrderItems(orderItemEntities);
return orderCreateTo;
}
创建订单方法,和生成订单项集合封装方法
/**
* 创建订单方法
*
* @param orderSn
* @return
*/
private OrderEntity buildOrder(String orderSn) {
// 设置订单号
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(orderSn);
// 设置memberId
MemberRespVo member = LoginUserInterceptor.threadLocal.get();
orderEntity.setMemberId(member.getId());
// 查找发送地址
OrderSubmitVo orderSubmitVo = orderSubmitThreadLocal.get();
R r = wmsServiceClient.getFare(orderSubmitVo.getAddrId());
FareVo fareVo = r.getData(new TypeReference<FareVo>() {
});
orderEntity.setFreightAmount(fareVo.getFare());
// 4.封装收货地址信息
orderEntity.setReceiverName(fareVo.getAddress().getName());// 收货人名字
orderEntity.setReceiverPhone(fareVo.getAddress().getPhone());// 收货人电话
orderEntity.setReceiverProvince(fareVo.getAddress().getProvince());// 省
orderEntity.setReceiverCity(fareVo.getAddress().getCity());// 市
orderEntity.setReceiverRegion(fareVo.getAddress().getRegion());// 区
orderEntity.setReceiverDetailAddress(fareVo.getAddress().getDetailAddress());// 详细地址
orderEntity.setReceiverPostCode(fareVo.getAddress().getPostCode());// 收货人邮编
// 5.封装订单状态信息
orderEntity.setStatus(OrderConstant.OrderStatusEnum.CREATE_NEW.getCode());
// 6.设置自动确认时间
orderEntity.setAutoConfirmDay(OrderConstant.autoConfirmDay);// 7天
// 7.设置未删除状态
orderEntity.setDeleteStatus(OrderConstant.OrderIsDeleteEnum.NOT_DELETE.getIsDelete());
// 8.设置时间
Date now = new Date();
orderEntity.setCreateTime(now);
orderEntity.setModifyTime(now);
return orderEntity;
}
/**
* 生成订单项实体对象集合
*
* @param orderSn
* @return
*/
private List<OrderItemEntity> buildOrderItems(String orderSn) {
List<OrderItemVo> cartItems = cartServiceClient.getCartItems();
List<OrderItemEntity> orderItemEntityList = new ArrayList<>();
if (!CollectionUtils.isEmpty(cartItems)) {
orderItemEntityList = cartItems.stream()
.filter(OrderItemVo::getCheck)
.map(item -> buildOrderItem(orderSn, item))
.collect(Collectors.toList());
}
return orderItemEntityList;
}
/**
* 封装每一个购物项
*
* @param orderSn
* @param cartItem
* @return
*/
private OrderItemEntity buildOrderItem(String orderSn, OrderItemVo cartItem) {
OrderItemEntity itemEntity = new OrderItemEntity();
// 1.封装订单号
itemEntity.setOrderSn(orderSn);
// 2.封装SPU信息
R spuInfo = productServiceClient.getSpuBySkuId(cartItem.getSkuId());// 查询SPU信息
SpuInfoTo spuInfoTO = spuInfo.getData(new TypeReference<SpuInfoTo>() {
});
itemEntity.setSpuId(spuInfoTO.getId());
itemEntity.setSpuName(spuInfoTO.getSpuName());
itemEntity.setSpuBrand(spuInfoTO.getSpuName());
itemEntity.setCategoryId(spuInfoTO.getCatalogId());
// 3.封装SKU信息
itemEntity.setSkuId(cartItem.getSkuId());
itemEntity.setSkuName(cartItem.getTitle());
itemEntity.setSkuPic(cartItem.getImage());// 商品sku图片
itemEntity.setSkuPrice(cartItem.getPrice());// 这个是最新价格,购物车模块查询数据库得到
itemEntity.setSkuQuantity(cartItem.getCount());// 当前商品数量
String skuAttrsVals = String.join(";", cartItem.getSkuAttrValues());
itemEntity.setSkuAttrsVals(skuAttrsVals);// 商品销售属性组合["颜色:星河银","版本:8GB+256GB"]
// 4.优惠信息【不做】
// 5.积分信息
int num = cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount())).intValue();// 分值=单价*数量
itemEntity.setGiftGrowth(num);// 成长值
itemEntity.setGiftIntegration(num);// 积分
// 6.价格信息
itemEntity.setPromotionAmount(BigDecimal.ZERO);// 促销金额
itemEntity.setCouponAmount(BigDecimal.ZERO);// 优惠券金额
itemEntity.setIntegrationAmount(BigDecimal.ZERO);// 积分优惠金额
BigDecimal realAmount = itemEntity.getSkuPrice().multiply(new BigDecimal(itemEntity.getSkuQuantity()))
.subtract(itemEntity.getPromotionAmount())
.subtract(itemEntity.getCouponAmount())
.subtract(itemEntity.getIntegrationAmount());
itemEntity.setRealAmount(realAmount);// 实际金额,减去所有优惠金额
return itemEntity;
}
验价之后保存订单,保存订单后锁定库存
/**
* 保存订单
*
* @param orderCreateTo
*/
@Transactional
void saveOrder(OrderCreateTo orderCreateTo) {
OrderEntity order = orderCreateTo.getOrder();
this.save(order);
List<OrderItemEntity> orderItems = orderCreateTo.getOrderItems();
orderItemService.saveBatch(orderItems);
}
锁定库存
if (r.getCode() == 0) {
// 锁住库存成功
submitOrderResponseVo.setOrder(order.getOrder());
submitOrderResponseVo.setCode(0);
// TODO 远程扣减积分
return submitOrderResponseVo;
}
锁定库存如果在远程扣减积分出现异常,那么就会库存不会回滚,此时就有分布式异常的问题。
远程调用库存扣减的方法:
主要就是对lock_stock字段进行操作。
/**
* 锁定库存,sql执行锁定锁定
*
* @param wareSkuLockVo
* @return
*/
@Transactional(rollbackFor = NoStockException.class)
@Override
public Boolean lockOrderStock(WareSkuLockVo wareSkuLockVo) {
// 按照收货地址找到就近仓库,锁定库存(暂未实现)
// 采用方案:获取每项商品在哪些仓库有库存,轮询尝试锁定,任一商品锁定失败回滚
// 找到这个商品在哪里有库存,封装成一个对象
List<OrderItemVo> locks = wareSkuLockVo.getLocks();
List<SkuWareHasStock> skuWareHasStockList = locks.stream().map(item -> {
SkuWareHasStock skuWareHasStock = new SkuWareHasStock();
Long skuId = item.getSkuId();
List<Long> wareIds = wareSkuDao.getSkuStockWareIds(skuId, item.getCount());
// 没有仓库
if (wareIds == null && wareIds.size() == 0) {
throw new NoStockException(skuId);
}
skuWareHasStock.setLockNum(item.getCount());
skuWareHasStock.setSkuId(skuId);
skuWareHasStock.setWareIds(wareIds);
return skuWareHasStock;
}).collect(Collectors.toList());
for (SkuWareHasStock skuWareHasStock : skuWareHasStockList) {
// 是否锁定成功的标志
Boolean skuStockLock = false;
Long skuId = skuWareHasStock.getSkuId();
List<Long> wareIds = skuWareHasStock.getWareIds();
if (CollectionUtils.isEmpty(wareIds)) {
// 只要有一个货物没有仓库有他库存,直接抛出异常,锁定库存失败,回滚
throw new NoStockException(skuId);
} else {
// 有多个仓库有库存,此时需要判断哪个仓库够
for (Long wareId : wareIds) {
// 锁定成功就返回1,不成功返回0(代表影响的行数)
Long count = wareSkuDao.lockSkuStock(skuId, wareId, skuWareHasStock.getLockNum());
if (count == 1) {
// 表示已经有一个仓库锁住了
// 标志置为true,停止锁下一个仓库
skuStockLock = true;
break;
}
// 当前仓库失败,重试下一个仓库
// 所有仓库都没成功锁住,抛出异常
if (skuStockLock == false) {
throw new NoStockException(skuId);
}
}
}
}
return true;
}
分布式事务的解决
参照分布式事务的文档。
后台保存商品分布式方案
Seata的使用参照官网:https://seata.io/zh-cn/docs/user/quickstart.html
前台订单分布式方案
本项目订单并没有使用Seata来解决库存的分布式事务问题,Seata在解决分布式事务的过程中会加锁一系列操作,出现异常直接回滚。并发程度并不是特别高,但后台保存商品可以使用Seata,此时并发并不是特别高,所以本项目分布式事务为了高并发,只保持最终一致性即可,可以使用消息队列。(延时队列)
保存库存锁定成功的消息在延时队列中,30分钟后把消息发送给解锁库存服务,解锁服务如果此时没有查看到订单,也就是可能被人取消了,此时就需要执行库存回滚,也就是定时任务的效果。
延时队列场景
为什么不使用Spring提供的定时任务?
定时任务的时效性问题:如果在下单之前开启定时任务,订单在1才下好单,那么订单在30分钟也就是31的时候才会算过期,而定时任务在30的时候检测没有过期,所以不回滚,在下一个定时任务才可以检测到。时差很大。不采用,而采用延时对了
死信队列和死信交换机
延时队列的实现
方法一:设置队列过期时间
方法二:设置消息过期时间
延时队列模拟关单
使用2个交换机
但可以优化一下,复用同一个交换机,根据routing-key不同进行转发
设计建议规范:(基于事件模型的交换机设计)
1、交换机命名:业务+exchange;交换机为Topic
2、路由键:事件.需要感知的业务(可以不写)
3、队列命名:事件+想要监听服务名+queue
4、绑定关系:事件.感知的业务(#)
@Bean
public Queue orderDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 延时队列的死信交换机和routing-key
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000);
return new Queue("order.delay.queue", true, false, false, arguments);
}
@Bean
public Queue orderReleaseOrderQueue() {
return new Queue("order.release.order.queue", true, false, false, null);
}
@Bean
public Exchange orderEventExchange() {
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateOrderBinding() {
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
@Bean
public Binding orderReleaseOrderBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
利用消息队列就解决库存问题
库存问题解决流程图
根据图示创建相应的队列和交换机
根据模拟关单可以是由一个交换机代替两个交换机
@Configuration
public class RabbitMqConfig {
/**
* 配置发送对象到消息队列为Json格式
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public Queue stockDelayQueue() {
Map<String, Object> arguments = new HashMap<>();
// 延时队列的死信交换机和routing-key
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}
@Bean
public Queue stockReleaseOrderQueue() {
return new Queue("stock.release.stock.queue", true, false, false, null);
}
@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}
@Bean
public Binding orderCreateOrderBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
@Bean
public Binding stockReleaseOrderBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
}
进行库存回滚的操作
注意先开启消息手动确认:
防止消息解锁库存出现异常之后,不重新处理,此时库存就未解锁,有很大问题。
spring:
listener:
simple:
acknowledge-mode: manual # 配置ack为手动应答
回滚的情况判断:
/**
* 回滚消息的逻辑:
* 一.先在任务详情表中查询消息:
* 1.如果没有,那么说明在锁定库存的时候已经出现了错误,在本地事务已经进行了回滚,所以所以此时不需要回滚
* 2.如果有:
* 解锁需要判断订单情况:
* 如果没有这个订单,必须解锁。
* 如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁
*/
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
log.info("收到回滚消息");
Long id = stockLockedTo.getId();
WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(id);
if (byId != null) {
// 此时远程调用查询订单情况
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());
if (r.getCode() == 0) {
OrderVo data = r.getData(new TypeReference<OrderVo>() {
});
// 只有不存在订单和订单一已经关闭才解锁
if (data == null || data.getStatus() == 4) {
// 调用解锁方法
unLockStock(byId.getSkuId(), byId.getWareId(), byId.getSkuNum());
// 手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
} else {
// 如果远程调用失败,不应答,再次放入队列中,下一次应答
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
} else {
// 没有任务详情表,说明已经回滚,不解锁
// 手动应答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
private void unLockStock(Long skuId, Long wareId, Integer num) {
wareSkuDao.unLockStock(skuId, wareId, num);
}
优化
(封装成一个方法)接收消息的listener
@Service
@Slf4j
@RabbitListener(queues = {"stock.release.stock.queue"})
public class RabbitMQListener {
@Autowired
private WareSkuService wareSkuService;
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
log.info("收到自动回滚消息,开始处理回滚操作");
try {
wareSkuService.handleStockLockedRelease(stockLockedTo);
// 只要没有异常,就确认收到消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception exception) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
调用方法:此时只有异常,也就是远程调用查询订单出现了异常抛出异常,进行手动的ack拒接。其他不解锁的情况也就不会滚库存数量。
public void handleStockLockedRelease(StockLockedTo stockLockedTo) throws Exception {
/**
* 回滚消息的逻辑:
* 一.先在任务详情表中查询消息:
* 1.如果没有,那么说明在锁定库存的时候已经出现了错误,在本地事务已经进行了回滚,所以所以此时不需要回滚
* 2.如果有:
* 解锁需要判断订单情况:
* 如果没有这个订单,必须解锁。
* 如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁
*/
Long id = stockLockedTo.getId();
StockDetailTo stockDetailTo = stockLockedTo.getDetail();
WareOrderTaskDetailEntity taskDetail = wareOrderTaskDetailService.getById(stockDetailTo.getId());
if (taskDetail != null) {
// 此时远程调用查询订单情况
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());
if (r.getCode() == 0) {
OrderVo data = r.getData(new TypeReference<OrderVo>() {
});
// 只有不存在订单和订单一已经关闭才解锁
if (data == null || data.getStatus() == 4) {
// 当任务详情表中的状态为1,也就是未解锁的时候才进行解锁
if (taskDetail.getLockStatus() == 1) {
// 调用解锁方法
unLockStock(taskDetail.getSkuId(), taskDetail.getWareId(), taskDetail.getSkuNum(), taskDetail.getId());
} else {
// 订单其他状态,不可解锁(消息确认)
}
}
} else {
// 订单远程调用失败(消息重新入队)
throw new RuntimeException("解锁异常");
}
} else {
// 无库存锁定工作单记录,已回滚,无需解锁(消息确认)
}
}
/**
* 解锁库存
*
* @param skuId
* @param wareId
* @param num
* @param taskId
*/
@Transactional
public void unLockStock(Long skuId, Long wareId, Integer num, Long taskId) {
// 仓库解锁
wareSkuDao.unLockStock(skuId, wareId, num);
// 更新解锁任务单详情表的解锁状态
WareOrderTaskDetailEntity detail = new WareOrderTaskDetailEntity();
detail.setId(taskId);
detail.setLockStatus(2);
wareOrderTaskDetailService.updateById(detail);
}
发送消息
在订单模块库存调用成功后发送消息
if (r.getCode() == 0) {
// 锁住库存成功
submitOrderResponseVo.setOrder(order.getOrder());
submitOrderResponseVo.setCode(0);
// TODO 远程扣减积分
// 发送消息到订单队列中,以后进行关单操作
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
return submitOrderResponseVo;
} else {
submitOrderResponseVo.setCode(3);
throw new NoStockException((String) r.get("msg"));
}
关单操作详情
如果订单在规定时间未进行支付,那么此时就要进行关单的功能,在关单模拟已经创建了队列和交换机,此时就是在创建订单后发送消息,在一定时间后,监听并关单即可。
关单流程图
监听消息
@Slf4j
@Service
@RabbitListener(queues = {"order.release.order.queue"})
public class RabbitMQListener {
@Autowired
private OrderService orderService;
@RabbitHandler
public void closeOrder(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
log.info("订单超过未支付时间,开始关闭订单操作...");
try {
orderService.closeOrder(orderEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception exception) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
修改订单状态为关单状态
/**
* 修改订单状态为关单
*
* @param orderEntity
*/
@Override
public void closeOrder(OrderEntity orderEntity) {
// 修改订单状态,表示未支付
OrderEntity entity = getById(orderEntity.getId());
// 时间过了还未付款,关单
if (entity.getStatus() == 0) {
OrderEntity updateEntity = new OrderEntity();
updateEntity.setId(entity.getId());
updateEntity.setStatus(4);
updateById(updateEntity);
}
}
关单操作和库存解锁的问题
问题引入和解决方法
一般情况库存锁定在关单操作之后,如果有网络卡顿,那么关单在解锁库存之后,此时库存的解锁的逻辑:如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁,所以此时库存是不会解锁的,因为订单还是在新建状态,并没有关单。
为了解决这个问题:可以在订单创建之后主动发送一次库存解锁操作,此时可以保证库存在订单关单之后一定被解锁,但是如果关单在解锁库存之前,那么正常解锁库存(不是订单主动发送的解锁)应该如何判断是否解锁呢?
此时在关单主动解锁库存的时候,一定需要将任务详情单的lock_status设置未2表示已经解锁,第二次被动解锁就再次解锁库存了。
流程
此时关单的交换机需要绑定库存的死信队列,注意一定不要绑到了库存的延时队列
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
下单之后主动发送库存解锁消息
/**
* 修改订单状态为关单
*
* @param orderEntity
*/
@Override
public void closeOrder(OrderEntity orderEntity) {
// 修改订单状态,表示未支付
OrderEntity entity = getById(orderEntity.getId());
// 时间过了还未付款,关单
if (entity.getStatus() == 0) {
OrderEntity updateEntity = new OrderEntity();
updateEntity.setId(entity.getId());
updateEntity.setStatus(4);
updateById(updateEntity);
}
// 主动发起解锁库存消息,防止有延时导致库存未解锁,却又修改了订单
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(entity, orderTo);
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
}
库存监听器监听OrderTo的消息
@RabbitHandler
public void handleStockLockedReleaseByOrderSend(OrderTo orderTo, Message message, Channel channel) throws IOException {
log.info("收到订单发送关单后的回滚消息,开始处理回滚操作");
try {
wareSkuService.handleStockLockedReleaseByOrderSend(orderTo);
// 只要没有异常,就确认收到消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception exception) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
接口
此时需要先查询taskId,根据任务详情单进行回滚
@Override
@Transactional
public void handleStockLockedReleaseByOrderSend(OrderTo orderTo) {
// 通过任务表查询任务id
String orderSn = orderTo.getOrderSn();
Long taskId = orderTaskService.getTaskIdByOrderSn(orderSn);
// 查询所有任务详情的信息,注意找的是没有解锁的任务详情
List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
.eq("task_id", taskId)
.eq("lock_status", 1));
// 遍历解锁库存
for (WareOrderTaskDetailEntity entity : list) {
unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
}
}
如何保证消息的可靠性
消息丢失(三种情况)
消息丢失一消息发送出去,由于网络问题没有抵达服务器
- 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
- 做好日志记录,每个消息状态是否都被服务器收到都应该记录
- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发一
消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
- publishe地必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
- 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
消息重复(三种情况)
消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
消息消费失败,由于重试机制,自动又将消息发送出去
成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
- 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
- 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
- rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
消息重复很好解决,库存服务在业务逻辑上也就放置了幂等性。
消息积压
消费者宕机积压
消费者消费能力不足积压
发送者发送流量太大。
- 上线更多的消费者,进行正常消费
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
项目也可以使用解锁库存的服务专为为一个微服务,可以解决消息挤压。