简介
订单中心
电商系统涉及到3流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集合起来。订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。
订单构成

流程

拦截器
订单模块需要用户登录后操作
步骤:
- 添加拦截器
@Componentpublic class LoginUserInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberRespVo> threadLocal = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {HttpSession session = request.getSession();MemberRespVo memberRespVo = (MemberRespVo) session.getAttribute(AuthConstant.LOGIN_USER);if (memberRespVo != null) {// 保存到ThreadLocal中threadLocal.set(memberRespVo);return true;}else {// 设置提示request.getSession().setAttribute("msg", "请先登录!");// 重定向去登录response.sendRedirect("http://auth.gulimalls.com/login.html");return false;}}}
- 添加配置使拦截器生效
@Configurationpublic class MyWebConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginUserInterceptor()).addPathPatterns("/**");}}
确认订单页流程
订单模型抽取
/*** 结算页VO(confirm.html需要的页面数据)** @author: wan*/public class OrderConfirmVO {/*** 会员收获地址列表,ums_member_receive_address**/@Getter@SetterList<MemberAddressVO> memberAddressVos;/*** 所有选中的购物项【购物车中的选中项】**/@Getter@SetterList<OrderItemVO> items;/*** 优惠券(会员积分)**/@Getter@Setterprivate Integer integration;/*** TODO 防止重复提交的令牌 幂等性**/@Getter@Setterprivate String uniqueToken;/*** 库存* 有货/无货,不放在item里面*/@Getter@SetterMap<Long, Boolean> stocks;/*** 总商品金额**///BigDecimal total;//计算订单总额public BigDecimal getTotal() {BigDecimal totalNum = BigDecimal.ZERO;if (!CollectionUtils.isEmpty(items)) {for (OrderItemVO item : items) {// 计算当前商品总价格BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount()));// 累加全部商品总价格totalNum = totalNum.add(itemPrice);}}return totalNum;}/*** 应付总额**///BigDecimal payPrice;public BigDecimal getPayPrice() {return getTotal();}/*** 商品总数*/public Integer getCount() {Integer count = 0;if (!CollectionUtils.isEmpty(items)) {for (OrderItemVO item : items) {count += item.getCount();}}return count;}}
用户地址
/*** 收获地址,结算页VO* @author: wan*/@Datapublic class MemberAddressVO {private Long id;/*** member_id*/private Long memberId;/*** 收货人姓名*/private String name;/*** 电话*/private String phone;/*** 邮政编码*/private String postCode;/*** 省份/直辖市*/private String province;/*** 城市*/private String city;/*** 区*/private String region;/*** 详细地址(街道)*/private String detailAddress;/*** 省市区代码*/private String areacode;/*** 是否默认*/private Integer defaultStatus;}
勾选了的购物项
/*** @Description: 购物车所有购物项*/@Datapublic class OrderItemVO {private Long skuId; // skuIdprivate Boolean check = true; // 是否选中private String title; // 标题private String image; // 图片private List<String> skuAttrValues;// 商品销售属性 ["颜色:星河银","版本:8GB+256GB"]private BigDecimal price; // 单价private Integer count; // 当前商品数量private BigDecimal totalPrice; // 总价private BigDecimal weight = new BigDecimal("0.085");// 商品重量}
业务逻辑
Controller
@Controllerpublic class OrderWebController {@Autowiredprivate OrderService orderService;@GetMapping("/toTrade")public String toTrade(Model model) {OrderConfirmVO orderConfirmVO = orderService.getOrderConfirm();model.addAttribute("confirmOrderData", orderConfirmVO);return "confirm";}}
接口实现类
@Overridepublic OrderConfirmVO getOrderConfirm() {OrderConfirmVO orderConfirmVO = new OrderConfirmVO();MemberRespVo member = LoginUserInterceptor.threadLocal.get();// 远程调用查用户的地址List<MemberAddressVO> memberAddress = memberServiceClient.getMemberAddress(member.getId());orderConfirmVO.setMemberAddressVos(memberAddress);// 远程调用查用户选中了的购物项List<OrderItemVO> cartItems = cartServiceClient.getCartItems();orderConfirmVO.setItems(cartItems);// 优惠信息在用户信息中已经保存orderConfirmVO.setIntegration(member.getIntegration());// 其他数据计算// TODO 放重令牌return orderConfirmVO;}
注意此时调用远程调用了购物车模块查询购物项
@Overridepublic List<CartItemVo> getCartItems() {UserInfoTo userInfoTo = CartInterceptor.threadLocal.get();if (userInfoTo.getUserId() == null) {return null;} else {// 从redis中查出List<CartItemVo> cartItems = getCartItems(CartConstant.CART_PREFIX + userInfoTo.getUserId());// 过滤出选中的,且更新最新的价格List<CartItemVo> collect = cartItems.stream().filter(CartItemVo::getCheck).map((item) -> {// 远程调用查询BigDecimal skuPrice = productFeignService.getSkuPrice(item.getSkuId());item.setPrice(skuPrice);return item;}).collect(Collectors.toList());return collect;}}
此时会从threadLocal中取出user-key,但是此时在cart服务的拦截器会先对feign请求进行拦截,但此时请求却没有cookie信息。这就是feign调用不携带请求头的问题。
Feign远程调用丢失请求头问题
原因
主要原因还是feign调用时,会创建一个新请求来调用。

此时配置拦截器,注入到容器中,feign创建的新请求会被拦截器拦截。
Feign底层
feign底层可以注册多个拦截器处理请求

这个方法会遍历注册的拦截器,并调用拦截器的apply方法执行

拦截器接口

所以我们要返回这个接口的实例,注入到容器中。
配置拦截器
@Configurationpublic class GuliFeignConfig {@Bean("requestInterceptor")public RequestInterceptor requestInterceptor() {return new RequestInterceptor() {/*** 重写apply方法* @param requestTemplate 新请求*/@Overridepublic void apply(RequestTemplate requestTemplate) {// 获取原来的请求// 拿到刚进来的请求RequestContextHolder// ServletRequestAttributes是spring封装的,AbstractRequestAttributes(RequestContextHolder返回类型)子类// 如果不使用的化,也可以通过在controller中接收request,并通过ThreadLocal也可以共享,只是这里spring提供了可以直接获取的类ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();HttpServletRequest request = servletRequestAttributes.getRequest();// 同步请求cookie,老请求放到新请求中// 所有Cookie被封装在请求头中,所以吧Cookie放到新请求即可String cookie = request.getHeader("Cookie");requestTemplate.header("Cookie", cookie);}};}}
异步优化
使用异步编排优化业务执行,线程池的配置在其他服务已经配置过,复制配置即可。
@Overridepublic OrderConfirmVO getOrderConfirm() throws ExecutionException, InterruptedException {OrderConfirmVO orderConfirmVO = new OrderConfirmVO();MemberRespVo member = LoginUserInterceptor.threadLocal.get();// 先从父线程获取原请求RequestAttributes request = RequestContextHolder.getRequestAttributes();CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {// 在子线程调用时先共享请求为父线程的请求RequestContextHolder.setRequestAttributes(request);// 远程调用查用户的地址List<MemberAddressVO> memberAddress = memberServiceClient.getMemberAddress(member.getId());orderConfirmVO.setMemberAddressVos(memberAddress);}, threadPoolExecutor);CompletableFuture<Void> getItem = CompletableFuture.runAsync(() -> {// 在子线程调用时先共享请求为父线程的请求RequestContextHolder.setRequestAttributes(request);// 远程调用查用户选中了的购物项List<OrderItemVO> cartItems = cartServiceClient.getCartItems();orderConfirmVO.setItems(cartItems);}, threadPoolExecutor).thenRunAsync(() -> {// 远程调用查询库存信息// 先收集skuIdList<OrderItemVO> items = orderConfirmVO.getItems();List<Long> skuIdList = items.stream().map(orderItemVo -> orderItemVo.getSkuId()).collect(Collectors.toList());R r = wmsServiceClient.getSkuHasStock(skuIdList);List<SkuHasStockTo> data = r.getData(new TypeReference<List<SkuHasStockTo>>() {});// 封装库存信息if (data != null) {Map<Long, Boolean> collect = data.stream().collect(Collectors.toMap(SkuHasStockTo::getSkuId, SkuHasStockTo::getHasStock));orderConfirmVO.setStocks(collect);}});// 优惠信息在用户信息中已经保存orderConfirmVO.setIntegration(member.getIntegration());// 其他数据计算// TODO 放重令牌// 等待2个异步任务CompletableFuture.allOf(getAddress, getItem).get();return orderConfirmVO;}
此时又会遇到问题,就是在两个线程中的请求不共享,2个线程分别调用远程服务,此时也就是没有携带cookie调用,所以会调用失败。
也就是threadLocal作用只是在父线程,而没有在子线程。
异步编排feign调用不丢失请求问题
此时解决方法就是子线程需要共享父线程的请求。此时请求就携带了cookie。
原因:
使用异步编排时,非同一线程无法取到RequestContextHolder(上下文环境保持器)
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();// 获取controller请求对象空指针异常
解决:获取主线程ServletRequestAttributes,给每个异步线程复制一份
// 先从父线程获取原请求RequestAttributes request = RequestContextHolder.getRequestAttributes();CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {// 在子线程调用时先共享请求为父线程的请求RequestContextHolder.setRequestAttributes(request);...}, threadPoolExecutor);CompletableFuture<Void> getItem = CompletableFuture.runAsync(() -> {// 在子线程调用时先共享请求为父线程的请求RequestContextHolder.setRequestAttributes(request);...}, threadPoolExecutor).thenRunAsync(() -> {...List<SkuHasStockTo> data = r.getData(new TypeReference<List<SkuHasStockTo>>() {});
计算运费
@Overridepublic FareVo getFare(Long addrId) {FareVo fareVo = new FareVo();// 远程调用R r = memberFeignService.info(addrId);MemberAddressVo memberAddressVo = r.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {});if (memberAddressVo != null) {String phone = memberAddressVo.getPhone();//截取用户手机号码最后一位作为我们的运费计算//1558022051String fare = phone.substring(phone.length() - 1);BigDecimal bigDecimal = new BigDecimal(fare);fareVo.setFare(bigDecimal);fareVo.setAddress(memberAddressVo);return fareVo;}return null;}
创建订单流程
接口幂等性问题
具体问题见文档,使用token保证接口幂等性,注意保证查询token和删除token是原子的。

流程
加上接口幂等性校验流程图

@Overridepublic SubmitOrderResponseVo getSubmitOrderResponseVo(OrderSubmitVo orderSubmitVo) {SubmitOrderResponseVo submitOrderResponseVo = new SubmitOrderResponseVo();// 获取用户信息MemberRespVo member = LoginUserInterceptor.threadLocal.get();String submitToken = orderSubmitVo.getUniqueToken();orderSubmitThreadLocal.set(orderSubmitVo);// Lua脚本含义:// 查找KEYS[1]的值,如果等于ARGV[1],就进行删除,删除成功返回1,删除失败返回0,如果没找到返回0String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";// 使用Lua脚本保证令牌的验证和删除是原子性的Long flag = redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + member.getId()), submitToken);if (flag == 1L) {// 创建订单OrderCreateTo order = createOrder();// 验价BigDecimal totalAmount = order.getOrder().getPayAmount();BigDecimal payPrice = orderSubmitVo.getPayPrice();if (Math.abs(totalAmount.subtract(payPrice).doubleValue()) < 0.01) {// 金额对比成功// 保存订单saveOrder(order);// 锁定库存WareSkuLockVo wareSkuLockVo = new WareSkuLockVo();wareSkuLockVo.setOrderSn(order.getOrder().getOrderSn());List<OrderItemEntity> orderItems = order.getOrderItems();// 封装一个类List<OrderItemVo> orderItemVoList = orderItems.stream().map(entity -> {OrderItemVo orderItemVo = new OrderItemVo();orderItemVo.setSkuId(entity.getSkuId());orderItemVo.setTitle(entity.getSkuName());orderItemVo.setCount(entity.getSkuQuantity());return orderItemVo;}).collect(Collectors.toList());wareSkuLockVo.setLocks(orderItemVoList);// 远程调用R r = wmsServiceClient.lockOrderStock(wareSkuLockVo);if (r.getCode() == 0) {// 锁住库存成功submitOrderResponseVo.setOrder(order.getOrder());submitOrderResponseVo.setCode(0);// TODO 远程扣减积分return submitOrderResponseVo;} else {submitOrderResponseVo.setCode(3);throw new NoStockException((String)r.get("msg"));}} else {// 金额对比失败submitOrderResponseVo.setCode(2);return submitOrderResponseVo;}} else {// 令牌校验失败submitOrderResponseVo.setCode(1);return submitOrderResponseVo;}}
创建订单流程图

创建订单TO对象方法
private OrderCreateTo createOrder() {OrderCreateTo orderCreateTo = new OrderCreateTo();// 创建订单String orderSn = IdWorker.getTimeId();OrderEntity orderEntity = buildOrder(orderSn);// 生成订单项实体对象List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);// 计算价格summaryFillOrder(orderEntity, orderItemEntities);orderCreateTo.setOrder(orderEntity);orderCreateTo.setOrderItems(orderItemEntities);return orderCreateTo;}
创建订单方法,和生成订单项集合封装方法
/*** 创建订单方法** @param orderSn* @return*/private OrderEntity buildOrder(String orderSn) {// 设置订单号OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(orderSn);// 设置memberIdMemberRespVo member = LoginUserInterceptor.threadLocal.get();orderEntity.setMemberId(member.getId());// 查找发送地址OrderSubmitVo orderSubmitVo = orderSubmitThreadLocal.get();R r = wmsServiceClient.getFare(orderSubmitVo.getAddrId());FareVo fareVo = r.getData(new TypeReference<FareVo>() {});orderEntity.setFreightAmount(fareVo.getFare());// 4.封装收货地址信息orderEntity.setReceiverName(fareVo.getAddress().getName());// 收货人名字orderEntity.setReceiverPhone(fareVo.getAddress().getPhone());// 收货人电话orderEntity.setReceiverProvince(fareVo.getAddress().getProvince());// 省orderEntity.setReceiverCity(fareVo.getAddress().getCity());// 市orderEntity.setReceiverRegion(fareVo.getAddress().getRegion());// 区orderEntity.setReceiverDetailAddress(fareVo.getAddress().getDetailAddress());// 详细地址orderEntity.setReceiverPostCode(fareVo.getAddress().getPostCode());// 收货人邮编// 5.封装订单状态信息orderEntity.setStatus(OrderConstant.OrderStatusEnum.CREATE_NEW.getCode());// 6.设置自动确认时间orderEntity.setAutoConfirmDay(OrderConstant.autoConfirmDay);// 7天// 7.设置未删除状态orderEntity.setDeleteStatus(OrderConstant.OrderIsDeleteEnum.NOT_DELETE.getIsDelete());// 8.设置时间Date now = new Date();orderEntity.setCreateTime(now);orderEntity.setModifyTime(now);return orderEntity;}/*** 生成订单项实体对象集合** @param orderSn* @return*/private List<OrderItemEntity> buildOrderItems(String orderSn) {List<OrderItemVo> cartItems = cartServiceClient.getCartItems();List<OrderItemEntity> orderItemEntityList = new ArrayList<>();if (!CollectionUtils.isEmpty(cartItems)) {orderItemEntityList = cartItems.stream().filter(OrderItemVo::getCheck).map(item -> buildOrderItem(orderSn, item)).collect(Collectors.toList());}return orderItemEntityList;}/*** 封装每一个购物项** @param orderSn* @param cartItem* @return*/private OrderItemEntity buildOrderItem(String orderSn, OrderItemVo cartItem) {OrderItemEntity itemEntity = new OrderItemEntity();// 1.封装订单号itemEntity.setOrderSn(orderSn);// 2.封装SPU信息R spuInfo = productServiceClient.getSpuBySkuId(cartItem.getSkuId());// 查询SPU信息SpuInfoTo spuInfoTO = spuInfo.getData(new TypeReference<SpuInfoTo>() {});itemEntity.setSpuId(spuInfoTO.getId());itemEntity.setSpuName(spuInfoTO.getSpuName());itemEntity.setSpuBrand(spuInfoTO.getSpuName());itemEntity.setCategoryId(spuInfoTO.getCatalogId());// 3.封装SKU信息itemEntity.setSkuId(cartItem.getSkuId());itemEntity.setSkuName(cartItem.getTitle());itemEntity.setSkuPic(cartItem.getImage());// 商品sku图片itemEntity.setSkuPrice(cartItem.getPrice());// 这个是最新价格,购物车模块查询数据库得到itemEntity.setSkuQuantity(cartItem.getCount());// 当前商品数量String skuAttrsVals = String.join(";", cartItem.getSkuAttrValues());itemEntity.setSkuAttrsVals(skuAttrsVals);// 商品销售属性组合["颜色:星河银","版本:8GB+256GB"]// 4.优惠信息【不做】// 5.积分信息int num = cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount())).intValue();// 分值=单价*数量itemEntity.setGiftGrowth(num);// 成长值itemEntity.setGiftIntegration(num);// 积分// 6.价格信息itemEntity.setPromotionAmount(BigDecimal.ZERO);// 促销金额itemEntity.setCouponAmount(BigDecimal.ZERO);// 优惠券金额itemEntity.setIntegrationAmount(BigDecimal.ZERO);// 积分优惠金额BigDecimal realAmount = itemEntity.getSkuPrice().multiply(new BigDecimal(itemEntity.getSkuQuantity())).subtract(itemEntity.getPromotionAmount()).subtract(itemEntity.getCouponAmount()).subtract(itemEntity.getIntegrationAmount());itemEntity.setRealAmount(realAmount);// 实际金额,减去所有优惠金额return itemEntity;}
验价之后保存订单,保存订单后锁定库存
/*** 保存订单** @param orderCreateTo*/@Transactionalvoid saveOrder(OrderCreateTo orderCreateTo) {OrderEntity order = orderCreateTo.getOrder();this.save(order);List<OrderItemEntity> orderItems = orderCreateTo.getOrderItems();orderItemService.saveBatch(orderItems);}
锁定库存

if (r.getCode() == 0) {// 锁住库存成功submitOrderResponseVo.setOrder(order.getOrder());submitOrderResponseVo.setCode(0);// TODO 远程扣减积分return submitOrderResponseVo;}
锁定库存如果在远程扣减积分出现异常,那么就会库存不会回滚,此时就有分布式异常的问题。
远程调用库存扣减的方法:
主要就是对lock_stock字段进行操作。
/*** 锁定库存,sql执行锁定锁定** @param wareSkuLockVo* @return*/@Transactional(rollbackFor = NoStockException.class)@Overridepublic Boolean lockOrderStock(WareSkuLockVo wareSkuLockVo) {// 按照收货地址找到就近仓库,锁定库存(暂未实现)// 采用方案:获取每项商品在哪些仓库有库存,轮询尝试锁定,任一商品锁定失败回滚// 找到这个商品在哪里有库存,封装成一个对象List<OrderItemVo> locks = wareSkuLockVo.getLocks();List<SkuWareHasStock> skuWareHasStockList = locks.stream().map(item -> {SkuWareHasStock skuWareHasStock = new SkuWareHasStock();Long skuId = item.getSkuId();List<Long> wareIds = wareSkuDao.getSkuStockWareIds(skuId, item.getCount());// 没有仓库if (wareIds == null && wareIds.size() == 0) {throw new NoStockException(skuId);}skuWareHasStock.setLockNum(item.getCount());skuWareHasStock.setSkuId(skuId);skuWareHasStock.setWareIds(wareIds);return skuWareHasStock;}).collect(Collectors.toList());for (SkuWareHasStock skuWareHasStock : skuWareHasStockList) {// 是否锁定成功的标志Boolean skuStockLock = false;Long skuId = skuWareHasStock.getSkuId();List<Long> wareIds = skuWareHasStock.getWareIds();if (CollectionUtils.isEmpty(wareIds)) {// 只要有一个货物没有仓库有他库存,直接抛出异常,锁定库存失败,回滚throw new NoStockException(skuId);} else {// 有多个仓库有库存,此时需要判断哪个仓库够for (Long wareId : wareIds) {// 锁定成功就返回1,不成功返回0(代表影响的行数)Long count = wareSkuDao.lockSkuStock(skuId, wareId, skuWareHasStock.getLockNum());if (count == 1) {// 表示已经有一个仓库锁住了// 标志置为true,停止锁下一个仓库skuStockLock = true;break;}// 当前仓库失败,重试下一个仓库// 所有仓库都没成功锁住,抛出异常if (skuStockLock == false) {throw new NoStockException(skuId);}}}}return true;}
分布式事务的解决
参照分布式事务的文档。
后台保存商品分布式方案
Seata的使用参照官网:https://seata.io/zh-cn/docs/user/quickstart.html
前台订单分布式方案
本项目订单并没有使用Seata来解决库存的分布式事务问题,Seata在解决分布式事务的过程中会加锁一系列操作,出现异常直接回滚。并发程度并不是特别高,但后台保存商品可以使用Seata,此时并发并不是特别高,所以本项目分布式事务为了高并发,只保持最终一致性即可,可以使用消息队列。(延时队列)
保存库存锁定成功的消息在延时队列中,30分钟后把消息发送给解锁库存服务,解锁服务如果此时没有查看到订单,也就是可能被人取消了,此时就需要执行库存回滚,也就是定时任务的效果。

延时队列场景

为什么不使用Spring提供的定时任务?

定时任务的时效性问题:如果在下单之前开启定时任务,订单在1才下好单,那么订单在30分钟也就是31的时候才会算过期,而定时任务在30的时候检测没有过期,所以不回滚,在下一个定时任务才可以检测到。时差很大。不采用,而采用延时对了

死信队列和死信交换机

延时队列的实现
方法一:设置队列过期时间

方法二:设置消息过期时间

延时队列模拟关单
使用2个交换机

但可以优化一下,复用同一个交换机,根据routing-key不同进行转发

设计建议规范:(基于事件模型的交换机设计)
1、交换机命名:业务+exchange;交换机为Topic
2、路由键:事件.需要感知的业务(可以不写)
3、队列命名:事件+想要监听服务名+queue
4、绑定关系:事件.感知的业务(#)
@Beanpublic Queue orderDelayQueue() {Map<String, Object> arguments = new HashMap<>();// 延时队列的死信交换机和routing-keyarguments.put("x-dead-letter-exchange", "order-event-exchange");arguments.put("x-dead-letter-routing-key", "order.release.order");arguments.put("x-message-ttl", 60000);return new Queue("order.delay.queue", true, false, false, arguments);}@Beanpublic Queue orderReleaseOrderQueue() {return new Queue("order.release.order.queue", true, false, false, null);}@Beanpublic Exchange orderEventExchange() {return new TopicExchange("order-event-exchange", true, false);}@Beanpublic Binding orderCreateOrderBinding() {return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}
利用消息队列就解决库存问题
库存问题解决流程图

根据图示创建相应的队列和交换机
根据模拟关单可以是由一个交换机代替两个交换机
@Configurationpublic class RabbitMqConfig {/*** 配置发送对象到消息队列为Json格式** @return*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic Queue stockDelayQueue() {Map<String, Object> arguments = new HashMap<>();// 延时队列的死信交换机和routing-keyarguments.put("x-dead-letter-exchange", "stock-event-exchange");arguments.put("x-dead-letter-routing-key", "stock.release");arguments.put("x-message-ttl", 120000);return new Queue("stock.delay.queue", true, false, false, arguments);}@Beanpublic Queue stockReleaseOrderQueue() {return new Queue("stock.release.stock.queue", true, false, false, null);}@Beanpublic Exchange stockEventExchange() {return new TopicExchange("stock-event-exchange", true, false);}@Beanpublic Binding orderCreateOrderBinding() {return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}@Beanpublic Binding stockReleaseOrderBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}}
进行库存回滚的操作
注意先开启消息手动确认:
防止消息解锁库存出现异常之后,不重新处理,此时库存就未解锁,有很大问题。
spring:listener:simple:acknowledge-mode: manual # 配置ack为手动应答
回滚的情况判断:
/*** 回滚消息的逻辑:* 一.先在任务详情表中查询消息:* 1.如果没有,那么说明在锁定库存的时候已经出现了错误,在本地事务已经进行了回滚,所以所以此时不需要回滚* 2.如果有:* 解锁需要判断订单情况:* 如果没有这个订单,必须解锁。* 如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁*/
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {log.info("收到回滚消息");Long id = stockLockedTo.getId();WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(id);if (byId != null) {// 此时远程调用查询订单情况WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());if (r.getCode() == 0) {OrderVo data = r.getData(new TypeReference<OrderVo>() {});// 只有不存在订单和订单一已经关闭才解锁if (data == null || data.getStatus() == 4) {// 调用解锁方法unLockStock(byId.getSkuId(), byId.getWareId(), byId.getSkuNum());// 手动应答channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}} else {// 如果远程调用失败,不应答,再次放入队列中,下一次应答channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}} else {// 没有任务详情表,说明已经回滚,不解锁// 手动应答channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}private void unLockStock(Long skuId, Long wareId, Integer num) {wareSkuDao.unLockStock(skuId, wareId, num);}
优化
(封装成一个方法)接收消息的listener
@Service@Slf4j@RabbitListener(queues = {"stock.release.stock.queue"})public class RabbitMQListener {@Autowiredprivate WareSkuService wareSkuService;@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {log.info("收到自动回滚消息,开始处理回滚操作");try {wareSkuService.handleStockLockedRelease(stockLockedTo);// 只要没有异常,就确认收到消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception exception) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}
调用方法:此时只有异常,也就是远程调用查询订单出现了异常抛出异常,进行手动的ack拒接。其他不解锁的情况也就不会滚库存数量。
public void handleStockLockedRelease(StockLockedTo stockLockedTo) throws Exception {/*** 回滚消息的逻辑:* 一.先在任务详情表中查询消息:* 1.如果没有,那么说明在锁定库存的时候已经出现了错误,在本地事务已经进行了回滚,所以所以此时不需要回滚* 2.如果有:* 解锁需要判断订单情况:* 如果没有这个订单,必须解锁。* 如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁*/Long id = stockLockedTo.getId();StockDetailTo stockDetailTo = stockLockedTo.getDetail();WareOrderTaskDetailEntity taskDetail = wareOrderTaskDetailService.getById(stockDetailTo.getId());if (taskDetail != null) {// 此时远程调用查询订单情况WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());if (r.getCode() == 0) {OrderVo data = r.getData(new TypeReference<OrderVo>() {});// 只有不存在订单和订单一已经关闭才解锁if (data == null || data.getStatus() == 4) {// 当任务详情表中的状态为1,也就是未解锁的时候才进行解锁if (taskDetail.getLockStatus() == 1) {// 调用解锁方法unLockStock(taskDetail.getSkuId(), taskDetail.getWareId(), taskDetail.getSkuNum(), taskDetail.getId());} else {// 订单其他状态,不可解锁(消息确认)}}} else {// 订单远程调用失败(消息重新入队)throw new RuntimeException("解锁异常");}} else {// 无库存锁定工作单记录,已回滚,无需解锁(消息确认)}}/*** 解锁库存** @param skuId* @param wareId* @param num* @param taskId*/@Transactionalpublic void unLockStock(Long skuId, Long wareId, Integer num, Long taskId) {// 仓库解锁wareSkuDao.unLockStock(skuId, wareId, num);// 更新解锁任务单详情表的解锁状态WareOrderTaskDetailEntity detail = new WareOrderTaskDetailEntity();detail.setId(taskId);detail.setLockStatus(2);wareOrderTaskDetailService.updateById(detail);}
发送消息
在订单模块库存调用成功后发送消息
if (r.getCode() == 0) {// 锁住库存成功submitOrderResponseVo.setOrder(order.getOrder());submitOrderResponseVo.setCode(0);// TODO 远程扣减积分// 发送消息到订单队列中,以后进行关单操作rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());return submitOrderResponseVo;} else {submitOrderResponseVo.setCode(3);throw new NoStockException((String) r.get("msg"));}
关单操作详情
如果订单在规定时间未进行支付,那么此时就要进行关单的功能,在关单模拟已经创建了队列和交换机,此时就是在创建订单后发送消息,在一定时间后,监听并关单即可。
关单流程图

监听消息
@Slf4j@Service@RabbitListener(queues = {"order.release.order.queue"})public class RabbitMQListener {@Autowiredprivate OrderService orderService;@RabbitHandlerpublic void closeOrder(OrderEntity orderEntity, Message message, Channel channel) throws IOException {log.info("订单超过未支付时间,开始关闭订单操作...");try {orderService.closeOrder(orderEntity);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception exception) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}
修改订单状态为关单状态
/*** 修改订单状态为关单** @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) {// 修改订单状态,表示未支付OrderEntity entity = getById(orderEntity.getId());// 时间过了还未付款,关单if (entity.getStatus() == 0) {OrderEntity updateEntity = new OrderEntity();updateEntity.setId(entity.getId());updateEntity.setStatus(4);updateById(updateEntity);}}
关单操作和库存解锁的问题
问题引入和解决方法

一般情况库存锁定在关单操作之后,如果有网络卡顿,那么关单在解锁库存之后,此时库存的解锁的逻辑:如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁,所以此时库存是不会解锁的,因为订单还是在新建状态,并没有关单。
为了解决这个问题:可以在订单创建之后主动发送一次库存解锁操作,此时可以保证库存在订单关单之后一定被解锁,但是如果关单在解锁库存之前,那么正常解锁库存(不是订单主动发送的解锁)应该如何判断是否解锁呢?
此时在关单主动解锁库存的时候,一定需要将任务详情单的lock_status设置未2表示已经解锁,第二次被动解锁就再次解锁库存了。
流程
此时关单的交换机需要绑定库存的死信队列,注意一定不要绑到了库存的延时队列

@Beanpublic Binding orderReleaseOtherBinding() {return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.other.#",null);}
下单之后主动发送库存解锁消息
/*** 修改订单状态为关单** @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) {// 修改订单状态,表示未支付OrderEntity entity = getById(orderEntity.getId());// 时间过了还未付款,关单if (entity.getStatus() == 0) {OrderEntity updateEntity = new OrderEntity();updateEntity.setId(entity.getId());updateEntity.setStatus(4);updateById(updateEntity);}// 主动发起解锁库存消息,防止有延时导致库存未解锁,却又修改了订单OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(entity, orderTo);rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);}
库存监听器监听OrderTo的消息
@RabbitHandlerpublic void handleStockLockedReleaseByOrderSend(OrderTo orderTo, Message message, Channel channel) throws IOException {log.info("收到订单发送关单后的回滚消息,开始处理回滚操作");try {wareSkuService.handleStockLockedReleaseByOrderSend(orderTo);// 只要没有异常,就确认收到消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception exception) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}
接口
此时需要先查询taskId,根据任务详情单进行回滚
@Override@Transactionalpublic void handleStockLockedReleaseByOrderSend(OrderTo orderTo) {// 通过任务表查询任务idString orderSn = orderTo.getOrderSn();Long taskId = orderTaskService.getTaskIdByOrderSn(orderSn);// 查询所有任务详情的信息,注意找的是没有解锁的任务详情List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", taskId).eq("lock_status", 1));// 遍历解锁库存for (WareOrderTaskDetailEntity entity : list) {unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());}}
如何保证消息的可靠性
消息丢失(三种情况)
消息丢失一消息发送出去,由于网络问题没有抵达服务器
- 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
- 做好日志记录,每个消息状态是否都被服务器收到都应该记录
- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发一
消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
- publishe地必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
- 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队
消息重复(三种情况)
消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
消息消费失败,由于重试机制,自动又将消息发送出去
成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
- 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
- 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
- rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的
消息重复很好解决,库存服务在业务逻辑上也就放置了幂等性。
消息积压
消费者宕机积压
消费者消费能力不足积压
发送者发送流量太大。
- 上线更多的消费者,进行正常消费
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
项目也可以使用解锁库存的服务专为为一个微服务,可以解决消息挤压。
