第14章 秒杀

学习目标

  • 防止秒杀重复排队
    1. 重复排队:一个人抢购商品,如果没有支付,不允许重复排队抢购
  • 并发超卖问题解决
    1. 1个商品卖给多个人:1商品多订单
  • 秒杀订单支付
    1. 秒杀支付:支付流程需要调整
  • 超时支付订单库存回滚
    1. 1.RabbitMQ延时队列
    2. 2.利用延时队列实现支付订单的监听,根据订单支付状况进行订单数据库回滚

1 防止秒杀重复排队

用户每次抢单的时候,一旦排队,我们设置一个自增值,让该值的初始值为1,每次进入抢单的时候,对它进行递增,如果值>1,则表明已经排队,不允许重复排队,如果重复排队,则对外抛出异常,并抛出异常信息100表示已经正在排队。

1.1 后台排队记录

修改SeckillOrderServiceImpl的add方法,新增递增值判断是否排队中,代码如下:

第15天 - 图1

上图代码如下:

  1. //Long userQueueCount = redisTemplate.boundHashOps("UserQueueCount").increment(username, 1);
  2. Long userQueueCount = redisTemplate.boundHashOps(SystemConstants.SEC_KILL_QUEUE_REPEAT_KEY).increment(username, 1);
  3. if (userQueueCount > 1) {
  4. throw new RuntimeException("重复排队了");
  5. }

2 并发超卖问题解决

超卖问题,这里是指多人抢购同一商品的时候,多人同时判断是否有库存,如果只剩一个,则都会判断有库存,此时会导致超卖现象产生,也就是一个商品下了多个订单的现象。

使用分布式锁来解决:

(1)在多线程的方法中修改之前的样子如下:

第15天 - 图2

(2)修改之后如下:

第15天 - 图3

第15天 - 图4

2.1 思路分析

  1. 1)首先我们进行超卖问题的解决在于 本质是多个线程同时竞争资源去修改资源,所以只要在修改资源的时候,该资源只能只有一个线程修改即可。
  2. 2)先针对库存的判断及减库存的操作 作为一个原子性操作 进行加锁操作即可。

如下图:

第15天 - 图5

  1. 这里我们采用分布式锁:使用分布式锁的方式有很多我们决定采用REDIS来实现分布式锁。这里提供一套官方使用的红锁来实现。

2.2 代码实现

(1)添加依赖:

  1. <dependency>
  2. <groupId>org.redisson</groupId>
  3. <artifactId>redisson-spring-boot-starter</artifactId>
  4. <version>3.11.6</version>
  5. </dependency>

(2)修改多线程下单的代码

第15天 - 图6

如图的代码如下:

  1. @Async
  2. public void createOrder() {
  3. try {
  4. System.out.println("准备执行...." + Thread.currentThread().getName());
  5. Thread.sleep(10000);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. //从队列中获取排队信息
  10. SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundListOps("SeckillOrderQueue").rightPop();
  11. if (seckillStatus != null) {
  12. //时间区间
  13. String time = seckillStatus.getTime();// "2019052510";
  14. //用户登录名
  15. String username = seckillStatus.getUsername();// "szitheima";
  16. //用户抢购商品
  17. Long id = seckillStatus.getGoodsId();//1131814847898587136L;
  18. //获取商品数据
  19. SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).get(id);
  20. //如果没有库存,则直接抛出异常
  21. /*if (goods == null || goods.getStockCount() <= 0) {
  22. throw new RuntimeException("已售罄!");
  23. }*/
  24. //如果有库存,则创建秒杀商品订单
  25. SeckillOrder seckillOrder = new SeckillOrder();
  26. seckillOrder.setId(idWorker.nextId());
  27. seckillOrder.setSeckillId(id);
  28. seckillOrder.setMoney(goods.getCostPrice());
  29. seckillOrder.setUserId(username);
  30. seckillOrder.setCreateTime(new Date());
  31. seckillOrder.setStatus("0");
  32. //将秒杀订单存入到Redis中
  33. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).put(username, seckillOrder);
  34. //库存减少
  35. //goods.setStockCount(goods.getStockCount() - 1);
  36. //判断当前商品是否还有库存
  37. if (goods.getStockCount() <= 0) {
  38. //并且将商品数据同步到MySQL中
  39. seckillGoodsMapper.updateByPrimaryKeySelective(goods);
  40. //如果没有库存,则清空Redis缓存中该商品
  41. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).delete(id);
  42. } else {
  43. //如果有库存,则直数据重置到Reids中
  44. // redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).put(id, goods);
  45. }
  46. //抢单成功,更新抢单状态,排队->等待支付
  47. seckillStatus.setStatus(2);
  48. seckillStatus.setOrderId(seckillOrder.getId());
  49. seckillStatus.setMoney(Float.valueOf(seckillOrder.getMoney()));
  50. //redisTemplate.boundHashOps("UserQueueStatus").put(username,seckillStatus);
  51. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).put(username, seckillStatus);
  52. }
  53. }

(3)设置调用多线程的入口的service方法add中添加如下代码,在这里我们需要减库存

第15天 - 图7

代码如下:

  1. //避免超卖--> 使用分布式锁
  2. RLock mylock = redissonClient.getLock("Mylock");
  3. try {
  4. //上锁
  5. mylock.lock(100, TimeUnit.SECONDS);
  6. dercount(id, time);
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. return false;
  10. } finally {
  11. mylock.unlock();
  12. }

dercount(id, time);该方法如下:

  1. private void dercount(Long id, String time) {
  2. SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).get(id);
  3. //如果没有库存,则直接抛出异常
  4. if (goods == null || goods.getStockCount() <= 0) {
  5. throw new RuntimeException("已售罄!");
  6. }
  7. //减库存
  8. goods.setStockCount(goods.getStockCount() - 1);
  9. //存储到redis中
  10. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).put(id, goods);
  11. System.out.println("库存为:" + ((SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + time).get(id)).getStockCount());
  12. }

2.3 超卖控制测试

使用jemtery来进行测试:库存为1 ,三个线程同时来买,售罄2个,下单一个。如下图:

第15天 - 图8

第15天 - 图9

3 订单支付

支付分析:

第15天 - 图10

  1. 1.如果是普通订单下单,则下单之后跳转都支付页面 传递 type表示从普通订单来
  2. 2.如果是秒杀订单下单,则下单之后跳转到支付页面 传递 type表示从秒杀订单来
  3. 3.支付页面接收到参数:订单号,金额,以及type标识 传递 给支付微服务 生成二维码链接 并传递数据到微信附件中参数中
  4. 4.用户扫描二维码支付成功,则通知给畅购支付微服务 获取到标识type
  5. 5.畅购支付微服务接收到通知,根据标识type 发送不同的消息队列中即可
  6. 6.多线程下单之后,并且需要顺便发送延时队列消息,30分钟后监听并从数据库中获取数据是否存在。存在则不用管,不存在则需要回复库存等操作。(可选)

3.1 改造订单微服务

分析:订单微服务创建订单之后需要返回订单的的ID和金额以及标记。

(1)创建VO

第15天 - 图11

  1. public class OrderVo implements Serializable {
  2. private Integer type;
  3. private String orderId;
  4. private String totalFee;
  5. public OrderVo() {
  6. }
  7. public OrderVo(Integer type, String orderId, String totalFee) {
  8. this.type = type;
  9. this.orderId = orderId;
  10. this.totalFee = totalFee;
  11. }
  12. public Integer getType() {
  13. return type;
  14. }
  15. public void setType(Integer type) {
  16. this.type = type;
  17. }
  18. public String getOrderId() {
  19. return orderId;
  20. }
  21. public void setOrderId(String orderId) {
  22. this.orderId = orderId;
  23. }
  24. public String getTotalFee() {
  25. return totalFee;
  26. }
  27. public void setTotalFee(String totalFee) {
  28. this.totalFee = totalFee;
  29. }
  30. }

(2) 修改controller

  1. @PostMapping("/add")
  2. public Result add(@RequestBody Order order) {
  3. String username = tokenDecode.getUsername();
  4. order.setUsername(username);
  5. OrderVo orderVo = orderService.add(order);
  6. //返回数据给前端 前端拿到数据进行跳转 给:订单号 金额 type类型
  7. return new Result(true, StatusCode.OK,"添加订单成功",orderVo);
  8. }

(3)修改sevice: 返回Order对象即可

  1. public OrderVo add(Order order) {
  2. //1.添加订单
  3. //1.1生成订单的主键
  4. String orderId = idWorker.nextId() + "";
  5. order.setId(orderId);
  6. //直接从购物车(redis中)中获取 循环遍历算出来总金额
  7. List<OrderItem> values = redisTemplate.boundHashOps("Cart_" + order.getUsername()).values();
  8. Integer totalNum=0;
  9. Integer totalMoney=0;
  10. for (OrderItem orderItem : values) {
  11. Integer money = orderItem.getMoney();//小计
  12. Integer num = orderItem.getNum();//购买的数量
  13. totalNum+=num;
  14. totalMoney+=money;
  15. //2.添加订单选项
  16. //2.1 设置主键
  17. String orderItemId = idWorker.nextId() + "";
  18. orderItem.setId(orderItemId);
  19. //2.2 设置所属的订单的ID
  20. orderItem.setOrderId(order.getId());
  21. orderItemMapper.insertSelective(orderItem);
  22. //3.更新库存 update tb_sku set num=num-#{num} where id=#{id} and num>=#{num}
  23. //3.1 changgou-service-goods-api创建接口 添加注解和方法
  24. //3.2 changgou-service-goods controller【实现】接口
  25. //3.3 order微服务中添加依赖
  26. //3.4 开启feignclients
  27. //3.5 注入 调用
  28. skuFeign.decCount(orderItem.getSkuId(),orderItem.getNum());
  29. }
  30. //1.2 设置总数量
  31. order.setTotalNum(totalNum);
  32. //1.3 设置总金额
  33. order.setTotalMoney(totalMoney);
  34. order.setPayMoney(totalMoney);
  35. order.setPostFee(0);//免邮费
  36. //1.4 设置创建时间 和更新时间
  37. order.setCreateTime(new Date());
  38. order.setUpdateTime(order.getCreateTime());
  39. //1.5 设置订单所属的用户 controller已经设置
  40. order.setBuyerRate("0");//未评价
  41. order.setSourceType("1");//web
  42. order.setOrderStatus("0");
  43. order.setPayStatus("0");
  44. order.setConsignStatus("0");
  45. order.setIsDelete("0");//未删除
  46. orderMapper.insertSelective(order);
  47. //4.添加积分 feign update tb_user set points=points+#{points} where username=#{username}
  48. //4.1 changgou-service-user-api中创建接口和方法
  49. //4.2 changgou-service-user的controller中【实现】方法
  50. //4.3 加入依赖 启用feignclients 注入 调用
  51. userFeign.addPoints(order.getUsername(),10);
  52. //5.清空购物车
  53. redisTemplate.delete("Cart_" + order.getUsername());
  54. //金额 单位分
  55. OrderVo orderVo = new OrderVo(1,orderId,totalMoney.toString());
  56. return orderVo;
  57. }

3.2 创建支付二维码

下单成功后,会跳转到支付选择页面,在支付选择页面要显示订单编号和订单金额,所以我们需要在下单的时候,将订单金额以及订单编号信息存储到用户查询对象中。

选择微信支付后,会跳转到微信支付页面,微信支付页面会根据用户名查看用户秒杀订单,并根据用户秒杀订单的ID创建预支付信息并获取二维码信息,展示给用户看,此时页面每3秒查询一次支付状态,如果支付成功,需要修改订单状态信息。

3.2.1 回显订单号、金额

下单后,进入支付选择页面,需要显示订单号和订单金额,所以需要在用户下单后将该数据传入到pay.html页面,所以查询订单状态的时候,需要将订单号和金额封装到查询的信息中

(1)修改SeckillStatus

  1. public class SeckillStatus implements Serializable {
  2. //秒杀用户名
  3. private String username;
  4. //创建时间
  5. private Date createTime;
  6. //秒杀状态 1:排队中,2:抢单成功,3:支付超时,4:秒杀失败,5:支付完成
  7. private Integer status;
  8. //秒杀的商品ID
  9. private Long goodsId;
  10. //应付金额 分
  11. private Float money;
  12. //订单号
  13. private Long orderId;
  14. //类型 2
  15. private Integer type;
  16. public Integer getType() {
  17. return type;
  18. }
  19. public void setType(Integer type) {
  20. this.type = type;
  21. }
  22. //时间段
  23. private String time;
  24. public SeckillStatus() {
  25. }
  26. public SeckillStatus(String username, Date createTime, Integer status, Long goodsId, String time) {
  27. this.username = username;
  28. this.createTime = createTime;
  29. this.status = status;
  30. this.goodsId = goodsId;
  31. this.time = time;
  32. }
  33. public String getUsername() {
  34. return username;
  35. }
  36. public void setUsername(String username) {
  37. this.username = username;
  38. }
  39. public Date getCreateTime() {
  40. return createTime;
  41. }
  42. public void setCreateTime(Date createTime) {
  43. this.createTime = createTime;
  44. }
  45. public Integer getStatus() {
  46. return status;
  47. }
  48. public void setStatus(Integer status) {
  49. this.status = status;
  50. }
  51. public Long getGoodsId() {
  52. return goodsId;
  53. }
  54. public void setGoodsId(Long goodsId) {
  55. this.goodsId = goodsId;
  56. }
  57. public Float getMoney() {
  58. return money;
  59. }
  60. public void setMoney(Float money) {
  61. this.money = money;
  62. }
  63. public Long getOrderId() {
  64. return orderId;
  65. }
  66. public void setOrderId(Long orderId) {
  67. this.orderId = orderId;
  68. }
  69. public String getTime() {
  70. return time;
  71. }
  72. public void setTime(String time) {
  73. this.time = time;
  74. }
  75. }

(2)修改controller

第15天 - 图12

代码如下:

  1. @GetMapping("/query")
  2. public Result<SeckillStatus> queryStatus(){
  3. String username="zhangsan";
  4. SeckillStatus seckillStatus= seckillOrderService.query(username);
  5. //秒杀的订单
  6. if(seckillStatus!=null) {
  7. seckillStatus.setType(2);
  8. }
  9. return new Result<SeckillStatus>(true,StatusCode.OK,"查询状态成功,状态是什么你自己看",seckillStatus);
  10. }

使用Postman测试,效果如下:

http://localhost:18093/seckillOrder/query

第15天 - 图13

3.2.2 创建二维码

用户创建二维码,可以先查询用户的秒杀订单抢单信息,然后再发送请求到支付微服务中创建二维码,将订单编号以及订单对应的金额传递到支付微服务:/weixin/pay/create/native

3.3 支付流程分析回顾

第15天 - 图14

如上图,步骤分析如下:

  1. 1.如果是普通订单下单,则下单之后跳转都支付页面 传递 type表示从普通订单来
  2. 2.如果是秒杀订单下单,则下单之后跳转到支付页面 传递 type表示从秒杀订单来
  3. 3.支付页面接收到参数:订单号,金额,以及type标识 传递 给支付微服务 生成二维码链接 并传递数据到微信附件中参数中
  4. 4.用户扫描二维码支付成功,则通知给畅购支付微服务 获取到标识type
  5. 5.畅购支付微服务接收到通知,根据标识type 发送不同的消息队列中即可
  6. 6.多线程下单之后,并且需要顺便发送延时队列消息,30分钟后监听并从数据库中获取数据是否存在。存在则不用管,不存在则需要回复库存等操作。(可选)

3.4 支付回调更新

支付回调这一块代码已经实现了,但之前实现的是订单信息的回调数据发送给MQ,指定了对应的队列,不过现在需要实现的是秒杀信息发送给指定队列,所以之前的代码那块需要动态指定队列。

3.4.1 支付回调队列指定

关于指定队列如下:

  1. 1.创建支付二维码需要指定队列
  2. 2.回调地址回调的时候,获取支付二维码指定的队列,将支付信息发送到指定队列中

在微信支付统一下单API中,有一个附加参数,如下:

  1. attach:附加数据,String(127),在查询API和支付通知中原样返回,可作为自定义参数使用。

我们可以在创建二维码的时候,指定该参数,该参数用于指定回调支付信息的对应队列,每次回调的时候,会获取该参数,然后将回调信息发送到该参数对应的队列去。

第15天 - 图15

3.4.1.1 改造支付方法

修改支付微服务的WeixinPayController的createNative方法,代码如下:

第15天 - 图16

代码如下

  1. @RequestMapping("/create/native")
  2. public Result<Map<String, String>> createNative(@RequestParam Map<String, String> parameter) {
  3. //todo 从令牌中获取用户的名称
  4. String username="zhangsan";
  5. parameter.put("username",username);
  6. Map<String, String> map = weixinPayService.createNative(parameter);
  7. return new Result<Map<String, String>>(true, StatusCode.OK, "创建二维码成功", map);
  8. }

修改支付微服务的WeixinPayService的createNative方法,代码如下:

第15天 - 图17

修改支付微服务的WeixinPayServiceImpl的createNative方法,代码如下:

  1. //模拟浏览器发送一个请求 获取响应(XML 装成map 返回)
  2. @Override
  3. public Map<String, String> createNative(Map<String, String> parameter) {
  4. try {
  5. //1.创建参数对象map 进行参数的组装
  6. Map<String, String> paramMap = new HashMap<>();
  7. paramMap.put("appid", wxProperties.getAppid());
  8. paramMap.put("mch_id", wxProperties.getPartner());
  9. paramMap.put("nonce_str", WXPayUtil.generateNonceStr());
  10. paramMap.put("body", "畅购的商品");
  11. paramMap.put("out_trade_no", parameter.get("out_trade_no"));
  12. paramMap.put("total_fee", parameter.get("total_fee"));// 重要 单位是分
  13. paramMap.put("spbill_create_ip", "127.0.0.1");
  14. paramMap.put("notify_url", wxProperties.getNotifyurl());
  15. paramMap.put("trade_type", "NATIVE");
  16. //==========================attachMap={username:zhangsan ,type:1}======================******************************==========
  17. Map<String,String> attachMap = new HashMap<>();
  18. attachMap.put("username",parameter.get("username"));
  19. attachMap.put("type",parameter.get("type"));
  20. //JSON
  21. paramMap.put("attach", JSON.toJSONString(attachMap));
  22. //签名 todo 将map 转换成XML 的是会自动添加签名
  23. //2.将map转成XML
  24. String xmlParam = WXPayUtil.generateSignedXml(paramMap, wxProperties.getPartnerkey());
  25. //3.使用httpclient 工具类 进行模拟浏览器发送请求 统一下单API
  26. HttpClient httpClient = new HttpClient("https://api.mch.weixin.qq.com/pay/unifiedorder");
  27. httpClient.setHttps(true);
  28. httpClient.setXmlParam(xmlParam);
  29. httpClient.post();
  30. //4.使用Httpclient 工具类 进行模拟浏览器接收响应
  31. String content = httpClient.getContent();
  32. System.out.println(content);
  33. //5.将xml 转成MAP (里面有code_url)
  34. Map<String, String> contentMap = WXPayUtil.xmlToMap(content);
  35. Map<String,String> resultMap = new HashMap<>();
  36. resultMap.put("total_fee", parameter.get("total_fee"));
  37. resultMap.put("out_trade_no",parameter.get("out_trade_no"));
  38. resultMap.put("code_url",contentMap.get("code_url"));
  39. //6.返回
  40. return resultMap;
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. return null;
  45. }
  1. @ConfigurationProperties(prefix = "weixin")
  2. @Component
  3. public class WxProperties {
  4. private String appid;
  5. private String partner;
  6. private String partnerkey;
  7. private String notifyurl;
  8. public String getAppid() {
  9. return appid;
  10. }
  11. public void setAppid(String appid) {
  12. this.appid = appid;
  13. }
  14. public String getPartner() {
  15. return partner;
  16. }
  17. public void setPartner(String partner) {
  18. this.partner = partner;
  19. }
  20. public String getPartnerkey() {
  21. return partnerkey;
  22. }
  23. public void setPartnerkey(String partnerkey) {
  24. this.partnerkey = partnerkey;
  25. }
  26. public String getNotifyurl() {
  27. return notifyurl;
  28. }
  29. public void setNotifyurl(String notifyurl) {
  30. this.notifyurl = notifyurl;
  31. }
  32. }

我们创建二维码的时候,需要将下面几个参数传递过去

  1. username:用户名,可以根据用户名查询用户排队信息
  2. out_trade_no:商户订单号,下单必须
  3. total_fee:支付金额,支付必须
  4. type:标识类型 1 标识普通订单 2 标识秒杀订单 。可以知道将支付信息发送到哪个队列

修改WeixinPayApplication,添加对应队列以及对应交换机绑定,代码如下:

  1. @SpringBootApplication
  2. @EnableEurekaClient
  3. @EnableFeignClients
  4. public class WeixinPayApplication {
  5. public static void main(String[] args) {
  6. SpringApplication.run(WeixinPayApplication.class,args);
  7. }
  8. @Autowired
  9. private Environment env;
  10. //配置创建队列
  11. @Bean
  12. public Queue createQueue(){
  13. // queue.order
  14. return new Queue(environment.getProperty("mq.pay.queue.order"));
  15. }
  16. //创建交换机
  17. @Bean
  18. public DirectExchange createExchange(){
  19. // exchange.order
  20. return new DirectExchange(environment.getProperty("mq.pay.exchange.order"));
  21. }
  22. // 绑定队列到交换机
  23. @Bean
  24. public Binding binding(){
  25. // routing key : queue.order
  26. String property = environment.getProperty("mq.pay.routing.key");
  27. return BindingBuilder.bind(createQueue()).to(createExchange()).with(property);
  28. }
  29. //配置创建队列
  30. @Bean
  31. public Queue createSekillQueue(){
  32. // queue.order
  33. return new Queue(environment.getProperty("mq.pay.queue.seckillorder"));
  34. }
  35. //创建交换机
  36. @Bean
  37. public DirectExchange createSeckillExchange(){
  38. // exchange.order
  39. return new DirectExchange(environment.getProperty("mq.pay.exchange.seckillorder"));
  40. }
  41. // 绑定队列到交换机
  42. @Bean
  43. public Binding seckillbinding(){
  44. // routing key : queue.order
  45. String property = environment.getProperty("mq.pay.routing.seckillkey");
  46. return BindingBuilder.bind(createSekillQueue()).to(createSeckillExchange()).with(property);
  47. }
  48. }

修改application.yml,添加如下配置

  1. #位置支付交换机和队列
  2. mq:
  3. pay:
  4. exchange:
  5. order: exchange.order
  6. seckillorder: exchange.seckillorder
  7. queue:
  8. order: queue.order
  9. seckillorder: queue.seckillorder
  10. routing:
  11. key: queue.order
  12. seckillkey: queue.seckillorder

3.4.1.2 测试

使用Postman创建二维码测试

http://localhost:18092/weixin/pay/create/native?username=zhangsan&out_trade_no=1132510782836314121&total_fee=1&type=1

第15天 - 图18

以后每次支付,都需要带上对应的参数,包括前面的订单支付。

3.4.1.3 改造支付回调方法

修改com.changgou.pay.controller.WeixinPayController的notifyUrl方法,获取自定义参数,并转成Map,获取from的标识地址,并将支付信息发送到绑定的queue中,代码如下:

1:标识普通队列

2:标识秒杀队列

代码如下:

  1. @Autowired
  2. private Environment environment;
  3. /**
  4. * 用于接收微信传递过来的数据 通知地址 RequestMapping 中的值
  5. * 该链接是通过【统一下单API】中提交的参数notify_url设置 通知url必须为直接可访问的url,不能携带参数
  6. *
  7. * @return
  8. */
  9. @RequestMapping("/notify/url")
  10. public String notifyurl(HttpServletRequest request) {
  11. ByteArrayOutputStream outputStream =null;
  12. ServletInputStream inputStream=null;
  13. System.out.println("aaaaaaaa====");
  14. try {
  15. //1. 接收微信通知的数据流 读流
  16. inputStream = request.getInputStream();
  17. //2 将流转换成XML字符串 todo 写流
  18. outputStream = new ByteArrayOutputStream();
  19. byte[] buffer = new byte[1024];
  20. int len = 0;
  21. while ((len = inputStream.read(buffer)) != -1) {
  22. outputStream.write(buffer,0,len);
  23. }
  24. byte[] bytes = outputStream.toByteArray();
  25. String xml = new String(bytes, "utf-8");
  26. System.out.println(xml);
  27. System.out.println("=================================");
  28. Map<String, String> map = WXPayUtil.xmlToMap(xml);
  29. System.out.println(map);
  30. //3.发送消息(orderid,支付时间,微信支付订单号,....)给rabbitmq
  31. //判断 获取到attach的值 判断值是否是1 或者2 如果是1 标识 普通的订单 如果是2 标识 秒杀的订单
  32. //{username:zhangsan,type:1}
  33. String attach = map.get("attach");
  34. Map<String,String> attachMap = JSON.parseObject(attach, Map.class);
  35. String type = attachMap.get("type");
  36. // 根据不同的值发送到不同的队列中
  37. switch (type){
  38. case "1":// 普通订单
  39. rabbitTemplate.convertAndSend(exchange,routing, JSON.toJSONString(map));
  40. break;
  41. case "2": //秒杀订单
  42. rabbitTemplate.convertAndSend(
  43. environment.getProperty("mq.pay.exchange.seckillorder"),
  44. environment.getProperty("mq.pay.routing.seckillkey"),
  45. JSON.toJSONString(map));
  46. break;
  47. default:
  48. System.out.println("你给的都不对");
  49. break;
  50. }
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }finally {
  54. try {
  55. if(outputStream!=null) {
  56. outputStream.close();
  57. }
  58. } catch (IOException e) {
  59. e.printStackTrace();
  60. }
  61. try {
  62. if(inputStream!=null) {
  63. inputStream.close();
  64. }
  65. } catch (IOException e) {
  66. e.printStackTrace();
  67. }
  68. }
  69. System.out.println("接收到了数据了");
  70. //4.返回给微信成功的响应
  71. return message;
  72. }

3.4.2 支付状态监听

支付状态通过回调地址发送给MQ之后,我们需要在秒杀系统中监听支付信息,如果用户已支付,则修改用户订单状态,如果支付失败,则直接删除订单,回滚库存。

在秒杀微服务中添加依赖:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

在秒杀工程中创建com.changgou.seckill.consumer.SeckillOrderPayMessageListener,实现监听消息,代码如下:

  1. @Component
  2. //监听某一个队列的消息
  3. @RabbitListener(queues = "queue.seckillorder")
  4. public class SeckillUpdateOrderListener {
  5. @Autowired
  6. private SeckillOrderMapper seckillOrderMapper;
  7. @Autowired
  8. private SeckillGoodsMapper seckillGoodsMapper;
  9. @Autowired
  10. private RedissonClient redissonClient;
  11. @Autowired
  12. private RedisTemplate redisTemplate;
  13. @RabbitHandler//指定该方法处理字符串类型的消息
  14. public void jieshouMessage(String msg) throws Exception {
  15. //1. 接收消息本身 转成MAP对象
  16. Map<String, String> map = JSON.parseObject(msg, Map.class);
  17. if (map != null) {
  18. //通信成功
  19. if (map.get("return_code").equalsIgnoreCase("SUCCESS")) {
  20. String attachString = map.get("attach");//{username:zhangsan,type:1}
  21. Map<String, String> attachMap = JSON.parseObject(attachString, Map.class);
  22. String username = attachMap.get("username");
  23. //业务成功 支付成功
  24. if (map.get("result_code").equalsIgnoreCase("SUCCESS")) {
  25. //2. 获取到支付的成功/失败的状态 如果是成功 将redis的订单存到数据库 并删除redis的订单
  26. //获取订单 存储到mysql
  27. SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).get(username);
  28. //支付成功
  29. seckillOrder.setStatus("1");
  30. String time_end = map.get("time_end");
  31. SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
  32. Date date = simpleDateFormat.parse(time_end);
  33. seckillOrder.setPayTime(date);
  34. seckillOrder.setTransactionId(map.get("transaction_id"));
  35. seckillOrderMapper.insertSelective(seckillOrder);
  36. //删除redis中的订单
  37. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
  38. //清除重复排队标识
  39. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_QUEUE_REPEAT_KEY).delete(username);
  40. //清除用户的抢单信息
  41. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username);
  42. } else {
  43. //3. 如果 1.关闭微信支付订单 2.失败回滚库存 3.删除预订单 4.清除标记
  44. // todo 关闭微信支付订单
  45. // 获取秒杀商品对象 +1
  46. SeckillStatus seckillStatus = (SeckillStatus) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).get(username);
  47. if (seckillStatus != null) {
  48. SeckillGoods seckillGoods = (SeckillGoods) redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime()).get(seckillStatus.getGoodsId());
  49. if(seckillGoods==null){
  50. //从数据库获取商品
  51. seckillGoods = seckillGoodsMapper.selectByPrimaryKey(seckillStatus.getGoodsId());
  52. }
  53. //恢复库存 上锁
  54. RLock myLock = redissonClient.getLock("myLock");
  55. try {
  56. myLock.lock(20, TimeUnit.SECONDS);
  57. seckillGoods.setStockCount(seckillGoods.getStockCount()+1);
  58. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_GOODS_PREFIX + seckillStatus.getTime()).put(seckillStatus.getGoodsId(),seckillGoods);
  59. } catch (Exception e) {
  60. e.printStackTrace();
  61. }finally {
  62. myLock.unlock();
  63. }
  64. //删除redis中的订单
  65. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_ORDER_KEY).delete(username);
  66. //清除重复排队标识
  67. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_QUEUE_REPEAT_KEY).delete(username);
  68. //清除用户的抢单信息
  69. redisTemplate.boundHashOps(SystemConstants.SEC_KILL_USER_STATUS_KEY).delete(username);
  70. }
  71. }
  72. }
  73. }
  74. }
  75. }

修改SeckillApplication创建对应的队列以及绑定对应交换机。

  1. @SpringBootApplication
  2. @EnableEurekaClient
  3. @EnableFeignClients
  4. @MapperScan(basePackages = {"com.changgou.seckill.dao"})
  5. @EnableScheduling
  6. @EnableAsync
  7. public class SeckillApplication {
  8. public static void main(String[] args) {
  9. SpringApplication.run(SeckillApplication.class,args);
  10. }
  11. @Bean
  12. public IdWorker idWorker(){
  13. return new IdWorker(1,1);
  14. }
  15. @Autowired
  16. private Environment environment;
  17. //配置创建队列
  18. @Bean
  19. public Queue createQueue(){
  20. // queue.order
  21. return new Queue(environment.getProperty("mq.pay.queue.order"));
  22. }
  23. //创建交换机
  24. @Bean
  25. public DirectExchange createExchange(){
  26. // exchange.order
  27. return new DirectExchange(environment.getProperty("mq.pay.exchange.order"));
  28. }
  29. // 绑定队列到交换机
  30. @Bean
  31. public Binding binding(){
  32. // routing key : queue.order
  33. String property = environment.getProperty("mq.pay.routing.key");
  34. return BindingBuilder.bind(createQueue()).to(createExchange()).with(property);
  35. }
  36. //配置创建队列
  37. @Bean
  38. public Queue createSekillQueue(){
  39. // queue.order
  40. return new Queue(environment.getProperty("mq.pay.queue.seckillorder"));
  41. }
  42. //创建交换机
  43. @Bean
  44. public DirectExchange createSeckillExchange(){
  45. // exchange.order
  46. return new DirectExchange(environment.getProperty("mq.pay.exchange.seckillorder"));
  47. }
  48. // 绑定队列到交换机
  49. @Bean
  50. public Binding seckillbinding(){
  51. // routing key : queue.order
  52. String property = environment.getProperty("mq.pay.routing.seckillkey");
  53. return BindingBuilder.bind(createSekillQueue()).to(createSeckillExchange()).with(property);
  54. }
  55. }

修改application.yml文件,添加如下配置:

  1. #位置支付交换机和队列
  2. mq:
  3. pay:
  4. exchange:
  5. seckillorder: exchange.seckillorder
  6. queue:
  7. seckillorder: queue.seckillorder
  8. routing:
  9. seckillkey: queue.seckillorder

3.4.4.3 测试

使用Postman完整请求创建二维码下单测试一次。

商品ID:1131814854034853888

数量:49

第15天 - 图19

下单:

http://localhost:18093/seckill/order/add?id=1131814854034853888&time=2019052614

下单后,Redis数据

第15天 - 图20

下单查询:

http://localhost:18093/seckill/order/query

创建二维码:

http://localhost:18092/weixin/pay/create/native?username=zhangsan&out_trade_no=1132530879663575040&total_fee=1&type=2

秒杀抢单后,商品数量变化:

第15天 - 图21

支付微服务回调方法控制台:

  1. {
  2. nonce_str=Mnv06RIaIwxzg3bA,
  3. code_url=weixin://wxpay/bizpayurl?pr=iTidd5h,
  4. appid=wx8397f8696b538317,
  5. sign=1436E43FBA8A171D79A9B78B61F0A7AB,
  6. trade_type=NATIVE,
  7. return_msg=OK,
  8. result_code=SUCCESS,
  9. mch_id=1473426802,
  10. return_code=SUCCESS,
  11. prepay_id=wx2614182102123859e3869a853739004200
  12. }
  13. {money=1, queue=queue.seckillorder, username=szitheima, outtradeno=1132530879663575040}

订单微服务控制台输出

  1. {
  2. transaction_id=4200000289201905268232990890,
  3. nonce_str=a1aefe00a9bc4e8bb66a892dba38eb42,
  4. bank_type=CMB_CREDIT,
  5. openid=oNpSGwUp-194-Svy3JnVlAxtdLkc,
  6. sign=56679BC02CC82204635434817C1FCA46,
  7. fee_type=CNY,
  8. mch_id=1473426802,
  9. cash_fee=1,
  10. out_trade_no=1132530879663575040,
  11. appid=wx8397f8696b538317,
  12. total_fee=1,
  13. trade_type=NATIVE,
  14. result_code=SUCCESS,
  15. attach={
  16. "username": "szitheima",
  17. "outtradeno": "1132530879663575040",
  18. "money": "1",
  19. "queue": "queue.seckillorder"
  20. }, time_end=20190526141849, is_subscribe=N, return_code=SUCCESS
  21. }

附录:

支付微服务application.yml

  1. server:
  2. port: 18092
  3. spring:
  4. application:
  5. name: pay
  6. main:
  7. allow-bean-definition-overriding: true
  8. rabbitmq:
  9. host: 192.168.211.132
  10. port: 5672
  11. eureka:
  12. client:
  13. service-url:
  14. defaultZone: http://127.0.0.1:7001/eureka
  15. instance:
  16. prefer-ip-address: true
  17. feign:
  18. hystrix:
  19. enabled: true
  20. #hystrix 配置
  21. hystrix:
  22. command:
  23. default:
  24. execution:
  25. timeout:
  26. #如果enabled设置为false,则请求超时交给ribbon控制
  27. enabled: true
  28. isolation:
  29. strategy: SEMAPHORE
  30. #微信支付信息配置
  31. weixin:
  32. appid: wx8397f8696b538317
  33. partner: 1473426802
  34. partnerkey: T6m9iK73b0kn9g5v426MKfHQH7X8rKwb
  35. notifyurl: http://2cw4969042.wicp.vip:55644/weixin/pay/notify/url
  36. mq:
  37. pay:
  38. exchange:
  39. order: exchange.order
  40. seckillorder: exchange.seckillorder
  41. queue:
  42. order: queue.order
  43. seckillorder: queue.seckillorder
  44. routing:
  45. key: queue.order
  46. seckillkey: queue.seckillorder

秒杀微服务application.yml配置

  1. server:
  2. port: 18093
  3. spring:
  4. application:
  5. name: seckill
  6. datasource:
  7. driver-class-name: com.mysql.jdbc.Driver
  8. url: jdbc:mysql://192.168.211.132:3306/changgou_seckill?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
  9. username: root
  10. password: 123456
  11. rabbitmq:
  12. host: 192.168.211.132 #mq的服务器地址
  13. username: guest #账号
  14. password: guest #密码
  15. main:
  16. allow-bean-definition-overriding: true
  17. redis:
  18. host: 192.168.211.132
  19. port: 6379
  20. eureka:
  21. client:
  22. service-url:
  23. defaultZone: http://127.0.0.1:7001/eureka
  24. instance:
  25. prefer-ip-address: true
  26. feign:
  27. hystrix:
  28. enabled: true
  29. #hystrix 配置
  30. hystrix:
  31. command:
  32. default:
  33. execution:
  34. timeout:
  35. #如果enabled设置为false,则请求超时交给ribbon控制
  36. enabled: true
  37. isolation:
  38. thread:
  39. timeoutInMilliseconds: 10000
  40. strategy: SEMAPHORE
  41. mq:
  42. pay:
  43. exchange:
  44. seckillorder: exchange.seckillorder
  45. queue:
  46. seckillorder: queue.seckillorder
  47. routing:
  48. seckillkey: queue.seckillorder

4 RabbitMQ延时消息回顾

4.1 延时队列介绍

延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

应用场景如下:网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)

Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]

第二种方式:利用rabbitmq中的插件x-delay-message

4.2 TTL DLX实现延时队列

4.2.1 TTL DLX介绍

TTL
RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

第15天 - 图22

4.2.3 DLX延时队列实现

4.2.3.1 创建工程

创建springboot_rabbitmq_delay工程,并引入相关依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>changgou_parent</artifactId>
  7. <groupId>com.changgou</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>springboot_rabbitmq_delay</artifactId>
  12. <dependencies>
  13. <!--starter-web-->
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-web</artifactId>
  17. </dependency>
  18. <!--加入ampq-->
  19. <dependency>
  20. <groupId>org.springframework.boot</groupId>
  21. <artifactId>spring-boot-starter-amqp</artifactId>
  22. </dependency>
  23. <!--测试-->
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. </dependency>
  28. </dependencies>
  29. </project>

application.yml配置

  1. spring:
  2. application:
  3. name: springboot-demo
  4. rabbitmq:
  5. host: 127.0.0.1
  6. port: 5672
  7. password: guest
  8. username: guest

4.2.3.2 队列创建

创建2个队列,用于接收消息的叫延时队列queue.message.delay,用于转发消息的队列叫queue.message,同时创建一个交换机,代码如下:

  1. @Configuration
  2. public class QueueConfig {
  3. /** 短信发送队列 */
  4. public static final String QUEUE_MESSAGE = "queue.message";
  5. /** 交换机 */
  6. public static final String DLX_EXCHANGE = "dlx.exchange";
  7. /** 短信发送队列 延迟缓冲(按消息) */
  8. public static final String QUEUE_MESSAGE_DELAY = "queue.message.delay";
  9. /**
  10. * 短信发送队列
  11. * @return
  12. */
  13. @Bean
  14. public Queue messageQueue() {
  15. return new Queue(QUEUE_MESSAGE, true);
  16. }
  17. /**
  18. * 短信发送队列
  19. * @return
  20. */
  21. @Bean
  22. public Queue delayMessageQueue() {
  23. return QueueBuilder.durable(QUEUE_MESSAGE_DELAY)
  24. .withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 消息超时进入死信队列,绑定死信队列交换机
  25. .withArgument("x-dead-letter-routing-key", QUEUE_MESSAGE) // 绑定指定的routing-key
  26. .build();
  27. }
  28. /***
  29. * 创建交换机
  30. * @return
  31. */
  32. @Bean
  33. public DirectExchange directExchange(){
  34. return new DirectExchange(DLX_EXCHANGE);
  35. }
  36. /***
  37. * 交换机与队列绑定
  38. * @param messageQueue
  39. * @param directExchange
  40. * @return
  41. */
  42. @Bean
  43. public Binding basicBinding(Queue messageQueue, DirectExchange directExchange) {
  44. return BindingBuilder.bind(messageQueue)
  45. .to(directExchange)
  46. .with(QUEUE_MESSAGE);
  47. }
  48. }

4.2.3.3 消息监听

创建MessageListener用于监听消息,代码如下:

  1. @Component
  2. @RabbitListener(queues = QueueConfig.QUEUE_MESSAGE)
  3. public class MessageListener {
  4. /***
  5. * 监听消息
  6. * @param msg
  7. */
  8. @RabbitHandler
  9. public void msg(@Payload Object msg){
  10. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  11. System.out.println("当前时间:"+dateFormat.format(new Date()));
  12. System.out.println("收到信息:"+msg);
  13. }
  14. }

4.2.3.4 创建启动类
  1. @SpringBootApplication
  2. @EnableRabbit
  3. public class SpringRabbitMQApplication {
  4. public static void main(String[] args) {
  5. SpringApplication.run(SpringRabbitMQApplication.class,args);
  6. }
  7. }

4.2.3.5 测试
  1. @SpringBootTest
  2. @RunWith(SpringRunner.class)
  3. public class RabbitMQTest {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. /***
  7. * 发送消息
  8. */
  9. @Test
  10. public void sendMessage() throws InterruptedException, IOException {
  11. SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  12. System.out.println("发送当前时间:"+dateFormat.format(new Date()));
  13. Map<String,String> message = new HashMap<>();
  14. message.put("name","szitheima");
  15. rabbitTemplate.convertAndSend(QueueConfig.QUEUE_MESSAGE_DELAY, message, new MessagePostProcessor() {
  16. @Override
  17. public Message postProcessMessage(Message message) throws AmqpException {
  18. message.getMessageProperties().setExpiration("10000");
  19. return message;
  20. }
  21. });
  22. System.in.read();
  23. }
  24. }

其中message.getMessageProperties().setExpiration(“10000”);设置消息超时时间,超时后,会将消息转入到另外一个队列。

测试效果如下:

第15天 - 图23

5 库存回滚(作业)

5.1 秒杀流程

如上图,步骤分析如下:

  1. 1.用户抢单,经过秒杀系统实现抢单,下单后会将向MQ发送一个延时队列消息,包含抢单信息,延时半小时后才能监听到
  2. 2.秒杀系统同时启用延时消息监听,一旦监听到订单抢单信息,判断Redis缓存中是否存在订单信息,如果存在,则回滚
  3. 3.秒杀系统还启动支付回调信息监听,如果支付完成,则将订单吃句话到MySQL,如果没完成,清理排队信息回滚库存
  4. 4.每次秒杀下单后调用支付系统,创建二维码,如果用户支付成功了,微信系统会将支付信息发送给支付系统指定的回调地址,支付系统收到信息后,将信息发送给MQ,第3个步骤就可以监听到消息了。

延时队列实现订单关闭回滚库存:

  1. 1.创建一个过期队列 Queue1
  2. 2.接收消息的队列 Queue2
  3. 3.中转交换机
  4. 4.监听Queue2
  5. 1)SeckillStatus->检查Redis中是否有订单信息
  6. 2)如果有订单信息,调用删除订单回滚库存->[需要先关闭微信支付]
  7. 3)如果关闭订单时,用于已支付,修改订单状态即可
  8. 4)如果关闭订单时,发生了别的错误,记录日志,人工处理

5.2 关闭支付

用户如果半个小时没有支付,我们会关闭支付订单,但在关闭之前,需要先关闭微信支付,防止中途用户支付。

修改支付微服务的WeixinPayService,添加关闭支付方法,代码如下:

  1. /***
  2. * 关闭支付
  3. * @param orderId
  4. * @return
  5. */
  6. Map<String,String> closePay(Long orderId) throws Exception;

修改WeixinPayServiceImpl,实现关闭微信支付方法,代码如下:

  1. /***
  2. * 关闭微信支付
  3. * @param orderId
  4. * @return
  5. * @throws Exception
  6. */
  7. @Override
  8. public Map<String, String> closePay(Long orderId) throws Exception {
  9. //参数设置
  10. Map<String,String> paramMap = new HashMap<String,String>();
  11. paramMap.put("appid",appid); //应用ID
  12. paramMap.put("mch_id",partner); //商户编号
  13. paramMap.put("nonce_str",WXPayUtil.generateNonceStr());//随机字符
  14. paramMap.put("out_trade_no",String.valueOf(orderId)); //商家的唯一编号
  15. //将Map数据转成XML字符
  16. String xmlParam = WXPayUtil.generateSignedXml(paramMap,partnerkey);
  17. //确定url
  18. String url = "https://api.mch.weixin.qq.com/pay/closeorder";
  19. //发送请求
  20. HttpClient httpClient = new HttpClient(url);
  21. //https
  22. httpClient.setHttps(true);
  23. //提交参数
  24. httpClient.setXmlParam(xmlParam);
  25. //提交
  26. httpClient.post();
  27. //获取返回数据
  28. String content = httpClient.getContent();
  29. //将返回数据解析成Map
  30. return WXPayUtil.xmlToMap(content);
  31. }

5.3 关闭订单回滚库存

5.3.1 配置延时队列

在application.yml文件中引入队列信息配置,如下:

  1. #位置支付交换机和队列
  2. mq:
  3. pay:
  4. exchange:
  5. order: exchange.order
  6. queue:
  7. order: queue.order
  8. seckillorder: queue.seckillorder
  9. seckillordertimer: queue.seckillordertimer
  10. seckillordertimerdelay: queue.seckillordertimerdelay
  11. routing:
  12. orderkey: queue.order
  13. seckillorderkey: queue.seckillorder

配置队列与交换机,在SeckillApplication中添加如下方法

  1. /**
  2. * 到期数据队列
  3. * @return
  4. */
  5. @Bean
  6. public Queue seckillOrderTimerQueue() {
  7. return new Queue(env.getProperty("mq.pay.queue.seckillordertimer"), true);
  8. }
  9. /**
  10. * 超时数据队列
  11. * @return
  12. */
  13. @Bean
  14. public Queue delaySeckillOrderTimerQueue() {
  15. return QueueBuilder.durable(env.getProperty("mq.pay.queue.seckillordertimerdelay"))
  16. .withArgument("x-dead-letter-exchange", env.getProperty("mq.pay.exchange.order")) // 消息超时进入死信队列,绑定死信队列交换机
  17. .withArgument("x-dead-letter-routing-key", env.getProperty("mq.pay.queue.seckillordertimer")) // 绑定指定的routing-key
  18. .build();
  19. }
  20. /***
  21. * 交换机与队列绑定
  22. * @return
  23. */
  24. @Bean
  25. public Binding basicBinding() {
  26. return BindingBuilder.bind(seckillOrderTimerQueue())
  27. .to(basicExchange())
  28. .with(env.getProperty("mq.pay.queue.seckillordertimer"));
  29. }

5.3.2 发送延时消息

修改MultiThreadingCreateOrder,添加如下方法:

  1. /***
  2. * 发送延时消息到RabbitMQ中
  3. * @param seckillStatus
  4. */
  5. public void sendTimerMessage(SeckillStatus seckillStatus){
  6. rabbitTemplate.convertAndSend(env.getProperty("mq.pay.queue.seckillordertimerdelay"), (Object) JSON.toJSONString(seckillStatus), new MessagePostProcessor() {
  7. @Override
  8. public Message postProcessMessage(Message message) throws AmqpException {
  9. message.getMessageProperties().setExpiration("10000");
  10. return message;
  11. }
  12. });
  13. }

在createOrder方法中调用上面方法,如下代码:

  1. //发送延时消息到MQ中
  2. sendTimerMessage(seckillStatus);

5.3.3 库存回滚

创建SeckillOrderDelayMessageListener实现监听消息,并回滚库存,代码如下:

  1. @Component
  2. @RabbitListener(queues = "${mq.pay.queue.seckillordertimer}")
  3. public class SeckillOrderDelayMessageListener {
  4. @Autowired
  5. private RedisTemplate redisTemplate;
  6. @Autowired
  7. private SeckillOrderService seckillOrderService;
  8. @Autowired
  9. private WeixinPayFeign weixinPayFeign;
  10. /***
  11. * 读取消息
  12. * 判断Redis中是否存在对应的订单
  13. * 如果存在,则关闭支付,再关闭订单
  14. * @param message
  15. */
  16. @RabbitHandler
  17. public void consumeMessage(@Payload String message){
  18. //读取消息
  19. SeckillStatus seckillStatus = JSON.parseObject(message,SeckillStatus.class);
  20. //获取Redis中订单信息
  21. String username = seckillStatus.getUsername();
  22. SeckillOrder seckillOrder = (SeckillOrder) redisTemplate.boundHashOps("SeckillOrder").get(username);
  23. //如果Redis中有订单信息,说明用户未支付
  24. if(seckillOrder!=null){
  25. System.out.println("准备回滚---"+seckillStatus);
  26. //关闭支付
  27. Result closeResult = weixinPayFeign.closePay(seckillStatus.getOrderId());
  28. Map<String,String> closeMap = (Map<String, String>) closeResult.getData();
  29. if(closeMap!=null && closeMap.get("return_code").equalsIgnoreCase("success") &&
  30. closeMap.get("result_code").equalsIgnoreCase("success") ){
  31. //关闭订单
  32. seckillOrderService.closeOrder(username);
  33. }
  34. }
  35. }
  36. }