简介

订单中心

电商系统涉及到3流,分别时信息流,资金流,物流,而订单系统作为中枢将三者有机的集合起来。订单模块是电商系统的枢纽,在订单这个环节上需求获取多个模块的数据和信息,同时对这些信息进行加工处理后流向下个环节,这一系列就构成了订单的信息流通。

订单构成

订单服务 - 图1

流程

订单服务 - 图2

拦截器

订单模块需要用户登录后操作

步骤:

  • 添加拦截器
  1. @Component
  2. public class LoginUserInterceptor implements HandlerInterceptor {
  3. public static ThreadLocal<MemberRespVo> threadLocal = new ThreadLocal<>();
  4. @Override
  5. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  6. HttpSession session = request.getSession();
  7. MemberRespVo memberRespVo = (MemberRespVo) session.getAttribute(AuthConstant.LOGIN_USER);
  8. if (memberRespVo != null) {
  9. // 保存到ThreadLocal中
  10. threadLocal.set(memberRespVo);
  11. return true;
  12. }else {
  13. // 设置提示
  14. request.getSession().setAttribute("msg", "请先登录!");
  15. // 重定向去登录
  16. response.sendRedirect("http://auth.gulimalls.com/login.html");
  17. return false;
  18. }
  19. }
  20. }
  • 添加配置使拦截器生效
  1. @Configuration
  2. public class MyWebConfig implements WebMvcConfigurer {
  3. @Override
  4. public void addInterceptors(InterceptorRegistry registry) {
  5. registry.addInterceptor(new LoginUserInterceptor()).addPathPatterns("/**");
  6. }
  7. }

确认订单页流程

订单模型抽取

  1. /**
  2. * 结算页VO(confirm.html需要的页面数据)
  3. *
  4. * @author: wan
  5. */
  6. public class OrderConfirmVO {
  7. /**
  8. * 会员收获地址列表,ums_member_receive_address
  9. **/
  10. @Getter
  11. @Setter
  12. List<MemberAddressVO> memberAddressVos;
  13. /**
  14. * 所有选中的购物项【购物车中的选中项】
  15. **/
  16. @Getter
  17. @Setter
  18. List<OrderItemVO> items;
  19. /**
  20. * 优惠券(会员积分)
  21. **/
  22. @Getter
  23. @Setter
  24. private Integer integration;
  25. /**
  26. * TODO 防止重复提交的令牌 幂等性
  27. **/
  28. @Getter
  29. @Setter
  30. private String uniqueToken;
  31. /**
  32. * 库存
  33. * 有货/无货,不放在item里面
  34. */
  35. @Getter
  36. @Setter
  37. Map<Long, Boolean> stocks;
  38. /**
  39. * 总商品金额
  40. **/
  41. //BigDecimal total;
  42. //计算订单总额
  43. public BigDecimal getTotal() {
  44. BigDecimal totalNum = BigDecimal.ZERO;
  45. if (!CollectionUtils.isEmpty(items)) {
  46. for (OrderItemVO item : items) {
  47. // 计算当前商品总价格
  48. BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount()));
  49. // 累加全部商品总价格
  50. totalNum = totalNum.add(itemPrice);
  51. }
  52. }
  53. return totalNum;
  54. }
  55. /**
  56. * 应付总额
  57. **/
  58. //BigDecimal payPrice;
  59. public BigDecimal getPayPrice() {
  60. return getTotal();
  61. }
  62. /**
  63. * 商品总数
  64. */
  65. public Integer getCount() {
  66. Integer count = 0;
  67. if (!CollectionUtils.isEmpty(items)) {
  68. for (OrderItemVO item : items) {
  69. count += item.getCount();
  70. }
  71. }
  72. return count;
  73. }
  74. }

用户地址

  1. /**
  2. * 收获地址,结算页VO
  3. * @author: wan
  4. */
  5. @Data
  6. public class MemberAddressVO {
  7. private Long id;
  8. /**
  9. * member_id
  10. */
  11. private Long memberId;
  12. /**
  13. * 收货人姓名
  14. */
  15. private String name;
  16. /**
  17. * 电话
  18. */
  19. private String phone;
  20. /**
  21. * 邮政编码
  22. */
  23. private String postCode;
  24. /**
  25. * 省份/直辖市
  26. */
  27. private String province;
  28. /**
  29. * 城市
  30. */
  31. private String city;
  32. /**
  33. * 区
  34. */
  35. private String region;
  36. /**
  37. * 详细地址(街道)
  38. */
  39. private String detailAddress;
  40. /**
  41. * 省市区代码
  42. */
  43. private String areacode;
  44. /**
  45. * 是否默认
  46. */
  47. private Integer defaultStatus;
  48. }

勾选了的购物项

  1. /**
  2. * @Description: 购物车所有购物项
  3. */
  4. @Data
  5. public class OrderItemVO {
  6. private Long skuId; // skuId
  7. private Boolean check = true; // 是否选中
  8. private String title; // 标题
  9. private String image; // 图片
  10. private List<String> skuAttrValues;// 商品销售属性 ["颜色:星河银","版本:8GB+256GB"]
  11. private BigDecimal price; // 单价
  12. private Integer count; // 当前商品数量
  13. private BigDecimal totalPrice; // 总价
  14. private BigDecimal weight = new BigDecimal("0.085");// 商品重量
  15. }

业务逻辑

Controller

  1. @Controller
  2. public class OrderWebController {
  3. @Autowired
  4. private OrderService orderService;
  5. @GetMapping("/toTrade")
  6. public String toTrade(Model model) {
  7. OrderConfirmVO orderConfirmVO = orderService.getOrderConfirm();
  8. model.addAttribute("confirmOrderData", orderConfirmVO);
  9. return "confirm";
  10. }
  11. }

接口实现类

  1. @Override
  2. public OrderConfirmVO getOrderConfirm() {
  3. OrderConfirmVO orderConfirmVO = new OrderConfirmVO();
  4. MemberRespVo member = LoginUserInterceptor.threadLocal.get();
  5. // 远程调用查用户的地址
  6. List<MemberAddressVO> memberAddress = memberServiceClient.getMemberAddress(member.getId());
  7. orderConfirmVO.setMemberAddressVos(memberAddress);
  8. // 远程调用查用户选中了的购物项
  9. List<OrderItemVO> cartItems = cartServiceClient.getCartItems();
  10. orderConfirmVO.setItems(cartItems);
  11. // 优惠信息在用户信息中已经保存
  12. orderConfirmVO.setIntegration(member.getIntegration());
  13. // 其他数据计算
  14. // TODO 放重令牌
  15. return orderConfirmVO;
  16. }

注意此时调用远程调用了购物车模块查询购物项

  1. @Override
  2. public List<CartItemVo> getCartItems() {
  3. UserInfoTo userInfoTo = CartInterceptor.threadLocal.get();
  4. if (userInfoTo.getUserId() == null) {
  5. return null;
  6. } else {
  7. // 从redis中查出
  8. List<CartItemVo> cartItems = getCartItems(CartConstant.CART_PREFIX + userInfoTo.getUserId());
  9. // 过滤出选中的,且更新最新的价格
  10. List<CartItemVo> collect = cartItems.stream().filter(CartItemVo::getCheck).map((item) -> {
  11. // 远程调用查询
  12. BigDecimal skuPrice = productFeignService.getSkuPrice(item.getSkuId());
  13. item.setPrice(skuPrice);
  14. return item;
  15. }).collect(Collectors.toList());
  16. return collect;
  17. }
  18. }

此时会从threadLocal中取出user-key,但是此时在cart服务的拦截器会先对feign请求进行拦截,但此时请求却没有cookie信息。这就是feign调用不携带请求头的问题。

Feign远程调用丢失请求头问题

原因

主要原因还是feign调用时,会创建一个新请求来调用。

订单服务 - 图3

此时配置拦截器,注入到容器中,feign创建的新请求会被拦截器拦截。

Feign底层

feign底层可以注册多个拦截器处理请求

订单服务 - 图4

这个方法会遍历注册的拦截器,并调用拦截器的apply方法执行

订单服务 - 图5

拦截器接口

订单服务 - 图6

所以我们要返回这个接口的实例,注入到容器中。

配置拦截器

  1. @Configuration
  2. public class GuliFeignConfig {
  3. @Bean("requestInterceptor")
  4. public RequestInterceptor requestInterceptor() {
  5. return new RequestInterceptor() {
  6. /**
  7. * 重写apply方法
  8. * @param requestTemplate 新请求
  9. */
  10. @Override
  11. public void apply(RequestTemplate requestTemplate) {
  12. // 获取原来的请求
  13. // 拿到刚进来的请求RequestContextHolder
  14. // ServletRequestAttributes是spring封装的,AbstractRequestAttributes(RequestContextHolder返回类型)子类
  15. // 如果不使用的化,也可以通过在controller中接收request,并通过ThreadLocal也可以共享,只是这里spring提供了可以直接获取的类
  16. ServletRequestAttributes servletRequestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
  17. HttpServletRequest request = servletRequestAttributes.getRequest();
  18. // 同步请求cookie,老请求放到新请求中
  19. // 所有Cookie被封装在请求头中,所以吧Cookie放到新请求即可
  20. String cookie = request.getHeader("Cookie");
  21. requestTemplate.header("Cookie", cookie);
  22. }
  23. };
  24. }
  25. }

异步优化

使用异步编排优化业务执行,线程池的配置在其他服务已经配置过,复制配置即可。

  1. @Override
  2. public OrderConfirmVO getOrderConfirm() throws ExecutionException, InterruptedException {
  3. OrderConfirmVO orderConfirmVO = new OrderConfirmVO();
  4. MemberRespVo member = LoginUserInterceptor.threadLocal.get();
  5. // 先从父线程获取原请求
  6. RequestAttributes request = RequestContextHolder.getRequestAttributes();
  7. CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {
  8. // 在子线程调用时先共享请求为父线程的请求
  9. RequestContextHolder.setRequestAttributes(request);
  10. // 远程调用查用户的地址
  11. List<MemberAddressVO> memberAddress = memberServiceClient.getMemberAddress(member.getId());
  12. orderConfirmVO.setMemberAddressVos(memberAddress);
  13. }, threadPoolExecutor);
  14. CompletableFuture<Void> getItem = CompletableFuture.runAsync(() -> {
  15. // 在子线程调用时先共享请求为父线程的请求
  16. RequestContextHolder.setRequestAttributes(request);
  17. // 远程调用查用户选中了的购物项
  18. List<OrderItemVO> cartItems = cartServiceClient.getCartItems();
  19. orderConfirmVO.setItems(cartItems);
  20. }, threadPoolExecutor).thenRunAsync(() -> {
  21. // 远程调用查询库存信息
  22. // 先收集skuId
  23. List<OrderItemVO> items = orderConfirmVO.getItems();
  24. List<Long> skuIdList = items.stream().map(orderItemVo -> orderItemVo.getSkuId()).collect(Collectors.toList());
  25. R r = wmsServiceClient.getSkuHasStock(skuIdList);
  26. List<SkuHasStockTo> data = r.getData(new TypeReference<List<SkuHasStockTo>>() {
  27. });
  28. // 封装库存信息
  29. if (data != null) {
  30. Map<Long, Boolean> collect = data.stream().collect(Collectors.toMap(SkuHasStockTo::getSkuId, SkuHasStockTo::getHasStock));
  31. orderConfirmVO.setStocks(collect);
  32. }
  33. });
  34. // 优惠信息在用户信息中已经保存
  35. orderConfirmVO.setIntegration(member.getIntegration());
  36. // 其他数据计算
  37. // TODO 放重令牌
  38. // 等待2个异步任务
  39. CompletableFuture.allOf(getAddress, getItem).get();
  40. return orderConfirmVO;
  41. }

此时又会遇到问题,就是在两个线程中的请求不共享,2个线程分别调用远程服务,此时也就是没有携带cookie调用,所以会调用失败。

也就是threadLocal作用只是在父线程,而没有在子线程。

异步编排feign调用不丢失请求问题

此时解决方法就是子线程需要共享父线程的请求。此时请求就携带了cookie。

原因:

使用异步编排时,非同一线程无法取到RequestContextHolder(上下文环境保持器)
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();// 获取controller请求对象空指针异常

解决:获取主线程ServletRequestAttributes,给每个异步线程复制一份

  1. // 先从父线程获取原请求
  2. RequestAttributes request = RequestContextHolder.getRequestAttributes();
  3. CompletableFuture<Void> getAddress = CompletableFuture.runAsync(() -> {
  4. // 在子线程调用时先共享请求为父线程的请求
  5. RequestContextHolder.setRequestAttributes(request);
  6. ...
  7. }, threadPoolExecutor);
  8. CompletableFuture<Void> getItem = CompletableFuture.runAsync(() -> {
  9. // 在子线程调用时先共享请求为父线程的请求
  10. RequestContextHolder.setRequestAttributes(request);
  11. ...
  12. }, threadPoolExecutor).thenRunAsync(() -> {
  13. ...
  14. List<SkuHasStockTo> data = r.getData(new TypeReference<List<SkuHasStockTo>>() {
  15. });

计算运费

  1. @Override
  2. public FareVo getFare(Long addrId) {
  3. FareVo fareVo = new FareVo();
  4. // 远程调用
  5. R r = memberFeignService.info(addrId);
  6. MemberAddressVo memberAddressVo = r.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {
  7. });
  8. if (memberAddressVo != null) {
  9. String phone = memberAddressVo.getPhone();
  10. //截取用户手机号码最后一位作为我们的运费计算
  11. //1558022051
  12. String fare = phone.substring(phone.length() - 1);
  13. BigDecimal bigDecimal = new BigDecimal(fare);
  14. fareVo.setFare(bigDecimal);
  15. fareVo.setAddress(memberAddressVo);
  16. return fareVo;
  17. }
  18. return null;
  19. }

创建订单流程

接口幂等性问题

具体问题见文档,使用token保证接口幂等性,注意保证查询token和删除token是原子的。

订单服务 - 图7

流程

加上接口幂等性校验流程图

订单服务 - 图8

  1. @Override
  2. public SubmitOrderResponseVo getSubmitOrderResponseVo(OrderSubmitVo orderSubmitVo) {
  3. SubmitOrderResponseVo submitOrderResponseVo = new SubmitOrderResponseVo();
  4. // 获取用户信息
  5. MemberRespVo member = LoginUserInterceptor.threadLocal.get();
  6. String submitToken = orderSubmitVo.getUniqueToken();
  7. orderSubmitThreadLocal.set(orderSubmitVo);
  8. // Lua脚本含义:
  9. // 查找KEYS[1]的值,如果等于ARGV[1],就进行删除,删除成功返回1,删除失败返回0,如果没找到返回0
  10. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  11. // 使用Lua脚本保证令牌的验证和删除是原子性的
  12. Long flag = redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + member.getId()), submitToken);
  13. if (flag == 1L) {
  14. // 创建订单
  15. OrderCreateTo order = createOrder();
  16. // 验价
  17. BigDecimal totalAmount = order.getOrder().getPayAmount();
  18. BigDecimal payPrice = orderSubmitVo.getPayPrice();
  19. if (Math.abs(totalAmount.subtract(payPrice).doubleValue()) < 0.01) {
  20. // 金额对比成功
  21. // 保存订单
  22. saveOrder(order);
  23. // 锁定库存
  24. WareSkuLockVo wareSkuLockVo = new WareSkuLockVo();
  25. wareSkuLockVo.setOrderSn(order.getOrder().getOrderSn());
  26. List<OrderItemEntity> orderItems = order.getOrderItems();
  27. // 封装一个类
  28. List<OrderItemVo> orderItemVoList = orderItems.stream().map(entity -> {
  29. OrderItemVo orderItemVo = new OrderItemVo();
  30. orderItemVo.setSkuId(entity.getSkuId());
  31. orderItemVo.setTitle(entity.getSkuName());
  32. orderItemVo.setCount(entity.getSkuQuantity());
  33. return orderItemVo;
  34. }).collect(Collectors.toList());
  35. wareSkuLockVo.setLocks(orderItemVoList);
  36. // 远程调用
  37. R r = wmsServiceClient.lockOrderStock(wareSkuLockVo);
  38. if (r.getCode() == 0) {
  39. // 锁住库存成功
  40. submitOrderResponseVo.setOrder(order.getOrder());
  41. submitOrderResponseVo.setCode(0);
  42. // TODO 远程扣减积分
  43. return submitOrderResponseVo;
  44. } else {
  45. submitOrderResponseVo.setCode(3);
  46. throw new NoStockException((String)r.get("msg"));
  47. }
  48. } else {
  49. // 金额对比失败
  50. submitOrderResponseVo.setCode(2);
  51. return submitOrderResponseVo;
  52. }
  53. } else {
  54. // 令牌校验失败
  55. submitOrderResponseVo.setCode(1);
  56. return submitOrderResponseVo;
  57. }
  58. }

创建订单流程图

订单服务 - 图9

创建订单TO对象方法

  1. private OrderCreateTo createOrder() {
  2. OrderCreateTo orderCreateTo = new OrderCreateTo();
  3. // 创建订单
  4. String orderSn = IdWorker.getTimeId();
  5. OrderEntity orderEntity = buildOrder(orderSn);
  6. // 生成订单项实体对象
  7. List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);
  8. // 计算价格
  9. summaryFillOrder(orderEntity, orderItemEntities);
  10. orderCreateTo.setOrder(orderEntity);
  11. orderCreateTo.setOrderItems(orderItemEntities);
  12. return orderCreateTo;
  13. }

创建订单方法,和生成订单项集合封装方法

  1. /**
  2. * 创建订单方法
  3. *
  4. * @param orderSn
  5. * @return
  6. */
  7. private OrderEntity buildOrder(String orderSn) {
  8. // 设置订单号
  9. OrderEntity orderEntity = new OrderEntity();
  10. orderEntity.setOrderSn(orderSn);
  11. // 设置memberId
  12. MemberRespVo member = LoginUserInterceptor.threadLocal.get();
  13. orderEntity.setMemberId(member.getId());
  14. // 查找发送地址
  15. OrderSubmitVo orderSubmitVo = orderSubmitThreadLocal.get();
  16. R r = wmsServiceClient.getFare(orderSubmitVo.getAddrId());
  17. FareVo fareVo = r.getData(new TypeReference<FareVo>() {
  18. });
  19. orderEntity.setFreightAmount(fareVo.getFare());
  20. // 4.封装收货地址信息
  21. orderEntity.setReceiverName(fareVo.getAddress().getName());// 收货人名字
  22. orderEntity.setReceiverPhone(fareVo.getAddress().getPhone());// 收货人电话
  23. orderEntity.setReceiverProvince(fareVo.getAddress().getProvince());// 省
  24. orderEntity.setReceiverCity(fareVo.getAddress().getCity());// 市
  25. orderEntity.setReceiverRegion(fareVo.getAddress().getRegion());// 区
  26. orderEntity.setReceiverDetailAddress(fareVo.getAddress().getDetailAddress());// 详细地址
  27. orderEntity.setReceiverPostCode(fareVo.getAddress().getPostCode());// 收货人邮编
  28. // 5.封装订单状态信息
  29. orderEntity.setStatus(OrderConstant.OrderStatusEnum.CREATE_NEW.getCode());
  30. // 6.设置自动确认时间
  31. orderEntity.setAutoConfirmDay(OrderConstant.autoConfirmDay);// 7天
  32. // 7.设置未删除状态
  33. orderEntity.setDeleteStatus(OrderConstant.OrderIsDeleteEnum.NOT_DELETE.getIsDelete());
  34. // 8.设置时间
  35. Date now = new Date();
  36. orderEntity.setCreateTime(now);
  37. orderEntity.setModifyTime(now);
  38. return orderEntity;
  39. }
  40. /**
  41. * 生成订单项实体对象集合
  42. *
  43. * @param orderSn
  44. * @return
  45. */
  46. private List<OrderItemEntity> buildOrderItems(String orderSn) {
  47. List<OrderItemVo> cartItems = cartServiceClient.getCartItems();
  48. List<OrderItemEntity> orderItemEntityList = new ArrayList<>();
  49. if (!CollectionUtils.isEmpty(cartItems)) {
  50. orderItemEntityList = cartItems.stream()
  51. .filter(OrderItemVo::getCheck)
  52. .map(item -> buildOrderItem(orderSn, item))
  53. .collect(Collectors.toList());
  54. }
  55. return orderItemEntityList;
  56. }
  57. /**
  58. * 封装每一个购物项
  59. *
  60. * @param orderSn
  61. * @param cartItem
  62. * @return
  63. */
  64. private OrderItemEntity buildOrderItem(String orderSn, OrderItemVo cartItem) {
  65. OrderItemEntity itemEntity = new OrderItemEntity();
  66. // 1.封装订单号
  67. itemEntity.setOrderSn(orderSn);
  68. // 2.封装SPU信息
  69. R spuInfo = productServiceClient.getSpuBySkuId(cartItem.getSkuId());// 查询SPU信息
  70. SpuInfoTo spuInfoTO = spuInfo.getData(new TypeReference<SpuInfoTo>() {
  71. });
  72. itemEntity.setSpuId(spuInfoTO.getId());
  73. itemEntity.setSpuName(spuInfoTO.getSpuName());
  74. itemEntity.setSpuBrand(spuInfoTO.getSpuName());
  75. itemEntity.setCategoryId(spuInfoTO.getCatalogId());
  76. // 3.封装SKU信息
  77. itemEntity.setSkuId(cartItem.getSkuId());
  78. itemEntity.setSkuName(cartItem.getTitle());
  79. itemEntity.setSkuPic(cartItem.getImage());// 商品sku图片
  80. itemEntity.setSkuPrice(cartItem.getPrice());// 这个是最新价格,购物车模块查询数据库得到
  81. itemEntity.setSkuQuantity(cartItem.getCount());// 当前商品数量
  82. String skuAttrsVals = String.join(";", cartItem.getSkuAttrValues());
  83. itemEntity.setSkuAttrsVals(skuAttrsVals);// 商品销售属性组合["颜色:星河银","版本:8GB+256GB"]
  84. // 4.优惠信息【不做】
  85. // 5.积分信息
  86. int num = cartItem.getPrice().multiply(new BigDecimal(cartItem.getCount())).intValue();// 分值=单价*数量
  87. itemEntity.setGiftGrowth(num);// 成长值
  88. itemEntity.setGiftIntegration(num);// 积分
  89. // 6.价格信息
  90. itemEntity.setPromotionAmount(BigDecimal.ZERO);// 促销金额
  91. itemEntity.setCouponAmount(BigDecimal.ZERO);// 优惠券金额
  92. itemEntity.setIntegrationAmount(BigDecimal.ZERO);// 积分优惠金额
  93. BigDecimal realAmount = itemEntity.getSkuPrice().multiply(new BigDecimal(itemEntity.getSkuQuantity()))
  94. .subtract(itemEntity.getPromotionAmount())
  95. .subtract(itemEntity.getCouponAmount())
  96. .subtract(itemEntity.getIntegrationAmount());
  97. itemEntity.setRealAmount(realAmount);// 实际金额,减去所有优惠金额
  98. return itemEntity;
  99. }

验价之后保存订单,保存订单后锁定库存

  1. /**
  2. * 保存订单
  3. *
  4. * @param orderCreateTo
  5. */
  6. @Transactional
  7. void saveOrder(OrderCreateTo orderCreateTo) {
  8. OrderEntity order = orderCreateTo.getOrder();
  9. this.save(order);
  10. List<OrderItemEntity> orderItems = orderCreateTo.getOrderItems();
  11. orderItemService.saveBatch(orderItems);
  12. }

锁定库存

订单服务 - 图10

  1. if (r.getCode() == 0) {
  2. // 锁住库存成功
  3. submitOrderResponseVo.setOrder(order.getOrder());
  4. submitOrderResponseVo.setCode(0);
  5. // TODO 远程扣减积分
  6. return submitOrderResponseVo;
  7. }

锁定库存如果在远程扣减积分出现异常,那么就会库存不会回滚,此时就有分布式异常的问题。

远程调用库存扣减的方法:

主要就是对lock_stock字段进行操作。

  1. /**
  2. * 锁定库存,sql执行锁定锁定
  3. *
  4. * @param wareSkuLockVo
  5. * @return
  6. */
  7. @Transactional(rollbackFor = NoStockException.class)
  8. @Override
  9. public Boolean lockOrderStock(WareSkuLockVo wareSkuLockVo) {
  10. // 按照收货地址找到就近仓库,锁定库存(暂未实现)
  11. // 采用方案:获取每项商品在哪些仓库有库存,轮询尝试锁定,任一商品锁定失败回滚
  12. // 找到这个商品在哪里有库存,封装成一个对象
  13. List<OrderItemVo> locks = wareSkuLockVo.getLocks();
  14. List<SkuWareHasStock> skuWareHasStockList = locks.stream().map(item -> {
  15. SkuWareHasStock skuWareHasStock = new SkuWareHasStock();
  16. Long skuId = item.getSkuId();
  17. List<Long> wareIds = wareSkuDao.getSkuStockWareIds(skuId, item.getCount());
  18. // 没有仓库
  19. if (wareIds == null && wareIds.size() == 0) {
  20. throw new NoStockException(skuId);
  21. }
  22. skuWareHasStock.setLockNum(item.getCount());
  23. skuWareHasStock.setSkuId(skuId);
  24. skuWareHasStock.setWareIds(wareIds);
  25. return skuWareHasStock;
  26. }).collect(Collectors.toList());
  27. for (SkuWareHasStock skuWareHasStock : skuWareHasStockList) {
  28. // 是否锁定成功的标志
  29. Boolean skuStockLock = false;
  30. Long skuId = skuWareHasStock.getSkuId();
  31. List<Long> wareIds = skuWareHasStock.getWareIds();
  32. if (CollectionUtils.isEmpty(wareIds)) {
  33. // 只要有一个货物没有仓库有他库存,直接抛出异常,锁定库存失败,回滚
  34. throw new NoStockException(skuId);
  35. } else {
  36. // 有多个仓库有库存,此时需要判断哪个仓库够
  37. for (Long wareId : wareIds) {
  38. // 锁定成功就返回1,不成功返回0(代表影响的行数)
  39. Long count = wareSkuDao.lockSkuStock(skuId, wareId, skuWareHasStock.getLockNum());
  40. if (count == 1) {
  41. // 表示已经有一个仓库锁住了
  42. // 标志置为true,停止锁下一个仓库
  43. skuStockLock = true;
  44. break;
  45. }
  46. // 当前仓库失败,重试下一个仓库
  47. // 所有仓库都没成功锁住,抛出异常
  48. if (skuStockLock == false) {
  49. throw new NoStockException(skuId);
  50. }
  51. }
  52. }
  53. }
  54. return true;
  55. }

分布式事务的解决

参照分布式事务的文档。

后台保存商品分布式方案

Seata的使用参照官网:https://seata.io/zh-cn/docs/user/quickstart.html

前台订单分布式方案

本项目订单并没有使用Seata来解决库存的分布式事务问题,Seata在解决分布式事务的过程中会加锁一系列操作,出现异常直接回滚。并发程度并不是特别高,但后台保存商品可以使用Seata,此时并发并不是特别高,所以本项目分布式事务为了高并发,只保持最终一致性即可,可以使用消息队列。(延时队列)

保存库存锁定成功的消息在延时队列中,30分钟后把消息发送给解锁库存服务,解锁服务如果此时没有查看到订单,也就是可能被人取消了,此时就需要执行库存回滚,也就是定时任务的效果。

image.png

延时队列场景

订单服务 - 图12

为什么不使用Spring提供的定时任务?

订单服务 - 图13

定时任务的时效性问题:如果在下单之前开启定时任务,订单在1才下好单,那么订单在30分钟也就是31的时候才会算过期,而定时任务在30的时候检测没有过期,所以不回滚,在下一个定时任务才可以检测到。时差很大。不采用,而采用延时对了

订单服务 - 图14

死信队列和死信交换机

订单服务 - 图15

延时队列的实现

方法一:设置队列过期时间

订单服务 - 图16

方法二:设置消息过期时间

订单服务 - 图17

延时队列模拟关单

使用2个交换机

订单服务 - 图18

但可以优化一下,复用同一个交换机,根据routing-key不同进行转发

订单服务 - 图19

设计建议规范:(基于事件模型的交换机设计)

1、交换机命名:业务+exchange;交换机为Topic

2、路由键:事件.需要感知的业务(可以不写)

3、队列命名:事件+想要监听服务名+queue

4、绑定关系:事件.感知的业务(#)

  1. @Bean
  2. public Queue orderDelayQueue() {
  3. Map<String, Object> arguments = new HashMap<>();
  4. // 延时队列的死信交换机和routing-key
  5. arguments.put("x-dead-letter-exchange", "order-event-exchange");
  6. arguments.put("x-dead-letter-routing-key", "order.release.order");
  7. arguments.put("x-message-ttl", 60000);
  8. return new Queue("order.delay.queue", true, false, false, arguments);
  9. }
  10. @Bean
  11. public Queue orderReleaseOrderQueue() {
  12. return new Queue("order.release.order.queue", true, false, false, null);
  13. }
  14. @Bean
  15. public Exchange orderEventExchange() {
  16. return new TopicExchange("order-event-exchange", true, false);
  17. }
  18. @Bean
  19. public Binding orderCreateOrderBinding() {
  20. return new Binding("order.delay.queue",
  21. Binding.DestinationType.QUEUE,
  22. "order-event-exchange",
  23. "order.create.order",
  24. null);
  25. }
  26. @Bean
  27. public Binding orderReleaseOrderBinding() {
  28. return new Binding("order.release.order.queue",
  29. Binding.DestinationType.QUEUE,
  30. "order-event-exchange",
  31. "order.release.order",
  32. null);
  33. }

利用消息队列就解决库存问题

库存问题解决流程图

订单服务 - 图20

根据图示创建相应的队列和交换机

根据模拟关单可以是由一个交换机代替两个交换机

  1. @Configuration
  2. public class RabbitMqConfig {
  3. /**
  4. * 配置发送对象到消息队列为Json格式
  5. *
  6. * @return
  7. */
  8. @Bean
  9. public MessageConverter messageConverter() {
  10. return new Jackson2JsonMessageConverter();
  11. }
  12. @Bean
  13. public Queue stockDelayQueue() {
  14. Map<String, Object> arguments = new HashMap<>();
  15. // 延时队列的死信交换机和routing-key
  16. arguments.put("x-dead-letter-exchange", "stock-event-exchange");
  17. arguments.put("x-dead-letter-routing-key", "stock.release");
  18. arguments.put("x-message-ttl", 120000);
  19. return new Queue("stock.delay.queue", true, false, false, arguments);
  20. }
  21. @Bean
  22. public Queue stockReleaseOrderQueue() {
  23. return new Queue("stock.release.stock.queue", true, false, false, null);
  24. }
  25. @Bean
  26. public Exchange stockEventExchange() {
  27. return new TopicExchange("stock-event-exchange", true, false);
  28. }
  29. @Bean
  30. public Binding orderCreateOrderBinding() {
  31. return new Binding("stock.delay.queue",
  32. Binding.DestinationType.QUEUE,
  33. "stock-event-exchange",
  34. "stock.locked",
  35. null);
  36. }
  37. @Bean
  38. public Binding stockReleaseOrderBinding() {
  39. return new Binding("stock.release.stock.queue",
  40. Binding.DestinationType.QUEUE,
  41. "stock-event-exchange",
  42. "stock.release.#",
  43. null);
  44. }
  45. }

进行库存回滚的操作

注意先开启消息手动确认:

防止消息解锁库存出现异常之后,不重新处理,此时库存就未解锁,有很大问题。

  1. spring:
  2. listener:
  3. simple:
  4. acknowledge-mode: manual # 配置ack为手动应答

回滚的情况判断:

  1. /**
  2. * 回滚消息的逻辑:
  3. * 一.先在任务详情表中查询消息:
  4. * 1.如果没有,那么说明在锁定库存的时候已经出现了错误,在本地事务已经进行了回滚,所以所以此时不需要回滚
  5. * 2.如果有:
  6. * 解锁需要判断订单情况:
  7. * 如果没有这个订单,必须解锁。
  8. * 如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁
  9. */
  1. public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
  2. log.info("收到回滚消息");
  3. Long id = stockLockedTo.getId();
  4. WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(id);
  5. if (byId != null) {
  6. // 此时远程调用查询订单情况
  7. WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
  8. R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());
  9. if (r.getCode() == 0) {
  10. OrderVo data = r.getData(new TypeReference<OrderVo>() {
  11. });
  12. // 只有不存在订单和订单一已经关闭才解锁
  13. if (data == null || data.getStatus() == 4) {
  14. // 调用解锁方法
  15. unLockStock(byId.getSkuId(), byId.getWareId(), byId.getSkuNum());
  16. // 手动应答
  17. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  18. }
  19. } else {
  20. // 如果远程调用失败,不应答,再次放入队列中,下一次应答
  21. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  22. }
  23. } else {
  24. // 没有任务详情表,说明已经回滚,不解锁
  25. // 手动应答
  26. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  27. }
  28. }
  29. private void unLockStock(Long skuId, Long wareId, Integer num) {
  30. wareSkuDao.unLockStock(skuId, wareId, num);
  31. }

优化

(封装成一个方法)接收消息的listener

  1. @Service
  2. @Slf4j
  3. @RabbitListener(queues = {"stock.release.stock.queue"})
  4. public class RabbitMQListener {
  5. @Autowired
  6. private WareSkuService wareSkuService;
  7. @RabbitHandler
  8. public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
  9. log.info("收到自动回滚消息,开始处理回滚操作");
  10. try {
  11. wareSkuService.handleStockLockedRelease(stockLockedTo);
  12. // 只要没有异常,就确认收到消息
  13. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  14. } catch (Exception exception) {
  15. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  16. }
  17. }
  18. }

调用方法:此时只有异常,也就是远程调用查询订单出现了异常抛出异常,进行手动的ack拒接。其他不解锁的情况也就不会滚库存数量。

  1. public void handleStockLockedRelease(StockLockedTo stockLockedTo) throws Exception {
  2. /**
  3. * 回滚消息的逻辑:
  4. * 一.先在任务详情表中查询消息:
  5. * 1.如果没有,那么说明在锁定库存的时候已经出现了错误,在本地事务已经进行了回滚,所以所以此时不需要回滚
  6. * 2.如果有:
  7. * 解锁需要判断订单情况:
  8. * 如果没有这个订单,必须解锁。
  9. * 如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁
  10. */
  11. Long id = stockLockedTo.getId();
  12. StockDetailTo stockDetailTo = stockLockedTo.getDetail();
  13. WareOrderTaskDetailEntity taskDetail = wareOrderTaskDetailService.getById(stockDetailTo.getId());
  14. if (taskDetail != null) {
  15. // 此时远程调用查询订单情况
  16. WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);
  17. R r = orderFeignService.getOrderStatus(taskEntity.getOrderSn());
  18. if (r.getCode() == 0) {
  19. OrderVo data = r.getData(new TypeReference<OrderVo>() {
  20. });
  21. // 只有不存在订单和订单一已经关闭才解锁
  22. if (data == null || data.getStatus() == 4) {
  23. // 当任务详情表中的状态为1,也就是未解锁的时候才进行解锁
  24. if (taskDetail.getLockStatus() == 1) {
  25. // 调用解锁方法
  26. unLockStock(taskDetail.getSkuId(), taskDetail.getWareId(), taskDetail.getSkuNum(), taskDetail.getId());
  27. } else {
  28. // 订单其他状态,不可解锁(消息确认)
  29. }
  30. }
  31. } else {
  32. // 订单远程调用失败(消息重新入队)
  33. throw new RuntimeException("解锁异常");
  34. }
  35. } else {
  36. // 无库存锁定工作单记录,已回滚,无需解锁(消息确认)
  37. }
  38. }
  39. /**
  40. * 解锁库存
  41. *
  42. * @param skuId
  43. * @param wareId
  44. * @param num
  45. * @param taskId
  46. */
  47. @Transactional
  48. public void unLockStock(Long skuId, Long wareId, Integer num, Long taskId) {
  49. // 仓库解锁
  50. wareSkuDao.unLockStock(skuId, wareId, num);
  51. // 更新解锁任务单详情表的解锁状态
  52. WareOrderTaskDetailEntity detail = new WareOrderTaskDetailEntity();
  53. detail.setId(taskId);
  54. detail.setLockStatus(2);
  55. wareOrderTaskDetailService.updateById(detail);
  56. }

发送消息

在订单模块库存调用成功后发送消息

  1. if (r.getCode() == 0) {
  2. // 锁住库存成功
  3. submitOrderResponseVo.setOrder(order.getOrder());
  4. submitOrderResponseVo.setCode(0);
  5. // TODO 远程扣减积分
  6. // 发送消息到订单队列中,以后进行关单操作
  7. rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());
  8. return submitOrderResponseVo;
  9. } else {
  10. submitOrderResponseVo.setCode(3);
  11. throw new NoStockException((String) r.get("msg"));
  12. }

关单操作详情

如果订单在规定时间未进行支付,那么此时就要进行关单的功能,在关单模拟已经创建了队列和交换机,此时就是在创建订单后发送消息,在一定时间后,监听并关单即可。

关单流程图

订单服务 - 图21

监听消息

  1. @Slf4j
  2. @Service
  3. @RabbitListener(queues = {"order.release.order.queue"})
  4. public class RabbitMQListener {
  5. @Autowired
  6. private OrderService orderService;
  7. @RabbitHandler
  8. public void closeOrder(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
  9. log.info("订单超过未支付时间,开始关闭订单操作...");
  10. try {
  11. orderService.closeOrder(orderEntity);
  12. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  13. } catch (Exception exception) {
  14. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  15. }
  16. }
  17. }

修改订单状态为关单状态

  1. /**
  2. * 修改订单状态为关单
  3. *
  4. * @param orderEntity
  5. */
  6. @Override
  7. public void closeOrder(OrderEntity orderEntity) {
  8. // 修改订单状态,表示未支付
  9. OrderEntity entity = getById(orderEntity.getId());
  10. // 时间过了还未付款,关单
  11. if (entity.getStatus() == 0) {
  12. OrderEntity updateEntity = new OrderEntity();
  13. updateEntity.setId(entity.getId());
  14. updateEntity.setStatus(4);
  15. updateById(updateEntity);
  16. }
  17. }

关单操作和库存解锁的问题

问题引入和解决方法

订单服务 - 图22

一般情况库存锁定在关单操作之后,如果有网络卡顿,那么关单在解锁库存之后,此时库存的解锁的逻辑:如果有这个订单:1.订单未取消,不解锁 2.订单取消了,解锁,所以此时库存是不会解锁的,因为订单还是在新建状态,并没有关单。

为了解决这个问题:可以在订单创建之后主动发送一次库存解锁操作,此时可以保证库存在订单关单之后一定被解锁,但是如果关单在解锁库存之前,那么正常解锁库存(不是订单主动发送的解锁)应该如何判断是否解锁呢?

此时在关单主动解锁库存的时候,一定需要将任务详情单的lock_status设置未2表示已经解锁,第二次被动解锁就再次解锁库存了。

流程

此时关单的交换机需要绑定库存的死信队列,注意一定不要绑到了库存的延时队列

订单服务 - 图23

  1. @Bean
  2. public Binding orderReleaseOtherBinding() {
  3. return new Binding("stock.release.stock.queue",
  4. Binding.DestinationType.QUEUE,
  5. "order-event-exchange",
  6. "order.release.other.#",
  7. null);
  8. }

下单之后主动发送库存解锁消息

  1. /**
  2. * 修改订单状态为关单
  3. *
  4. * @param orderEntity
  5. */
  6. @Override
  7. public void closeOrder(OrderEntity orderEntity) {
  8. // 修改订单状态,表示未支付
  9. OrderEntity entity = getById(orderEntity.getId());
  10. // 时间过了还未付款,关单
  11. if (entity.getStatus() == 0) {
  12. OrderEntity updateEntity = new OrderEntity();
  13. updateEntity.setId(entity.getId());
  14. updateEntity.setStatus(4);
  15. updateById(updateEntity);
  16. }
  17. // 主动发起解锁库存消息,防止有延时导致库存未解锁,却又修改了订单
  18. OrderTo orderTo = new OrderTo();
  19. BeanUtils.copyProperties(entity, orderTo);
  20. rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
  21. }

库存监听器监听OrderTo的消息

  1. @RabbitHandler
  2. public void handleStockLockedReleaseByOrderSend(OrderTo orderTo, Message message, Channel channel) throws IOException {
  3. log.info("收到订单发送关单后的回滚消息,开始处理回滚操作");
  4. try {
  5. wareSkuService.handleStockLockedReleaseByOrderSend(orderTo);
  6. // 只要没有异常,就确认收到消息
  7. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  8. } catch (Exception exception) {
  9. channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
  10. }
  11. }

接口

此时需要先查询taskId,根据任务详情单进行回滚

  1. @Override
  2. @Transactional
  3. public void handleStockLockedReleaseByOrderSend(OrderTo orderTo) {
  4. // 通过任务表查询任务id
  5. String orderSn = orderTo.getOrderSn();
  6. Long taskId = orderTaskService.getTaskIdByOrderSn(orderSn);
  7. // 查询所有任务详情的信息,注意找的是没有解锁的任务详情
  8. List<WareOrderTaskDetailEntity> list = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>()
  9. .eq("task_id", taskId)
  10. .eq("lock_status", 1));
  11. // 遍历解锁库存
  12. for (WareOrderTaskDetailEntity entity : list) {
  13. unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
  14. }
  15. }

如何保证消息的可靠性

消息丢失(三种情况)

消息丢失一消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发一

消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

  • publishe地必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

  • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队

消息重复(三种情况)

消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者

消息消费失败,由于重试机制,自动又将消息发送出去

成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

  • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理
  • rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的

消息重复很好解决,库存服务在业务逻辑上也就放置了幂等性。

消息积压

消费者宕机积压

消费者消费能力不足积压

发送者发送流量太大。

  • 上线更多的消费者,进行正常消费
  • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

项目也可以使用解锁库存的服务专为为一个微服务,可以解决消息挤压。