学习目标

  • 防止秒杀重复排队

    1. 重复排队:一个人抢购商品,如果没有支付,不允许重复排队抢购
  • 并发超卖问题解决

    1. 1个商品卖给多个人:1商品多订单
  • 秒杀订单支付

    1. 秒杀支付:改造支付流程
  • 超时支付订单库存回滚(作业)

    1. 1.RabbitMQ延时队列
    2. 2.利用延时队列实现支付订单的监听,根据订单支付状况进行订单数据库回滚

    1 防止秒杀重复排队

    用户每次抢单的时候,一旦排队,我们设置一个自增值,让该值的初始值为1,每次进入抢单的时候,对它进行递增,如果值>1,则表明已经排队,不允许重复排队,如果重复排队,则对外抛出异常,并抛出异常信息100表示已经正在排队。

    1.1 后台排队记录

    修改SeckillOrderServiceImpl的add方法,新增递增值判断是否排队中,代码如下:
    第15章 秒杀 - 图1
    上图代码如下:

    1. //Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);
    2. Long userQueueCount = redisTemplate.boundHashOps(SystemConstants.SEC_KILL_QUEUE_REPEAT_KEY).increment(username, 1);
    3. if (userQueueCount > 1) {
    4. throw new RuntimeException("重复排队了");
    5. }

    2 并发超卖问题解决

    超卖问题,这里是指多人抢购同一商品的时候,多人同时判断是否有库存,如果只剩一个,则都会判断有库存,此时会导致超卖现象产生,也就是一个商品下了多个订单的现象。
    使用分布式锁来解决:
    (1)在多线程的方法中修改之前的样子如下:
    第15章 秒杀 - 图2
    (2)修改之后如下:
    第15章 秒杀 - 图3
    第15章 秒杀 - 图4

    2.1 思路分析

    (1)首先我们进行超卖问题的解决在于 本质是多个线程同时竞争资源去修改资源,所以只要在修改资源的时候,该资源只能只有一个线程修改即可。
    (2)先针对库存的判断及减库存的操作 作为一个原子性操作 进行加锁操作即可。
    如下图:
    第15章 秒杀 - 图5
    这里我们采用分布式锁:使用分布式锁的方式有很多我们决定采用REDIS来实现分布式锁。这里提供一套官方使用的红锁来实现。

    2.2 代码实现

    (1)添加依赖:

    1. <dependency>
    2. <groupId>org.redisson</groupId>
    3. <artifactId>redisson-spring-boot-starter</artifactId>
    4. <version>3.11.6</version>
    5. </dependency>

    (2)修改多线程下单的代码
    第15章 秒杀 - 图6
    如图的代码如下:

    1. @Async
    2. public void createOrder() {
    3. try {
    4. System.out.println("准备执行...." + Thread.currentThread().getName());
    5. Thread.sleep(10000);
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. //从队列中获取排队信息
    10. SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundListOps("SeckillOrderQueue").rightPop();
    11. if (seckillStatus != null) {
    12. //时间区间
    13. String time = seckillStatus.getTime();// "2019052510";
    14. //用户登录名
    15. String username = seckillStatus.getUsername();// "szitheima";
    16. //用户抢购商品
    17. Long id = seckillStatus.getGoodsId();//1131814847898587136L;
    18. //获取商品数据
    19. SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).get(id);
    20. //如果没有库存,则直接抛出异常
    21. /*if (goods == null || goods.getStockCount() <= 0) {
    22. throw new RuntimeException("已售罄!");
    23. }*/
    24. //如果有库存,则创建秒杀商品订单
    25. SeckillOrder seckillOrder = new SeckillOrder();
    26. seckillOrder.setId(idWorker.nextId());
    27. seckillOrder.setSeckillId(id);
    28. seckillOrder.setMoney(goods.getCostPrice());
    29. seckillOrder.setUserId(username);
    30. seckillOrder.setCreateTime(new Date());
    31. seckillOrder.setStatus("0");
    32. //将秒杀订单存入到Redis中
    33. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).put(username, seckillOrder);
    34. //库存减少
    35. //goods.setStockCount(goods.getStockCount() - 1);
    36. //判断当前商品是否还有库存
    37. if (goods.getStockCount() <= 0) {
    38. //并且将商品数据同步到MySQL中
    39. seckillGoodsMapper.updateByPrimaryKeySelective(goods);
    40. //如果没有库存,则清空Redis缓存中该商品
    41. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).delete(id);
    42. } else {
    43. //如果有库存,则直数据重置到Reids中
    44. // redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).put(id, goods);
    45. }
    46. //抢单成功,更新抢单状态,排队->等待支付
    47. seckillStatus.setStatus(2);
    48. seckillStatus.setOrderId(seckillOrder.getId());
    49. seckillStatus.setMoney(Float.valueOf(seckillOrder.getMoney()));
    50. //redisTemplate.boundHashOps("UserQueueStatus").put(username,seckillStatus);
    51. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).put(username, seckillStatus);
    52. }
    53. }

    (3)设置调用多线程的入口的service方法add中添加如下代码,在这里我们需要减库存
    第15章 秒杀 - 图7
    代码如下:

    1. //避免超卖--> 使用分布式锁
    2. RLock mylock = redissonClient.getLock("Mylock");
    3. try {
    4. //上锁
    5. mylock.lock(100, TimeUnit.SECONDS);
    6. dercount(id, time);
    7. } catch (Exception e) {
    8. e.printStackTrace();
    9. return false;
    10. } finally {
    11. mylock.unlock();
    12. }

    dercount(id, time);该方法如下:

    1. private void dercount(Long id, String time) {
    2. SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).get(id);
    3. //如果没有库存,则直接抛出异常
    4. if (goods == null || goods.getStockCount() <= 0) {
    5. throw new RuntimeException("已售罄!");
    6. }
    7. //减库存
    8. goods.setStockCount(goods.getStockCount() - 1);
    9. //存储到redis中
    10. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).put(id, goods);
    11. System.out.println("库存为:" + ((SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).get(id)).getStockCount());
    12. }

    2.3 超卖控制测试

    使用jemtery来进行测试:库存为1 ,三个线程同时来买,售罄2个,下单一个。如下图:
    第15章 秒杀 - 图8
    第15章 秒杀 - 图9

    3 订单支付

    支付分析:
    第15章 秒杀 - 图10

    1. 1.如果是普通订单下单,则下单之后跳转都支付页面 传递 type表示从普通订单来
    2. 2.如果是秒杀订单下单,则下单之后跳转到支付页面 传递 type表示从秒杀订单来
    3. 3.支付页面接收到参数:订单号,金额,以及type标识 传递 给支付微服务 生成二维码链接 并传递数据到微信附件中参数中
    4. 4.用户扫描二维码支付成功,则通知给畅购支付微服务 获取到标识type
    5. 5.畅购支付微服务接收到通知,根据标识type 发送不同的消息队列中即可
    6. 6.多线程下单之后,并且需要顺便发送延时队列消息,30分钟后监听并从数据库中获取数据是否存在。存在则不用管,不存在则需要回复库存等操作。(可选)

    3.1 改造订单微服务

    分析:订单微服务创建订单之后需要返回订单的的ID和金额以及标记。
    (1)创建VO
    第15章 秒杀 - 图11

    1. public class OrderVo implements Serializable {
    2. private Integer type;
    3. private String orderId;
    4. private String totalFee;
    5. public OrderVo() {
    6. }
    7. public OrderVo(Integer type, String orderId, String totalFee) {
    8. this.type = type;
    9. this.orderId = orderId;
    10. this.totalFee = totalFee;
    11. }
    12. public Integer getType() {
    13. return type;
    14. }
    15. public void setType(Integer type) {
    16. this.type = type;
    17. }
    18. public String getOrderId() {
    19. return orderId;
    20. }
    21. public void setOrderId(String orderId) {
    22. this.orderId = orderId;
    23. }
    24. public String getTotalFee() {
    25. return totalFee;
    26. }
    27. public void setTotalFee(String totalFee) {
    28. this.totalFee = totalFee;
    29. }
    30. }

    (2) 修改controller

    1. @PostMapping("/add")
    2. public Result add(@RequestBody Order order) {
    3. String username = tokenDecode.getUsername();
    4. order.setUsername(username);
    5. OrderVo orderVo = orderService.add(order);
    6. //返回数据给前端 前端拿到数据进行跳转 给:订单号 金额 type类型
    7. return new Result(true, StatusCode.OK,"添加订单成功",orderVo);
    8. }

    (3)修改sevice: 返回Order对象即可

    1. public OrderVo add(Order order) {
    2. //1.添加订单
    3. //1.1生成订单的主键
    4. String orderId = idWorker.nextId() + "";
    5. order.setId(orderId);
    6. //直接从购物车(redis中)中获取 循环遍历算出来总金额
    7. List<OrderItem> values = redisTemplate.boundHashOps("Cart_" + order.getUsername()).values();
    8. Integer totalNum=0;
    9. Integer totalMoney=0;
    10. for (OrderItem orderItem : values) {
    11. Integer money = orderItem.getMoney();//小计
    12. Integer num = orderItem.getNum();//购买的数量
    13. totalNum+=num;
    14. totalMoney+=money;
    15. //2.添加订单选项
    16. //2.1 设置主键
    17. String orderItemId = idWorker.nextId() + "";
    18. orderItem.setId(orderItemId);
    19. //2.2 设置所属的订单的ID
    20. orderItem.setOrderId(order.getId());
    21. orderItemMapper.insertSelective(orderItem);
    22. //3.更新库存 update tb_sku set num=num-#{num} where id=#{id} and num>=#{num}
    23. //3.1 changgou-service-goods-api创建接口 添加注解和方法
    24. //3.2 changgou-service-goods controller【实现】接口
    25. //3.3 order微服务中添加依赖
    26. //3.4 开启feignclients
    27. //3.5 注入 调用
    28. skuFeign.decCount(orderItem.getSkuId(),orderItem.getNum());
    29. }
    30. //1.2 设置总数量
    31. order.setTotalNum(totalNum);
    32. //1.3 设置总金额
    33. order.setTotalMoney(totalMoney);
    34. order.setPayMoney(totalMoney);
    35. order.setPostFee(0);//免邮费
    36. //1.4 设置创建时间 和更新时间
    37. order.setCreateTime(new Date());
    38. order.setUpdateTime(order.getCreateTime());
    39. //1.5 设置订单所属的用户 controller已经设置
    40. order.setBuyerRate("0");//未评价
    41. order.setSourceType("1");//web
    42. order.setOrderStatus("0");
    43. order.setPayStatus("0");
    44. order.setConsignStatus("0");
    45. order.setIsDelete("0");//未删除
    46. orderMapper.insertSelective(order);
    47. //4.添加积分 feign update tb_user set points=points+#{points} where username=#{username}
    48. //4.1 changgou-service-user-api中创建接口和方法
    49. //4.2 changgou-service-user的controller中【实现】方法
    50. //4.3 加入依赖 启用feignclients 注入 调用
    51. userFeign.addPoints(order.getUsername(),10);
    52. //5.清空购物车
    53. redisTemplate.delete("Cart_" + order.getUsername());
    54. //金额 单位分
    55. OrderVo orderVo = new OrderVo(1,orderId,totalMoney.toString());
    56. return orderVo;
    57. }

    3.2 创建支付二维码

    下单成功后,会跳转到支付选择页面,在支付选择页面要显示订单编号和订单金额,所以我们需要在下单的时候,将订单金额以及订单编号信息存储到用户查询对象中。
    选择微信支付后,会跳转到微信支付页面,微信支付页面会根据用户名查看用户秒杀订单,并根据用户秒杀订单的ID创建预支付信息并获取二维码信息,展示给用户看,此时页面每3秒查询一次支付状态,如果支付成功,需要修改订单状态信息。

    3.2.1 回显订单号、金额

    下单后,进入支付选择页面,需要显示订单号和订单金额,所以需要在用户下单后将该数据传入到pay.html页面,所以查询订单状态的时候,需要将订单号和金额封装到查询的信息中
    (1)修改SeckillStatus

    1. public class SeckillStatus implements Serializable {
    2. //秒杀用户名
    3. private String username;
    4. //创建时间
    5. private Date createTime;
    6. //秒杀状态 1:排队中,2:抢单成功,3:支付超时,4:秒杀失败,5:支付完成
    7. private Integer status;
    8. //秒杀的商品ID
    9. private Long goodsId;
    10. //应付金额 分
    11. private Float money;
    12. //订单号
    13. private Long orderId;
    14. //类型 2
    15. private Integer type;
    16. public Integer getType() {
    17. return type;
    18. }
    19. public void setType(Integer type) {
    20. this.type = type;
    21. }
    22. //时间段
    23. private String time;
    24. public SeckillStatus() {
    25. }
    26. public SeckillStatus(String username, Date createTime, Integer status, Long goodsId, String time) {
    27. this.username = username;
    28. this.createTime = createTime;
    29. this.status = status;
    30. this.goodsId = goodsId;
    31. this.time = time;
    32. }
    33. public String getUsername() {
    34. return username;
    35. }
    36. public void setUsername(String username) {
    37. this.username = username;
    38. }
    39. public Date getCreateTime() {
    40. return createTime;
    41. }
    42. public void setCreateTime(Date createTime) {
    43. this.createTime = createTime;
    44. }
    45. public Integer getStatus() {
    46. return status;
    47. }
    48. public void setStatus(Integer status) {
    49. this.status = status;
    50. }
    51. public Long getGoodsId() {
    52. return goodsId;
    53. }
    54. public void setGoodsId(Long goodsId) {
    55. this.goodsId = goodsId;
    56. }
    57. public Float getMoney() {
    58. return money;
    59. }
    60. public void setMoney(Float money) {
    61. this.money = money;
    62. }
    63. public Long getOrderId() {
    64. return orderId;
    65. }
    66. public void setOrderId(Long orderId) {
    67. this.orderId = orderId;
    68. }
    69. public String getTime() {
    70. return time;
    71. }
    72. public void setTime(String time) {
    73. this.time = time;
    74. }
    75. }

    (2)修改controller
    第15章 秒杀 - 图12
    代码如下:

    1. @GetMapping("/query")
    2. public Result<SeckillStatus> queryStatus(){
    3. String username="zhangsan";
    4. SeckillStatus seckillStatus= seckillOrderService.query(username);
    5. //秒杀的订单
    6. if(seckillStatus!=null) {
    7. seckillStatus.setType(2);
    8. }
    9. return new Result<SeckillStatus>(true,StatusCode.OK,"查询状态成功,状态是什么你自己看",seckillStatus);
    10. }

    使用Postman测试,效果如下:
    http://localhost:18093/seckillOrder/query
    第15章 秒杀 - 图13

    3.2.2 创建二维码

    用户创建二维码,可以先查询用户的秒杀订单抢单信息,然后再发送请求到支付微服务中创建二维码,将订单编号以及订单对应的金额传递到支付微服务:/weixin/pay/create/native

    3.3 支付流程分析回顾

    第15章 秒杀 - 图14
    如上图,步骤分析如下:

    1. 1.如果是普通订单下单,则下单之后跳转都支付页面 传递 type表示从普通订单来
    2. 2.如果是秒杀订单下单,则下单之后跳转到支付页面 传递 type表示从秒杀订单来
    3. 3.支付页面接收到参数:订单号,金额,以及type标识 传递 给支付微服务 生成二维码链接 并传递数据到微信附件中参数中
    4. 4.用户扫描二维码支付成功,则通知给畅购支付微服务 获取到标识type
    5. 5.畅购支付微服务接收到通知,根据标识type 发送不同的消息队列中即可
    6. 6.多线程下单之后,并且需要顺便发送延时队列消息,30分钟后监听并从数据库中获取数据是否存在。存在则不用管,不存在则需要回复库存等操作。(可选)

    3.4 支付回调更新

    支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给MQ,指定了对应的队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。

    3.4.1 支付回调队列指定

    关于指定队列如下:

    1. 1.创建支付二维码需要指定队列
    2. 2.回调地址回调的时候,获取支付二维码指定的队列,将支付信息发送到指定队列中

    在微信支付统一下单API中,有一个附加参数,如下:

    1. attach:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。

    我们可以在创建二维码的时候,指定该参数,该参数用于指定回调支付信息的对应队列,每次回调的时候,会获取该参数,然后将回调信息发送到该参数对应的队列去。
    第15章 秒杀 - 图15

    3.4.1.1 改造支付方法

    修改支付微服务的WeixinPayController的createNative方法,代码如下:
    第15章 秒杀 - 图16
    代码如下

    1. @RequestMapping("/create/native")
    2. public Result<Map<String, String>> createNative(@RequestParam Map<String, String> parameter) {
    3. //todo 从令牌中获取用户的名称
    4. String username="zhangsan";
    5. parameter.put("username",username);
    6. Map<String, String> map = weixinPayService.createNative(parameter);
    7. return new Result<Map<String, String>>(true, StatusCode.OK, "创建二维码成功", map);
    8. }

    修改支付微服务的WeixinPayService的createNative方法,代码如下:
    第15章 秒杀 - 图17
    修改支付微服务的WeixinPayServiceImpl的createNative方法,代码如下:

    1. //模拟浏览器发送一个请求 获取响应(XML 装成map 返回)
    2. @Override
    3. public Map<String, String> createNative(Map<String, String> parameter) {
    4. try {
    5. //1.创建参数对象map 进行参数的组装
    6. Map<String, String> paramMap = new HashMap<>();
    7. paramMap.put("appid", wxProperties.getAppid());
    8. paramMap.put("mch_id", wxProperties.getPartner());
    9. paramMap.put("nonce_str", WXPayUtil.generateNonceStr());
    10. paramMap.put("body", "畅购的商品");
    11. paramMap.put("out_trade_no", parameter.get("out_trade_no"));
    12. paramMap.put("total_fee", parameter.get("total_fee"));// 重要 单位是分
    13. paramMap.put("spbill_create_ip", "127.0.0.1");
    14. paramMap.put("notify_url", wxProperties.getNotifyurl());
    15. paramMap.put("trade_type", "NATIVE");
    16. //==========================attachMap={username:zhangsan ,type:1}======================******************************==========
    17. Map<String,String> attachMap = new HashMap<>();
    18. attachMap.put("username",parameter.get("username"));
    19. attachMap.put("type",parameter.get("type"));
    20. //JSON
    21. paramMap.put("attach", JSON.toJSONString(attachMap));
    22. //签名 todo 将map 转换成XML 的是会自动添加签名
    23. //2.将map转成XML
    24. String xmlParam = WXPayUtil.generateSignedXml(paramMap, wxProperties.getPartnerkey());
    25. //3.使用httpclient 工具类 进行模拟浏览器发送请求 统一下单API
    26. HttpClient httpClient = new HttpClient("https://api.mch.weixin.qq.com/pay/unifiedorder");
    27. httpClient.setHttps(true);
    28. httpClient.setXmlParam(xmlParam);
    29. httpClient.post();
    30. //4.使用Httpclient 工具类 进行模拟浏览器接收响应
    31. String content = httpClient.getContent();
    32. System.out.println(content);
    33. //5.将xml 转成MAP (里面有code_url)
    34. Map<String, String> contentMap = WXPayUtil.xmlToMap(content);
    35. Map<String,String> resultMap = new HashMap<>();
    36. resultMap.put("total_fee", parameter.get("total_fee"));
    37. resultMap.put("out_trade_no",parameter.get("out_trade_no"));
    38. resultMap.put("code_url",contentMap.get("code_url"));
    39. //6.返回
    40. return resultMap;
    41. } catch (Exception e) {
    42. e.printStackTrace();
    43. }
    44. return null;
    45. }
    1. @ConfigurationProperties(prefix = "weixin")
    2. @Component
    3. public class WxProperties {
    4. private String appid;
    5. private String partner;
    6. private String partnerkey;
    7. private String notifyurl;
    8. public String getAppid() {
    9. return appid;
    10. }
    11. public void setAppid(String appid) {
    12. this.appid = appid;
    13. }
    14. public String getPartner() {
    15. return partner;
    16. }
    17. public void setPartner(String partner) {
    18. this.partner = partner;
    19. }
    20. public String getPartnerkey() {
    21. return partnerkey;
    22. }
    23. public void setPartnerkey(String partnerkey) {
    24. this.partnerkey = partnerkey;
    25. }
    26. public String getNotifyurl() {
    27. return notifyurl;
    28. }
    29. public void setNotifyurl(String notifyurl) {
    30. this.notifyurl = notifyurl;
    31. }
    32. }

    我们创建二维码的时候,需要将下面几个参数传递过去

    1. username:用户名,可以根据用户名查询用户排队信息
    2. out_trade_no:商户订单号,下单必须
    3. total_fee:支付金额,支付必须
    4. type:标识类型 1 标识普通订单 2 标识秒杀订单 。可以知道将支付信息发送到哪个队列

    修改WeixinPayApplication,添加对应队列以及对应交换机绑定,代码如下:

    1. @SpringBootApplication
    2. @EnableEurekaClient
    3. @EnableFeignClients
    4. public class WeixinPayApplication {
    5. public static void main(String[] args) {
    6. SpringApplication.run(WeixinPayApplication.class,args);
    7. }
    8. @Autowired
    9. private Environment env;
    10. //配置创建队列
    11. @Bean
    12. public Queue createQueue(){
    13. // queue.order
    14. return new Queue(environment.getProperty("mq.pay.queue.order"));
    15. }
    16. //创建交换机
    17. @Bean
    18. public DirectExchange createExchange(){
    19. // exchange.order
    20. return new DirectExchange(environment.getProperty("mq.pay.exchange.order"));
    21. }
    22. // 绑定队列到交换机
    23. @Bean
    24. public Binding binding(){
    25. // routing key : queue.order
    26. String property = environment.getProperty("mq.pay.routing.key");
    27. return BindingBuilder.bind(createQueue()).to(createExchange()).with(property);
    28. }
    29. //配置创建队列
    30. @Bean
    31. public Queue createSekillQueue(){
    32. // queue.order
    33. return new Queue(environment.getProperty("mq.pay.queue.seckillorder"));
    34. }
    35. //创建交换机
    36. @Bean
    37. public DirectExchange createSeckillExchange(){
    38. // exchange.order
    39. return new DirectExchange(environment.getProperty("mq.pay.exchange.seckillorder"));
    40. }
    41. // 绑定队列到交换机
    42. @Bean
    43. public Binding seckillbinding(){
    44. // routing key : queue.order
    45. String property = environment.getProperty("mq.pay.routing.seckillkey");
    46. return BindingBuilder.bind(createSekillQueue()).to(createSeckillExchange()).with(property);
    47. }
    48. }

    修改application.yml,添加如下配置

    1. #位置支付交换机和队列
    2. mq:
    3. pay:
    4. exchange:
    5. order: exchange.order
    6. seckillorder: exchange.seckillorder
    7. queue:
    8. order: queue.order
    9. seckillorder: queue.seckillorder
    10. routing:
    11. key: queue.order
    12. seckillkey: queue.seckillorder

    3.4.1.2 测试

    使用Postman创建二维码测试
    http://localhost:18092/weixin/pay/create/native?username=zhangsan&out_trade_no=1132510782836314121&total_fee=1&type=1
    第15章 秒杀 - 图18
    以后每次支付,都需要带上对应的参数,包括前面的订单支付。

    3.4.1.3 改造支付回调方法

    修改com.changgou.pay.controller.WeixinPayController的notifyUrl方法,获取自定义参数,并转成Map,获取from的标识地址,并将支付信息发送到绑定的queue中,代码如下:
    1:标识普通队列
    2:标识秒杀队列
    代码如下:

    1. @Autowired
    2. private Environment environment;
    3. /**
    4. * 用于接收微信传递过来的数据 通知地址 RequestMapping 中的值
    5. * 该链接是通过【统一下单API】中提交的参数notify_url设置 通知url必须为直接可访问的url,不能携带参数
    6. *
    7. * @return
    8. */
    9. @RequestMapping("/notify/url")
    10. public String notifyurl(HttpServletRequest request) {
    11. ByteArrayOutputStream outputStream =null;
    12. ServletInputStream inputStream=null;
    13. System.out.println("aaaaaaaa====");
    14. try {
    15. //1. 接收微信通知的数据流 读流
    16. inputStream = request.getInputStream();
    17. //2 将流转换成XML字符串 todo 写流
    18. outputStream = new ByteArrayOutputStream();
    19. byte[] buffer = new byte[1024];
    20. int len = 0;
    21. while ((len = inputStream.read(buffer)) != -1) {
    22. outputStream.write(buffer,0,len);
    23. }
    24. byte[] bytes = outputStream.toByteArray();
    25. String xml = new String(bytes, "utf-8");
    26. System.out.println(xml);
    27. System.out.println("=================================");
    28. Map<String, String> map = WXPayUtil.xmlToMap(xml);
    29. System.out.println(map);
    30. //3.发送消息(orderid,支付时间,微信支付订单号,....)给rabbitmq
    31. //判断 获取到attach的值 判断值是否是1 或者2 如果是1 标识 普通的订单 如果是2 标识 秒杀的订单
    32. //{username:zhangsan,type:1}
    33. String attach = map.get("attach");
    34. Map<String,String> attachMap = JSON.parseObject(attach, Map.class);
    35. String type = attachMap.get("type");
    36. // 根据不同的值发送到不同的队列中
    37. switch (type){
    38. case "1":// 普通订单
    39. rabbitTemplate.convertAndSend(exchange,routing, JSON.toJSONString(map));
    40. break;
    41. case "2": //秒杀订单
    42. rabbitTemplate.convertAndSend(
    43. environment.getProperty("mq.pay.exchange.seckillorder"),
    44. environment.getProperty("mq.pay.routing.seckillkey"),
    45. JSON.toJSONString(map));
    46. break;
    47. default:
    48. System.out.println("你给的都不对");
    49. break;
    50. }
    51. } catch (Exception e) {
    52. e.printStackTrace();
    53. }finally {
    54. try {
    55. if(outputStream!=null) {
    56. outputStream.close();
    57. }
    58. } catch (IOException e) {
    59. e.printStackTrace();
    60. }
    61. try {
    62. if(inputStream!=null) {
    63. inputStream.close();
    64. }
    65. } catch (IOException e) {
    66. e.printStackTrace();
    67. }
    68. }
    69. System.out.println("接收到了数据了");
    70. //4.返回给微信成功的响应
    71. return message;
    72. }

    3.4.2 支付状态监听

    支付状态通过回调地址发送给MQ之后,我们需要在秒杀系统中监听支付信息,如果用户已支付,则修改用户订单状态,如果支付失败,则直接删除订单,回滚库存。
    在秒杀微服务中添加依赖:

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>

    在秒杀工程中创建com.changgou.seckill.consumer.SeckillOrderPayMessageListener,实现监听消息,代码如下:

    1. @Component
    2. //监听某一个队列的消息
    3. @RabbitListener(queues = "queue.seckillorder")
    4. public class SeckillUpdateOrderListener {
    5. @Autowired
    6. private SeckillOrderMapper seckillOrderMapper;
    7. @Autowired
    8. private SeckillGoodsMapper seckillGoodsMapper;
    9. @Autowired
    10. private RedissonClient redissonClient;
    11. @Autowired
    12. private RedisTemplate redisTemplate;
    13. @RabbitHandler//指定该方法处理字符串类型的消息
    14. public void jieshouMessage(String msg) throws Exception {
    15. //1. 接收消息本身 转成MAP对象
    16. Map<String, String> map = JSON.parseObject(msg, Map.class);
    17. if (map != null) {
    18. //通信成功
    19. if (map.get("return_code").equalsIgnoreCase("SUCCESS")) {
    20. String attachString = map.get("attach");//{username:zhangsan,type:1}
    21. Map<String, String> attachMap = JSON.parseObject(attachString, Map.class);
    22. String username = attachMap.get("username");
    23. //业务成功 支付成功
    24. if (map.get("result_code").equalsIgnoreCase("SUCCESS")) {
    25. //2. 获取到支付的成功/失败的状态 如果是成功 将redis的订单存到数据库 并删除redis的订单
    26. //获取订单 存储到mysql
    27. SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).get(username);
    28. //支付成功
    29. seckillOrder.setStatus("1");
    30. String time_end = map.get("time_end");
    31. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    32. Date date = simpleDateFormat.parse(time_end);
    33. seckillOrder.setPayTime(date);
    34. seckillOrder.setTransactionId(map.get("transaction_id"));
    35. seckillOrderMapper.insertSelective(seckillOrder);
    36. //删除redis中的订单
    37. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
    38. //清除重复排队标识
    39. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_QUEUE_REPEAT_KEY).delete(username);
    40. //清除用户的抢单信息
    41. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username);
    42. } else {
    43. //3. 如果 1.关闭微信支付订单 2.失败回滚库存 3.删除预订单 4.清除标记
    44. // todo 关闭微信支付订单
    45. // 获取秒杀商品对象 +1
    46. SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).get(username);
    47. if (seckillStatus != null) {
    48. SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime()).get(seckillStatus.getGoodsId());
    49. if(seckillGoods==null){
    50. //从数据库获取商品
    51. seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillStatus.getGoodsId());
    52. }
    53. //恢复库存 上锁
    54. RLock myLock = redissonClient.getLock("myLock");
    55. try {
    56. myLock.lock(20, TimeUnit.SECONDS);
    57. seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
    58. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);
    59. } catch (Exception e) {
    60. e.printStackTrace();
    61. }finally {
    62. myLock.unlock();
    63. }
    64. //删除redis中的订单
    65. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
    66. //清除重复排队标识
    67. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_QUEUE_REPEAT_KEY).delete(username);
    68. //清除用户的抢单信息
    69. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username);
    70. }
    71. }
    72. }
    73. }
    74. }
    75. }

    修改SeckillApplication创建对应的队列以及绑定对应交换机。

    1. @SpringBootApplication
    2. @EnableEurekaClient
    3. @EnableFeignClients
    4. @MapperScan(basePackages = {"com.changgou.seckill.dao"})
    5. @EnableScheduling
    6. @EnableAsync
    7. public class SeckillApplication {
    8. public static void main(String[] args) {
    9. SpringApplication.run(SeckillApplication.class,args);
    10. }
    11. @Bean
    12. public IdWorker idWorker(){
    13. return new IdWorker(1,1);
    14. }
    15. @Autowired
    16. private Environment environment;
    17. //配置创建队列
    18. @Bean
    19. public Queue createQueue(){
    20. // queue.order
    21. return new Queue(environment.getProperty("mq.pay.queue.order"));
    22. }
    23. //创建交换机
    24. @Bean
    25. public DirectExchange createExchange(){
    26. // exchange.order
    27. return new DirectExchange(environment.getProperty("mq.pay.exchange.order"));
    28. }
    29. // 绑定队列到交换机
    30. @Bean
    31. public Binding binding(){
    32. // routing key : queue.order
    33. String property = environment.getProperty("mq.pay.routing.key");
    34. return BindingBuilder.bind(createQueue()).to(createExchange()).with(property);
    35. }
    36. //配置创建队列
    37. @Bean
    38. public Queue createSekillQueue(){
    39. // queue.order
    40. return new Queue(environment.getProperty("mq.pay.queue.seckillorder"));
    41. }
    42. //创建交换机
    43. @Bean
    44. public DirectExchange createSeckillExchange(){
    45. // exchange.order
    46. return new DirectExchange(environment.getProperty("mq.pay.exchange.seckillorder"));
    47. }
    48. // 绑定队列到交换机
    49. @Bean
    50. public Binding seckillbinding(){
    51. // routing key : queue.order
    52. String property = environment.getProperty("mq.pay.routing.seckillkey");
    53. return BindingBuilder.bind(createSekillQueue()).to(createSeckillExchange()).with(property);
    54. }
    55. }

    修改application.yml文件,添加如下配置:

    1. #位置支付交换机和队列
    2. mq:
    3. pay:
    4. exchange:
    5. seckillorder: exchange.seckillorder
    6. queue:
    7. seckillorder: queue.seckillorder
    8. routing:
    9. seckillkey: queue.seckillorder

    3.4.4.3 测试

    使用Postman完整请求创建二维码下单测试一次。
    商品ID:1131814854034853888
    数量:49
    第15章 秒杀 - 图19
    下单:
    http://localhost:18093/seckill/order/add?id=1131814854034853888&time=2019052614
    下单后,Redis数据
    第15章 秒杀 - 图20
    下单查询:
    http://localhost:18093/seckill/order/query
    创建二维码:
    http://localhost:18092/weixin/pay/create/native?username=zhangsan&out_trade_no=1132530879663575040&total_fee=1&type=2
    秒杀抢单后,商品数量变化:
    第15章 秒杀 - 图21
    支付微服务回调方法控制台:

    1. {
    2. nonce_str=Mnv06RIaIwxzg3bA,
    3. code_url=weixin://wxpay/bizpayurl?pr=iTidd5h,
    4. appid=wx8397f8696b538317,
    5. sign=1436E43FBA8A171D79A9B78B61F0A7AB,
    6. trade_type=NATIVE,
    7. return_msg=OK,
    8. result_code=SUCCESS,
    9. mch_id=1473426802,
    10. return_code=SUCCESS,
    11. prepay_id=wx2614182102123859e3869a853739004200
    12. }
    13. {money=1, queue=queue.seckillorder, username=szitheima, outtradeno=1132530879663575040}

    订单微服务控制台输出

    1. {
    2. transaction_id=4200000289201905268232990890,
    3. nonce_str=a1aefe00a9bc4e8bb66a892dba38eb42,
    4. bank_type=CMB_CREDIT,
    5. openid=oNpSGwUp-194-Svy3JnVlAxtdLkc,
    6. sign=56679BC02CC82204635434817C1FCA46,
    7. fee_type=CNY,
    8. mch_id=1473426802,
    9. cash_fee=1,
    10. out_trade_no=1132530879663575040,
    11. appid=wx8397f8696b538317,
    12. total_fee=1,
    13. trade_type=NATIVE,
    14. result_code=SUCCESS,
    15. attach={
    16. "username": "szitheima",
    17. "outtradeno": "1132530879663575040",
    18. "money": "1",
    19. "queue": "queue.seckillorder"
    20. }, time_end=20190526141849, is_subscribe=N, return_code=SUCCESS
    21. }

    附录:
    支付微服务application.yml

    1. server:
    2. port: 18092
    3. spring:
    4. application:
    5. name: pay
    6. main:
    7. allow-bean-definition-overriding: true
    8. rabbitmq:
    9. host: 192.168.211.132
    10. port: 5672
    11. eureka:
    12. client:
    13. service-url:
    14. defaultZone: http://127.0.0.1:7001/eureka
    15. instance:
    16. prefer-ip-address: true
    17. feign:
    18. hystrix:
    19. enabled: true
    20. #hystrix 配置
    21. hystrix:
    22. command:
    23. default:
    24. execution:
    25. timeout:
    26. #如果enabled设置为false,则请求超时交给ribbon控制
    27. enabled: true
    28. isolation:
    29. strategy: SEMAPHORE
    30. #微信支付信息配置
    31. weixin:
    32. appid: wx8397f8696b538317
    33. partner: 1473426802
    34. partnerkey: T6m9iK73b0kn9g5v426MKfHQH7X8rKwb
    35. notifyurl: http://2cw4969042.wicp.vip:55644/weixin/pay/notify/url
    36. mq:
    37. pay:
    38. exchange:
    39. order: exchange.order
    40. seckillorder: exchange.seckillorder
    41. queue:
    42. order: queue.order
    43. seckillorder: queue.seckillorder
    44. routing:
    45. key: queue.order
    46. seckillkey: queue.seckillorder

    秒杀微服务application.yml配置

    1. server:
    2. port: 18093
    3. spring:
    4. application:
    5. name: seckill
    6. datasource:
    7. driver-class-name: com.mysql.jdbc.Driver
    8. url: jdbc:mysql://192.168.211.132:3306/changgou_seckill?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    9. username: root
    10. password: 123456
    11. rabbitmq:
    12. host: 192.168.211.132 #mq的服务器地址
    13. username: guest #账号
    14. password: guest #密码
    15. main:
    16. allow-bean-definition-overriding: true
    17. redis:
    18. host: 192.168.211.132
    19. port: 6379
    20. eureka:
    21. client:
    22. service-url:
    23. defaultZone: http://127.0.0.1:7001/eureka
    24. instance:
    25. prefer-ip-address: true
    26. feign:
    27. hystrix:
    28. enabled: true
    29. #hystrix 配置
    30. hystrix:
    31. command:
    32. default:
    33. execution:
    34. timeout:
    35. #如果enabled设置为false,则请求超时交给ribbon控制
    36. enabled: true
    37. isolation:
    38. thread:
    39. timeoutInMilliseconds: 10000
    40. strategy: SEMAPHORE
    41. mq:
    42. pay:
    43. exchange:
    44. seckillorder: exchange.seckillorder
    45. queue:
    46. seckillorder: queue.seckillorder
    47. routing:
    48. seckillkey: queue.seckillorder

    4 RabbitMQ延时消息回顾

    4.1 延时队列介绍

    延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
    应用场景如下:网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
    Rabbitmq实现延时队列一般而言有两种形式:
    第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]
    第二种方式:利用rabbitmq中的插件x-delay-message

    4.2 TTL DLX实现延时队列

    4.2.1 TTL DLX介绍

    TTL
    RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
    Dead Letter Exchanges(DLX)
    RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
    x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
    x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
    第15章 秒杀 - 图22

    4.2.3 DLX延时队列实现

    4.2.3.1 创建工程

    创建springboot_rabbitmq_delay工程,并引入相关依赖

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <project xmlns="http://maven.apache.org/POM/4.0.0"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    5. <parent>
    6. <artifactId>changgou_parent</artifactId>
    7. <groupId>com.changgou</groupId>
    8. <version>1.0-SNAPSHOT</version>
    9. </parent>
    10. <modelVersion>4.0.0</modelVersion>
    11. <artifactId>springboot_rabbitmq_delay</artifactId>
    12. <dependencies>
    13. <!--starter-web-->
    14. <dependency>
    15. <groupId>org.springframework.boot</groupId>
    16. <artifactId>spring-boot-starter-web</artifactId>
    17. </dependency>
    18. <!--加入ampq-->
    19. <dependency>
    20. <groupId>org.springframework.boot</groupId>
    21. <artifactId>spring-boot-starter-amqp</artifactId>
    22. </dependency>
    23. <!--测试-->
    24. <dependency>
    25. <groupId>org.springframework.boot</groupId>
    26. <artifactId>spring-boot-starter-test</artifactId>
    27. </dependency>
    28. </dependencies>
    29. </project>

    application.yml配置

    1. spring:
    2. application:
    3. name: springboot-demo
    4. rabbitmq:
    5. host: 127.0.0.1
    6. port: 5672
    7. password: guest
    8. username: guest

    4.2.3.2 队列创建

    创建2个队列,用于接收消息的叫延时队列queue.message.delay,用于转发消息的队列叫queue.message,同时创建一个交换机,代码如下:

    1. @Configuration
    2. public class QueueConfig {
    3. /** 短信发送队列 */
    4. public static final String QUEUE_MESSAGE = "queue.message";
    5. /** 交换机 */
    6. public static final String DLX_EXCHANGE = "dlx.exchange";
    7. /** 短信发送队列 延迟缓冲(按消息) */
    8. public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay";
    9. /**
    10. * 短信发送队列
    11. * @return
    12. */
    13. @Bean
    14. public Queue messageQueue() {
    15. return new Queue(QUEUE_MESSAGE, true);
    16. }
    17. /**
    18. * 短信发送队列
    19. * @return
    20. */
    21. @Bean
    22. public Queue delayMessageQueue() {
    23. return QueueBuilder.durable(QUEUE_MESSAGE_DELAY)
    24. .withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 消息超时进入死信队列,绑定死信队列交换机
    25. .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE) // 绑定指定的routing-key
    26. .build();
    27. }
    28. /***
    29. * 创建交换机
    30. * @return
    31. */
    32. @Bean
    33. public DirectExchange directExchange(){
    34. return new DirectExchange(DLX_EXCHANGE);
    35. }
    36. /***
    37. * 交换机与队列绑定
    38. * @param messageQueue
    39. * @param directExchange
    40. * @return
    41. */
    42. @Bean
    43. public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) {
    44. return BindingBuilder.bind(messageQueue)
    45. .to(directExchange)
    46. .with(QUEUE_MESSAGE);
    47. }
    48. }

    4.2.3.3 消息监听

    创建MessageListener用于监听消息,代码如下:

    1. @Component
    2. @RabbitListener(queues = QueueConfig.QUEUE_MESSAGE)
    3. public class MessageListener {
    4. /***
    5. * 监听消息
    6. * @param msg
    7. */
    8. @RabbitHandler
    9. public void msg(@Payload Object msg){
    10. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    11. System.out.println("当前时间:"+dateFormat.format(new Date()));
    12. System.out.println("收到信息:"+msg);
    13. }
    14. }

    4.2.3.4 创建启动类
    1. @SpringBootApplication
    2. @EnableRabbit
    3. public class SpringRabbitMQApplication {
    4. public static void main(String[] args) {
    5. SpringApplication.run(SpringRabbitMQApplication.class,args);
    6. }
    7. }

    4.2.3.5 测试
    1. @SpringBootTest
    2. @RunWith(SpringRunner.class)
    3. public class RabbitMQTest {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. /***
    7. * 发送消息
    8. */
    9. @Test
    10. public void sendMessage() throws InterruptedException, IOException {
    11. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    12. System.out.println("发送当前时间:"+dateFormat.format(new Date()));
    13. Map<String,String> message = new HashMap<>();
    14. message.put("name","szitheima");
    15. rabbitTemplate.convertAndSend(QueueConfig.QUEUE_MESSAGE_DELAY, message, new MessagePostProcessor() {
    16. @Override
    17. public Message postProcessMessage(Message message) throws AmqpException {
    18. message.getMessageProperties().setExpiration("10000");
    19. return message;
    20. }
    21. });
    22. System.in.read();
    23. }
    24. }

    其中message.getMessageProperties().setExpiration(“10000”);设置消息超时时间,超时后,会将消息转入到另外一个队列。
    测试效果如下:
    第15章 秒杀 - 图23

    5 库存回滚(作业)

    5.1 秒杀流程

    如上图,步骤分析如下:

    1. 1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
    2. 2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
    3. 3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到MySQL,如果没完成,清理排队信息回滚库存
    4. 4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。

    延时队列实现订单关闭回滚库存:

    1. 1.创建一个过期队列 Queue1
    2. 2.接收消息的队列 Queue2
    3. 3.中转交换机
    4. 4.监听Queue2
    5. 1)SeckillStatus->检查Redis中是否有订单信息
    6. 2)如果有订单信息,调用删除订单回滚库存->[需要先关闭微信支付]
    7. 3)如果关闭订单时,用于已支付,修改订单状态即可
    8. 4)如果关闭订单时,发生了别的错误,记录日志,人工处理

    5.2 关闭支付

    用户如果半个小时没有支付,我们会关闭支付订单,但在关闭之前,需要先关闭微信支付,防止中途用户支付。
    修改支付微服务的WeixinPayService,添加关闭支付方法,代码如下:

    1. /***
    2. * 关闭支付
    3. * @param orderId
    4. * @return
    5. */
    6. Map<String,String> closePay(Long orderId) throws Exception;

    修改WeixinPayServiceImpl,实现关闭微信支付方法,代码如下:

    1. /***
    2. * 关闭微信支付
    3. * @param orderId
    4. * @return
    5. * @throws Exception
    6. */
    7. @Override
    8. public Map<String, String> closePay(Long orderId) throws Exception {
    9. //参数设置
    10. Map<String,String> paramMap = new HashMap<String,String>();
    11. paramMap.put("appid",appid); //应用ID
    12. paramMap.put("mch_id",partner); //商户编号
    13. paramMap.put("nonce_str",WXPayUtil.generateNonceStr());//随机字符
    14. paramMap.put("out_trade_no",String.valueOf(orderId)); //商家的唯一编号
    15. //将Map数据转成XML字符
    16. String xmlParam = WXPayUtil.generateSignedXml(paramMap,partnerkey);
    17. //确定url
    18. String url = "https://api.mch.weixin.qq.com/pay/closeorder";
    19. //发送请求
    20. HttpClient httpClient = new HttpClient(url);
    21. //https
    22. httpClient.setHttps(true);
    23. //提交参数
    24. httpClient.setXmlParam(xmlParam);
    25. //提交
    26. httpClient.post();
    27. //获取返回数据
    28. String content = httpClient.getContent();
    29. //将返回数据解析成Map
    30. return WXPayUtil.xmlToMap(content);
    31. }

    5.3 关闭订单回滚库存

    5.3.1 配置延时队列

    在application.yml文件中引入队列信息配置,如下:

    1. #位置支付交换机和队列
    2. mq:
    3. pay:
    4. exchange:
    5. order: exchange.order
    6. queue:
    7. order: queue.order
    8. seckillorder: queue.seckillorder
    9. seckillordertimer: queue.seckillordertimer
    10. seckillordertimerdelay: queue.seckillordertimerdelay
    11. routing:
    12. orderkey: queue.order
    13. seckillorderkey: queue.seckillorder

    配置队列与交换机,在SeckillApplication中添加如下方法

    1. /**
    2. * 到期数据队列
    3. * @return
    4. */
    5. @Bean
    6. public Queue seckillOrderTimerQueue() {
    7. return new Queue(env.getProperty("mq.pay.queue.seckillordertimer"), true);
    8. }
    9. /**
    10. * 超时数据队列
    11. * @return
    12. */
    13. @Bean
    14. public Queue delaySeckillOrderTimerQueue() {
    15. return QueueBuilder.durable(env.getProperty("mq.pay.queue.seckillordertimerdelay"))
    16. .withArgument("x-dead-letter-exchange", env.getProperty("mq.pay.exchange.order")) // 消息超时进入死信队列,绑定死信队列交换机
    17. .withArgument("x-dead-letter-routing-key", env.getProperty("mq.pay.queue.seckillordertimer")) // 绑定指定的routing-key
    18. .build();
    19. }
    20. /***
    21. * 交换机与队列绑定
    22. * @return
    23. */
    24. @Bean
    25. public Binding basicBinding() {
    26. return BindingBuilder.bind(seckillOrderTimerQueue())
    27. .to(basicExchange())
    28. .with(env.getProperty("mq.pay.queue.seckillordertimer"));
    29. }

    5.3.2 发送延时消息

    修改MultiThreadingCreateOrder,添加如下方法:

    1. /***
    2. * 发送延时消息到RabbitMQ中
    3. * @param seckillStatus
    4. */
    5. public void sendTimerMessage(SeckillStatus seckillStatus){
    6. rabbitTemplate.convertAndSend(env.getProperty("mq.pay.queue.seckillordertimerdelay"), (Object) JSON.toJSONString(seckillStatus), new MessagePostProcessor() {
    7. @Override
    8. public Message postProcessMessage(Message message) throws AmqpException {
    9. message.getMessageProperties().setExpiration("10000");
    10. return message;
    11. }
    12. });
    13. }

    在createOrder方法中调用上面方法,如下代码:

    1. //发送延时消息到MQ中
    2. sendTimerMessage(seckillStatus);

    5.3.3 库存回滚

    创建SeckillOrderDelayMessageListener实现监听消息,并回滚库存,代码如下:

    1. @Component
    2. @RabbitListener(queues = "${mq.pay.queue.seckillordertimer}")
    3. public class SeckillOrderDelayMessageListener {
    4. @Autowired
    5. private RedisTemplate redisTemplate;
    6. @Autowired
    7. private SeckillOrderService seckillOrderService;
    8. @Autowired
    9. private WeixinPayFeign weixinPayFeign;
    10. /***
    11. * 读取消息
    12. * 判断Redis中是否存在对应的订单
    13. * 如果存在,则关闭支付,再关闭订单
    14. * @param message
    15. */
    16. @RabbitHandler
    17. public void consumeMessage(@Payload String message){
    18. //读取消息
    19. SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);
    20. //获取Redis中订单信息
    21. String username = seckillStatus.getUsername();
    22. SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
    23. //如果Redis中有订单信息,说明用户未支付
    24. if(seckillOrder!=null){
    25. System.out.println("准备回滚---"+seckillStatus);
    26. //关闭支付
    27. Result closeResult = weixinPayFeign.closePay(seckillStatus.getOrderId());
    28. Map<String,String> closeMap = (Map<String, String>) closeResult.getData();
    29. if(closeMap!=null && closeMap.get("return_code").equalsIgnoreCase("success") &&
    30. closeMap.get("result_code").equalsIgnoreCase("success") ){
    31. //关闭订单
    32. seckillOrderService.closeOrder(username);
    33. }
    34. }
    35. }
    36. }