接口幂等性

  • 提交订单按钮如何防止重复提交?
  • 表单录入页如何防止重复提交?
  • 微服务接口,客户端重试时,会对业务数据产生影响吗?
  • 在系统中,一个接口运行多次,与运行一次的效果是一致的
  • 什么情况下需要幂等性
    • 重复提交、接口重试、前端操作抖动等
  • 业务场景:
    • 用户多次点击提交订单,后台应只生成一个订单
    • 支付时,由于网络问题重发,应该只扣一次钱
  • 并不是所有的接口都要求幂等性,要根据业务而定

幂等性设计的核心思想

  • 保证幂等性的策略有哪些?
  • 幂等性的核心思想:
    • 通过唯一的业务单号保证幂等
  • 非并发情况下,查询业务单号有没有操作过,没有则执行操作
  • 并发的情况下,整个操作过程加锁

select、update、delete、insert和混合操作的接口幂等性

  • select操作:不会对业务数据有影响,天然幂等
  • delete操作:第一次已经删除,第二次也不会有影响
  • update操作:更新操作传入数据版本号,通过乐观锁实现幂等性
  • insert操作:没有唯一业务单号,使用Token保证幂等
  • 混合操作:找到操作的唯一业务单号,有则可使用分布式锁,没有可以通过Token保证幂等

delete操作幂等性

  • 根据唯一业务号删除
  • 第一次删除时,已将数据删除
  • 第二次再次执行时,由于找不到记录,所以返回的结果是0,对业务数据没有影响。可在删除前进行数据的查询
  • 删除操作没有唯一业务号,则要看具体的业务需求
  • 例如:删除所有审核未通过的商品
  • 第一次执行,将所有未通过审核的商品删除
  • 在第二次执行前,又有新的商品未审核通过
  • 执行第二次删除操作,将新的未审核通过的商品要不要删除?
  • 根据业务需求而定
    1. public int delUser(Integer userId) {
    2. User user = userMapper.selectByPrimaryKey(userId);
    3. if (user!=null){
    4. log.info("用户存在,用户为:"+userId);
    5. return userMapper.deleteByPrimaryKey(userId);
    6. }
    7. log.info("用户不存在存在,用户为:"+userId);
    8. return 0;
    9. }

update操作幂等性

  • 根据唯一业务号更新数据的情况
  • 用户查询出要修改的数据,系统将数据返回页面,将数据版本号放入隐藏域
  • 用户修改数据,点击提交,将版本号一同提交到后台
  • 后台使用版本号作为更新条件
    1. update set version=version+1,xxx=${xxx} where id =xxx and version=${version}
  • 使用乐观锁与update行锁,保证幂等
    1. <update id="updateUser">
    2. update t_user
    3. <set>
    4. <if test="username != null">
    5. username = #{username,jdbcType=VARCHAR},
    6. </if>
    7. <if test="sex != null">
    8. sex = #{sex,jdbcType=INTEGER},
    9. </if>
    10. <if test="age != null">
    11. age = #{age,jdbcType=INTEGER},
    12. </if>
    13. update_count = update_count + 1,
    14. version = version + 1
    15. </set>
    16. where id = #{id,jdbcType=INTEGER}
    17. and version = #{version,jdbcType=INTEGER}
    18. </update>
  • 更新操作没有唯一业务号,可使用Token机制

insert操作幂等性

  • 有唯一业务号的insert操作,例如:秒杀,商品ID+用户ID
  • 可通过分布式锁,保证接口幂等
  • 业务执行完成后,不进行锁释放,让其过期自动释放
    1. public int insertUser(User user) throws Exception {
    2. // 唯一业务号 username作为key
    3. InterProcessMutex lock = new InterProcessMutex(zkClient, "/" + user.getUsername());
    4. boolean isLock = lock.acquire(30, TimeUnit.SECONDS);
    5. if (isLock) {
    6. return userMapper.insertSelective(user);
    7. }
    8. return 0;
    9. }
  • 没有唯一业务号的insert操作,例如:用户注册,点击多次
  • 使用Token机制,保证幂等性
  • 进入到注册页时,后台统一生成Token,返回前台隐藏域中
  • 用户在页面点击提交时,将token一同传入后台
  • 使用token获取分布式锁,完成insert操作
  • 执行成功后,不释放锁,等待过期自动释放
    1. // 进入注册页面生成token,返回前台
    2. @RequestMapping("register")
    3. public String register(ModelMap map){
    4. String token = UUID.randomUUID().toString();
    5. tokenSet.add(token);
    6. map.addAttribute("user",new User());
    7. map.addAttribute("token",token);
    8. return "/user/user-detail";
    9. }

混合操作幂等性

  • 混合操作,一个接口包含多个操作
  • 同样可以使用Token机制

接口幂等性技术落地

  • 订单结算页面获取订单token,放在redis里,且返回前台隐藏域
    1. @ApiOperation(value = "获取订单token", notes = "获取订单token", httpMethod = "POST")
    2. @PostMapping("/getOrderToken")
    3. public IMOOCJSONResult getOrderToken(HttpSession session) {
    4. String token = UUID.randomUUID().toString();
    5. redisOperator.set("ORDER_TOKEN" + session.getId(), token, 600);
    6. return IMOOCJSONResult.ok(token);
    7. }
  • 创建订单时,使用分布式锁获取token
  1. @ApiOperation(value = "用户下单", notes = "用户下单", httpMethod = "POST")
  2. @PostMapping("/create")
  3. public IMOOCJSONResult create(
  4. @RequestBody SubmitOrderBO submitOrderBO,
  5. HttpServletRequest request,
  6. HttpServletResponse response) {
  7. // 订单key
  8. String orderTokenKey = "ORDER_TOKEN" + request.getSession().getId();
  9. // 分布式锁key
  10. String lockKey = "LOCK_KEY" + request.getSession().getId();
  11. // 获取分布式锁
  12. RLock lock = redissonClient.getLock(lockKey);
  13. lock.lock(5, TimeUnit.SECONDS);
  14. try {
  15. // 从redis获取token
  16. String orderToken = redisOperator.get(orderTokenKey);
  17. if (StringUtils.isEmpty(orderToken)) {
  18. return IMOOCJSONResult.errorMsg("orderToken不存在!");
  19. }
  20. if (!orderToken.equals(submitOrderBO.getToken())) {
  21. return IMOOCJSONResult.errorMsg("orderToken不正确!");
  22. }
  23. // 获取完 删除redis中的订单token
  24. redisOperator.del(orderTokenKey);
  25. } finally {
  26. try {
  27. // 释放锁
  28. lock.unlock();
  29. } catch (Exception e) {
  30. }
  31. }
  32. if (submitOrderBO.getPayMethod() != PayMethod.WEIXIN.type
  33. && submitOrderBO.getPayMethod() != PayMethod.ALIPAY.type) {
  34. return IMOOCJSONResult.errorMsg("支付方式不支持!");
  35. }
  36. String shopcartJson = redisOperator.get(FOODIE_SHOPCART + ":" + submitOrderBO.getUserId());
  37. if (StringUtils.isBlank(shopcartJson)) {
  38. return IMOOCJSONResult.errorMsg("购物数据不正确");
  39. }
  40. List<ShopcartBO> shopcartList = JsonUtils.jsonToList(shopcartJson, ShopcartBO.class);
  41. // 1. 创建订单
  42. PlaceOrderBO orderBO = new PlaceOrderBO(submitOrderBO, shopcartList);
  43. OrderVO orderVO = orderService.createOrder(orderBO);
  44. String orderId = orderVO.getOrderId();
  45. // 2. 创建订单以后,移除购物车中已结算(已提交)的商品
  46. /**
  47. * 1001
  48. * 2002 -> 用户购买
  49. * 3003 -> 用户购买
  50. * 4004
  51. */
  52. // 清理覆盖现有的redis汇总的购物数据
  53. shopcartList.removeAll(orderVO.getToBeRemovedShopcatdList());
  54. redisOperator.set(FOODIE_SHOPCART + ":" + submitOrderBO.getUserId(), JsonUtils.objectToJson(shopcartList));
  55. // 整合redis之后,完善购物车中的已结算商品清除,并且同步到前端的cookie
  56. CookieUtils.setCookie(request, response, FOODIE_SHOPCART, JsonUtils.objectToJson(shopcartList), true);
  57. // order status检查
  58. OrderStatusCheckBO msg = new OrderStatusCheckBO();
  59. msg.setOrderID(orderId);
  60. // 可以采用更短的Delay时间, 在consumer里面重新投递消息
  61. orderStatusProducer.output().send(
  62. MessageBuilder.withPayload(msg)
  63. .setHeader("x-delay", 3600 * 24 * 1000 + 300 * 1000)
  64. .build()
  65. );
  66. // 3. 向支付中心发送当前订单,用于保存支付中心的订单数据
  67. MerchantOrdersVO merchantOrdersVO = orderVO.getMerchantOrdersVO();
  68. merchantOrdersVO.setReturnUrl(payReturnUrl);
  69. // 为了方便测试购买,所以所有的支付金额都统一改为1分钱
  70. merchantOrdersVO.setAmount(1);
  71. HttpHeaders headers = new HttpHeaders();
  72. headers.setContentType(MediaType.APPLICATION_JSON);
  73. headers.add("imoocUserId", "imooc");
  74. headers.add("password", "imooc");
  75. HttpEntity<MerchantOrdersVO> entity =
  76. new HttpEntity<>(merchantOrdersVO, headers);
  77. ResponseEntity<IMOOCJSONResult> responseEntity =
  78. restTemplate.postForEntity(paymentUrl,
  79. entity,
  80. IMOOCJSONResult.class);
  81. IMOOCJSONResult paymentResult = responseEntity.getBody();
  82. if (paymentResult.getStatus() != 200) {
  83. logger.error("发送错误:{}", paymentResult.getMsg());
  84. return IMOOCJSONResult.errorMsg("支付中心订单创建失败,请联系管理员!");
  85. }
  86. return IMOOCJSONResult.ok(orderId);
  87. }

分布式限流

  • 分布式限流介绍
    • 分布式限流几种维度
      • QPS和连接数控制
      • 传输速率
      • 黑白名单
      • 分布式环境
        • 网关层限流:将限流规则应用在所有流量的入口处
        • 中间层限流:限流信息存储在分布式环境中某个中间件里(比如Redis缓存),每个组件都可以从这里获取到当前时刻的流量统计,从而决定是拒绝服务还是放行流量
    • 常见方案
      • Guava:客户端组件,它的作用范围仅限于“当前”这台服务器,不能对集群以内的其他服务器施加流量控制
      • 网关层限流
        • Spring Cloud中的Gateway组件的网关层限流
        • nginx限流
          • 基于IP地址和基于服务器的访问请求限流
          • 并发量(连接数)限流
          • 下行带宽速率限制
      • 中间件限流
        • Redis
      • 限流组件
        • Sentinel
    • 技术选型

分布式限流常用算法

  • 令牌桶算法
    • 令牌 获取到令牌的Request才会被处理,其他Requests要么排队要么被直接丢弃
    • 用来装令牌的地方,所有Request都从这个桶里面获取令牌 分布式接口幂等性、分布式限流 - 图1
    • 令牌生成
      这个流程涉及到令牌生成器和令牌桶,前面我们提到过令牌桶是一个装令牌的地方,既然是个桶那么必然有一个容量,也就是说令牌桶所能容纳的令牌数量是一个固定的数值。
      对于令牌生成器来说,它会根据一个预定的速率向桶中添加令牌,比如我们可以配置让它以每秒100个请求的速率发放令牌,或者每分钟50个。注意这里的发放速度是匀速,也就是说这50个令牌并非是在每个时间窗口刚开始的时候一次性发放,而是会在这个时间窗口内匀速发放。
      在令牌发放器就是一个水龙头,假如在下面接水的桶子满了,那么自然这个水(令牌)就流到了外面。在令牌发放过程中也一样,令牌桶的容量是有限的,如果当前已经放满了额定容量的令牌,那么新来的令牌就会被丢弃掉。
    • 令牌获取
      每个访问请求到来后,必须获取到一个令牌才能执行后面的逻辑。假如令牌的数量少,而访问请求较多的情况下,一部分请求自然无法获取到令牌,那么这个时候我们可以设置一个“缓冲队列”来暂存这些多余的令牌。
      缓冲队列其实是一个可选的选项,并不是所有应用了令牌桶算法的程序都会实现队列。当有缓存队列存在的情况下,那些暂时没有获取到令牌的请求将被放到这个队列中排队,直到新的令牌产生后,再从队列头部拿出一个请求来匹配令牌。
      当队列已满的情况下,这部分访问请求将被丢弃。在实际应用中我们还可以给这个队列加一系列的特效,比如设置队列中请求的存活时间,或者将队列改造为PriorityQueue,根据某种优先级排序,而不是先进先出。
  • 漏桶算法 分布式接口幂等性、分布式限流 - 图2
    • 漏桶算法的前半段和令牌桶类似,但是操作的对象不同,令牌桶是将令牌放入桶里,而漏桶是将访问请求的数据包放到桶里。同样的是,如果桶满了,那么后面新来的数据包将被丢弃。
    • 漏桶算法的后半程是有鲜明特色的,它永远只会以一个恒定的速率将数据包从桶内流出。
    • 打个比方,如果设置了漏桶可以存放100个数据包,然后流出速度是1s一个,那么不管数据包以什么速率流入桶里,也不管桶里有多少数据包,漏桶能保证这些数据包永远以1s一个的恒定速度被处理。
    • 漏桶 VS 令牌桶的区别
      • 根据它们各自的特点不难看出来,这两种算法都有一个“恒定”的速率和“不定”的速率。令牌桶是以恒定速率创建令牌,但是访问请求获取令牌的速率“不定”,反正有多少令牌发多少,令牌没了就干等。而漏桶是以“恒定”的速率处理请求,但是这些请求流入桶的速率是“不定”的
      • 从这两个特点来说,漏桶的天然特性决定了它不会发生突发流量,就算每秒1000个请求到来,那么它对后台服务输出的访问速率永远恒定。而令牌桶则不同,其特性可以“预存”一定量的令牌,因此在应对突发流量的时候可以在短时间消耗所有令牌,其突发流量处理效率会比漏桶高,但是导向后台系统的压力也会相应增多
  • 滑动窗口
    分布式接口幂等性、分布式限流 - 图3
  • 上图中黑色的大框就是时间窗口,我们设定窗口时间为5秒,它会随着时间推移向后滑动。我们将窗口内的时间划分为五个小格子,每个格子代表1秒钟,同时这个格子还包含一个计数器,用来计算在当前时间内访问的请求数量。那么这个时间窗口内的总访问量就是所有格子计数器累加后的数值。
  • 比如说,我们在每一秒内有5个用户访问,第5秒内有10个用户访问,那么在0到5秒这个时间窗口内访问量就是15。如果我们的接口设置了时间窗口内访问上限是20,那么当时间到第六秒的时候,这个时间窗口内的计数总和就变成了10,因为1秒的格子已经退出了时间窗口,因此在第六秒内可以接收的访问量就是20-10=10个。
  • 滑动窗口其实也是一种计算器算法,它有一个显著特点,当时间窗口的跨度越长时,限流效果就越平滑。打个比方,如果当前时间窗口只有两秒,而访问请求全部集中在第一秒的时候,当时间向后滑动一秒后,当前窗口的计数量将发生较大的变化,拉长时间窗口可以降低这种情况的发生概率

Guava RateLimiter客户端的限流

  • 单机版限流
  • 创建rate-limit子项目,引入依赖项
    1. <dependency>
    2. <groupId>com.google.guava</groupId>
    3. <artifactId>guava</artifactId>
    4. <version>18.0</version>
    5. </dependency>
  • 非阻塞式的限流方案 ```java // 每秒钟发放2个通行证 RateLimiter limiter = RateLimiter.create(2.0);

// 非阻塞限流 @GetMapping(“/tryAcquire”) public String tryAcquire(Integer count) { if (limiter.tryAcquire(count)) { log.info(“success, rate is {}”, limiter.getRate()); return “success”; } else { log.info(“fail, rate is {}”, limiter.getRate()); return “fail”; } }

// 限定时间的非阻塞限流 @GetMapping(“/tryAcquireWithTimeout”) public String tryAcquireWithTimeout(Integer count, Integer timeout) { if (limiter.tryAcquire(count, timeout, TimeUnit.SECONDS)) { log.info(“success, rate is {}”, limiter.getRate()); return “success”; } else { log.info(“fail, rate is {}”, limiter.getRate()); return “fail”; } } }

  1. - 同步阻塞式的限流方案
  2. ```java
  3. // 同步阻塞限流
  4. @GetMapping("/acquire")
  5. public String acquire(Integer count) {
  6. limiter.acquire(count);
  7. log.info("success, rate is {}", limiter.getRate());
  8. return "success";
  9. }

基于Nginx的分布式限流

  • 基于Nginx的IP限流
    • 添加controller方法
      1. // Nginx专用
      2. // 1. 修改host文件 -> www.alianlyy-training.top = localhost 127.0.0.1
      3. // (127.0.0.1 www.alianlyy-training.top)
      4. // 2. 修改nginx -> 将步骤1中的域名,添加到路由规则当中
      5. // 配置文件地址: /usr/local/nginx/conf/nginx.conf
      6. // 3. 添加配置项:参考resources文件夹下面的nginx.conf
      7. //
      8. // 重新加载nginx(Nginx处于启动) => sudo /usr/local/nginx/sbin/nginx -s reload
      9. @GetMapping("/nginx")
      10. public String nginx() {
      11. log.info("Nginx success");
      12. return "success";
      13. }
  • 网关层配置(修改Host文件和nginx.conf文件)
    • 修改本地Host文件(vim /etc/hosts)
    • 修改nginx.conf文件
    • 配置Nginx限流规则 ```nginx

      根据IP地址限制速度

      1) 第一个参数 $binary_remote_addr

      binary_目的是缩写内存占用,remote_addr表示通过IP地址来限流

      2) 第二个参数 zone=iplimit:20m

      iplimit是一块内存区域(记录访问频率信息),20m是指这块内存区域的大小

      3) 第三个参数 rate=1r/s

      比如100r/m,标识访问的限流频率

      limit_req_zone $binary_remote_addr zone=iplimit:20m rate=1r/s;

根据服务器级别做限流

limit_req_zone $server_name zone=serverlimit:10m rate=100r/s;

基于连接数的配置

limit_conn_zone $binary_remote_addr zone=perip:20m; limit_conn_zone $server_name zone=perserver:20m;

  1. server {
  2. server_name www.imooc-training.com;
  3. location /access-limit/ {
  4. proxy_pass http://127.0.0.1:10086/;
  5. # 基于IP地址的限制
  6. # 1) 第一个参数zone=iplimit => 引用limit_req_zone中的zone变量
  7. # 2) 第二个参数burst=2,设置一个大小为2的缓冲区域,当大量请求到来。
  8. # 请求数量超过限流频率时,将其放入缓冲区域
  9. # 3) 第三个参数nodelay=> 缓冲区满了以后,直接返回503异常
  10. limit_req zone=iplimit burst=2 nodelay;
  11. # 基于服务器级别的限制
  12. # 通常情况下,server级别的限流速率是最大的
  13. limit_req zone=serverlimit burst=100 nodelay;
  14. # 每个server最多保持100个连接
  15. limit_conn perserver 100;
  16. # 每个IP地址最多保持5个连接
  17. limit_conn perip 5;
  18. # 异常情况,返回504(默认是503)
  19. limit_req_status 504;
  20. limit_conn_status 504;
  21. }
  22. # 下载限制速度
  23. location /download/ {
  24. # 100m之后限制
  25. limit_rate_after 100m;
  26. limit_rate 256k;
  27. }
  28. }
  1. - 基于Nginx的连接数限制和单机限流
  2. - 配置单机限流(类似IP限流)
  3. ```nginx
  4. # 根据服务器级别做限流
  5. limit_req_zone $server_name zone=serverlimit:10m rate=100r/s;
  • 添加Controller方法(耗时接口)
    1. @GetMapping("/nginx-conn")
    2. public String nginxConn(@RequestParam(defaultValue = "0") int secs) {
    3. try {
    4. Thread.sleep(1000 * secs);
    5. } catch (Exception e) {
    6. }
    7. return "success";
    8. }
  • 配置nginx连接数限流规则
    1. # 基于连接数的配置
    2. limit_conn_zone $binary_remote_addr zone=perip:20m;
    3. limit_conn_zone $server_name zone=perserver:20m;
  • IP限流与Conn限流共同作用

基于Redis+Lua的分布式限流

  • Lua基本用法和介绍
    • Lua特点
      • 嵌入式开发
      • 插件开发
      • 完美集成
        • Redis内置Lua解释器
        • 执行过程原子性
        • 脚本预编译
    • 基本用法
      • Hello Lua
      • 一个简易脚本(Lua入门级语法) ```lua — 模拟限流(假的)

— 用作限流的Key local key = ‘My Key’

— 限流的最大阈值=2 local limit = 2

— 当前流量大小 local currentLimit = 0

— 是否超出限流标准 if currentLimit + 1 > limit then print ‘reject’ return false else print ‘accept’ return true end ```

  • Redis预加载Lua
    • 在Redis中执行Lua脚本
    • Lua脚本预导入Redis

电商项目改造-客户端分布式限流

  • 基于Redis+Lua实现限流
    • 编写Lua限流脚本
    • spring-data-redis组件集成Lua和Redis
      • DefaultRedisScript加载Lua脚本
      • RedisTemplate配置(调用Redis)
    • 在Controller中添加测试方法验证限流效果
  • 自定义注解封装限流逻辑
    • 基于Aspect创建自定义注解
    • 配置限流规则的切面
    • 为目标方法添加@AccessLimit注解,验证效果