- 幂等性设计的核心思想
- select、update、delete、insert和混合操作的接口幂等性
- delete操作幂等性
- update操作幂等性
- insert操作幂等性
- 混合操作幂等性
- 接口幂等性技术落地
- 分布式限流
- 分布式限流常用算法
- Guava RateLimiter客户端的限流
- 基于Nginx的分布式限流
- 根据IP地址限制速度
- 1) 第一个参数 $binary_remote_addr
- binary_目的是缩写内存占用,remote_addr表示通过IP地址来限流
- 2) 第二个参数 zone=iplimit:20m
- iplimit是一块内存区域(记录访问频率信息),20m是指这块内存区域的大小
- 3) 第三个参数 rate=1r/s
- 比如100r/m,标识访问的限流频率
- 根据服务器级别做限流
- 基于连接数的配置
- 基于Redis+Lua的分布式限流
- 电商项目改造-客户端分布式限流
接口幂等性
- 提交订单按钮如何防止重复提交?
- 表单录入页如何防止重复提交?
- 微服务接口,客户端重试时,会对业务数据产生影响吗?
- 在系统中,一个接口运行多次,与运行一次的效果是一致的
- 什么情况下需要幂等性
- 重复提交、接口重试、前端操作抖动等
- 业务场景:
- 用户多次点击提交订单,后台应只生成一个订单
- 支付时,由于网络问题重发,应该只扣一次钱
- 并不是所有的接口都要求幂等性,要根据业务而定
幂等性设计的核心思想
- 保证幂等性的策略有哪些?
- 幂等性的核心思想:
- 通过唯一的业务单号保证幂等
- 非并发情况下,查询业务单号有没有操作过,没有则执行操作
- 并发的情况下,整个操作过程加锁
select、update、delete、insert和混合操作的接口幂等性
- select操作:不会对业务数据有影响,天然幂等
- delete操作:第一次已经删除,第二次也不会有影响
- update操作:更新操作传入数据版本号,通过乐观锁实现幂等性
- insert操作:没有唯一业务单号,使用Token保证幂等
- 混合操作:找到操作的唯一业务单号,有则可使用分布式锁,没有可以通过Token保证幂等
delete操作幂等性
- 根据唯一业务号删除
- 第一次删除时,已将数据删除
- 第二次再次执行时,由于找不到记录,所以返回的结果是0,对业务数据没有影响。可在删除前进行数据的查询
- 删除操作没有唯一业务号,则要看具体的业务需求
- 例如:删除所有审核未通过的商品
- 第一次执行,将所有未通过审核的商品删除
- 在第二次执行前,又有新的商品未审核通过
- 执行第二次删除操作,将新的未审核通过的商品要不要删除?
- 根据业务需求而定
public int delUser(Integer userId) {User user = userMapper.selectByPrimaryKey(userId);if (user!=null){log.info("用户存在,用户为:"+userId);return userMapper.deleteByPrimaryKey(userId);}log.info("用户不存在存在,用户为:"+userId);return 0;}
update操作幂等性
- 根据唯一业务号更新数据的情况
- 用户查询出要修改的数据,系统将数据返回页面,将数据版本号放入隐藏域
- 用户修改数据,点击提交,将版本号一同提交到后台
- 后台使用版本号作为更新条件
update set version=version+1,xxx=${xxx} where id =xxx and version=${version}
- 使用乐观锁与update行锁,保证幂等
<update id="updateUser">update t_user<set><if test="username != null">username = #{username,jdbcType=VARCHAR},</if><if test="sex != null">sex = #{sex,jdbcType=INTEGER},</if><if test="age != null">age = #{age,jdbcType=INTEGER},</if>update_count = update_count + 1,version = version + 1</set>where id = #{id,jdbcType=INTEGER}and version = #{version,jdbcType=INTEGER}</update>
- 更新操作没有唯一业务号,可使用Token机制
insert操作幂等性
- 有唯一业务号的insert操作,例如:秒杀,商品ID+用户ID
- 可通过分布式锁,保证接口幂等
- 业务执行完成后,不进行锁释放,让其过期自动释放
public int insertUser(User user) throws Exception {// 唯一业务号 username作为keyInterProcessMutex lock = new InterProcessMutex(zkClient, "/" + user.getUsername());boolean isLock = lock.acquire(30, TimeUnit.SECONDS);if (isLock) {return userMapper.insertSelective(user);}return 0;}
- 没有唯一业务号的insert操作,例如:用户注册,点击多次
- 使用Token机制,保证幂等性
- 进入到注册页时,后台统一生成Token,返回前台隐藏域中
- 用户在页面点击提交时,将token一同传入后台
- 使用token获取分布式锁,完成insert操作
- 执行成功后,不释放锁,等待过期自动释放
// 进入注册页面生成token,返回前台@RequestMapping("register")public String register(ModelMap map){String token = UUID.randomUUID().toString();tokenSet.add(token);map.addAttribute("user",new User());map.addAttribute("token",token);return "/user/user-detail";}
混合操作幂等性
- 混合操作,一个接口包含多个操作
- 同样可以使用Token机制
接口幂等性技术落地
- 订单结算页面获取订单token,放在redis里,且返回前台隐藏域
@ApiOperation(value = "获取订单token", notes = "获取订单token", httpMethod = "POST")@PostMapping("/getOrderToken")public IMOOCJSONResult getOrderToken(HttpSession session) {String token = UUID.randomUUID().toString();redisOperator.set("ORDER_TOKEN" + session.getId(), token, 600);return IMOOCJSONResult.ok(token);}
- 创建订单时,使用分布式锁获取token
@ApiOperation(value = "用户下单", notes = "用户下单", httpMethod = "POST")@PostMapping("/create")public IMOOCJSONResult create(@RequestBody SubmitOrderBO submitOrderBO,HttpServletRequest request,HttpServletResponse response) {// 订单keyString orderTokenKey = "ORDER_TOKEN" + request.getSession().getId();// 分布式锁keyString lockKey = "LOCK_KEY" + request.getSession().getId();// 获取分布式锁RLock lock = redissonClient.getLock(lockKey);lock.lock(5, TimeUnit.SECONDS);try {// 从redis获取tokenString orderToken = redisOperator.get(orderTokenKey);if (StringUtils.isEmpty(orderToken)) {return IMOOCJSONResult.errorMsg("orderToken不存在!");}if (!orderToken.equals(submitOrderBO.getToken())) {return IMOOCJSONResult.errorMsg("orderToken不正确!");}// 获取完 删除redis中的订单tokenredisOperator.del(orderTokenKey);} finally {try {// 释放锁lock.unlock();} catch (Exception e) {}}if (submitOrderBO.getPayMethod() != PayMethod.WEIXIN.type&& submitOrderBO.getPayMethod() != PayMethod.ALIPAY.type) {return IMOOCJSONResult.errorMsg("支付方式不支持!");}String shopcartJson = redisOperator.get(FOODIE_SHOPCART + ":" + submitOrderBO.getUserId());if (StringUtils.isBlank(shopcartJson)) {return IMOOCJSONResult.errorMsg("购物数据不正确");}List<ShopcartBO> shopcartList = JsonUtils.jsonToList(shopcartJson, ShopcartBO.class);// 1. 创建订单PlaceOrderBO orderBO = new PlaceOrderBO(submitOrderBO, shopcartList);OrderVO orderVO = orderService.createOrder(orderBO);String orderId = orderVO.getOrderId();// 2. 创建订单以后,移除购物车中已结算(已提交)的商品/*** 1001* 2002 -> 用户购买* 3003 -> 用户购买* 4004*/// 清理覆盖现有的redis汇总的购物数据shopcartList.removeAll(orderVO.getToBeRemovedShopcatdList());redisOperator.set(FOODIE_SHOPCART + ":" + submitOrderBO.getUserId(), JsonUtils.objectToJson(shopcartList));// 整合redis之后,完善购物车中的已结算商品清除,并且同步到前端的cookieCookieUtils.setCookie(request, response, FOODIE_SHOPCART, JsonUtils.objectToJson(shopcartList), true);// order status检查OrderStatusCheckBO msg = new OrderStatusCheckBO();msg.setOrderID(orderId);// 可以采用更短的Delay时间, 在consumer里面重新投递消息orderStatusProducer.output().send(MessageBuilder.withPayload(msg).setHeader("x-delay", 3600 * 24 * 1000 + 300 * 1000).build());// 3. 向支付中心发送当前订单,用于保存支付中心的订单数据MerchantOrdersVO merchantOrdersVO = orderVO.getMerchantOrdersVO();merchantOrdersVO.setReturnUrl(payReturnUrl);// 为了方便测试购买,所以所有的支付金额都统一改为1分钱merchantOrdersVO.setAmount(1);HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON);headers.add("imoocUserId", "imooc");headers.add("password", "imooc");HttpEntity<MerchantOrdersVO> entity =new HttpEntity<>(merchantOrdersVO, headers);ResponseEntity<IMOOCJSONResult> responseEntity =restTemplate.postForEntity(paymentUrl,entity,IMOOCJSONResult.class);IMOOCJSONResult paymentResult = responseEntity.getBody();if (paymentResult.getStatus() != 200) {logger.error("发送错误:{}", paymentResult.getMsg());return IMOOCJSONResult.errorMsg("支付中心订单创建失败,请联系管理员!");}return IMOOCJSONResult.ok(orderId);}
分布式限流
- 分布式限流介绍
- 分布式限流几种维度
- QPS和连接数控制
- 传输速率
- 黑白名单
- 分布式环境
- 网关层限流:将限流规则应用在所有流量的入口处
- 中间层限流:限流信息存储在分布式环境中某个中间件里(比如Redis缓存),每个组件都可以从这里获取到当前时刻的流量统计,从而决定是拒绝服务还是放行流量
- 常见方案
- Guava:客户端组件,它的作用范围仅限于“当前”这台服务器,不能对集群以内的其他服务器施加流量控制
- 网关层限流
- Spring Cloud中的Gateway组件的网关层限流
- nginx限流
- 基于IP地址和基于服务器的访问请求限流
- 并发量(连接数)限流
- 下行带宽速率限制
- 中间件限流
- Redis
- 限流组件
- Sentinel
- 技术选型
- 分布式限流几种维度
分布式限流常用算法
- 令牌桶算法
- 令牌 获取到令牌的Request才会被处理,其他Requests要么排队要么被直接丢弃
- 桶 用来装令牌的地方,所有Request都从这个桶里面获取令牌
- 令牌生成
这个流程涉及到令牌生成器和令牌桶,前面我们提到过令牌桶是一个装令牌的地方,既然是个桶那么必然有一个容量,也就是说令牌桶所能容纳的令牌数量是一个固定的数值。
对于令牌生成器来说,它会根据一个预定的速率向桶中添加令牌,比如我们可以配置让它以每秒100个请求的速率发放令牌,或者每分钟50个。注意这里的发放速度是匀速,也就是说这50个令牌并非是在每个时间窗口刚开始的时候一次性发放,而是会在这个时间窗口内匀速发放。
在令牌发放器就是一个水龙头,假如在下面接水的桶子满了,那么自然这个水(令牌)就流到了外面。在令牌发放过程中也一样,令牌桶的容量是有限的,如果当前已经放满了额定容量的令牌,那么新来的令牌就会被丢弃掉。 - 令牌获取
每个访问请求到来后,必须获取到一个令牌才能执行后面的逻辑。假如令牌的数量少,而访问请求较多的情况下,一部分请求自然无法获取到令牌,那么这个时候我们可以设置一个“缓冲队列”来暂存这些多余的令牌。
缓冲队列其实是一个可选的选项,并不是所有应用了令牌桶算法的程序都会实现队列。当有缓存队列存在的情况下,那些暂时没有获取到令牌的请求将被放到这个队列中排队,直到新的令牌产生后,再从队列头部拿出一个请求来匹配令牌。
当队列已满的情况下,这部分访问请求将被丢弃。在实际应用中我们还可以给这个队列加一系列的特效,比如设置队列中请求的存活时间,或者将队列改造为PriorityQueue,根据某种优先级排序,而不是先进先出。
- 漏桶算法
- 漏桶算法的前半段和令牌桶类似,但是操作的对象不同,令牌桶是将令牌放入桶里,而漏桶是将访问请求的数据包放到桶里。同样的是,如果桶满了,那么后面新来的数据包将被丢弃。
- 漏桶算法的后半程是有鲜明特色的,它永远只会以一个恒定的速率将数据包从桶内流出。
- 打个比方,如果设置了漏桶可以存放100个数据包,然后流出速度是1s一个,那么不管数据包以什么速率流入桶里,也不管桶里有多少数据包,漏桶能保证这些数据包永远以1s一个的恒定速度被处理。
- 漏桶 VS 令牌桶的区别
- 根据它们各自的特点不难看出来,这两种算法都有一个“恒定”的速率和“不定”的速率。令牌桶是以恒定速率创建令牌,但是访问请求获取令牌的速率“不定”,反正有多少令牌发多少,令牌没了就干等。而漏桶是以“恒定”的速率处理请求,但是这些请求流入桶的速率是“不定”的
- 从这两个特点来说,漏桶的天然特性决定了它不会发生突发流量,就算每秒1000个请求到来,那么它对后台服务输出的访问速率永远恒定。而令牌桶则不同,其特性可以“预存”一定量的令牌,因此在应对突发流量的时候可以在短时间消耗所有令牌,其突发流量处理效率会比漏桶高,但是导向后台系统的压力也会相应增多
- 滑动窗口
- 上图中黑色的大框就是时间窗口,我们设定窗口时间为5秒,它会随着时间推移向后滑动。我们将窗口内的时间划分为五个小格子,每个格子代表1秒钟,同时这个格子还包含一个计数器,用来计算在当前时间内访问的请求数量。那么这个时间窗口内的总访问量就是所有格子计数器累加后的数值。
- 比如说,我们在每一秒内有5个用户访问,第5秒内有10个用户访问,那么在0到5秒这个时间窗口内访问量就是15。如果我们的接口设置了时间窗口内访问上限是20,那么当时间到第六秒的时候,这个时间窗口内的计数总和就变成了10,因为1秒的格子已经退出了时间窗口,因此在第六秒内可以接收的访问量就是20-10=10个。
- 滑动窗口其实也是一种计算器算法,它有一个显著特点,当时间窗口的跨度越长时,限流效果就越平滑。打个比方,如果当前时间窗口只有两秒,而访问请求全部集中在第一秒的时候,当时间向后滑动一秒后,当前窗口的计数量将发生较大的变化,拉长时间窗口可以降低这种情况的发生概率
Guava RateLimiter客户端的限流
- 单机版限流
- 创建rate-limit子项目,引入依赖项
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>18.0</version></dependency>
- 非阻塞式的限流方案 ```java // 每秒钟发放2个通行证 RateLimiter limiter = RateLimiter.create(2.0);
// 非阻塞限流 @GetMapping(“/tryAcquire”) public String tryAcquire(Integer count) { if (limiter.tryAcquire(count)) { log.info(“success, rate is {}”, limiter.getRate()); return “success”; } else { log.info(“fail, rate is {}”, limiter.getRate()); return “fail”; } }
// 限定时间的非阻塞限流 @GetMapping(“/tryAcquireWithTimeout”) public String tryAcquireWithTimeout(Integer count, Integer timeout) { if (limiter.tryAcquire(count, timeout, TimeUnit.SECONDS)) { log.info(“success, rate is {}”, limiter.getRate()); return “success”; } else { log.info(“fail, rate is {}”, limiter.getRate()); return “fail”; } } }
- 同步阻塞式的限流方案```java// 同步阻塞限流@GetMapping("/acquire")public String acquire(Integer count) {limiter.acquire(count);log.info("success, rate is {}", limiter.getRate());return "success";}
基于Nginx的分布式限流
- 基于Nginx的IP限流
- 添加controller方法
// Nginx专用// 1. 修改host文件 -> www.alianlyy-training.top = localhost 127.0.0.1// (127.0.0.1 www.alianlyy-training.top)// 2. 修改nginx -> 将步骤1中的域名,添加到路由规则当中// 配置文件地址: /usr/local/nginx/conf/nginx.conf// 3. 添加配置项:参考resources文件夹下面的nginx.conf//// 重新加载nginx(Nginx处于启动) => sudo /usr/local/nginx/sbin/nginx -s reload@GetMapping("/nginx")public String nginx() {log.info("Nginx success");return "success";}
- 添加controller方法
- 网关层配置(修改Host文件和nginx.conf文件)
- 修改本地Host文件(vim /etc/hosts)
- 修改nginx.conf文件
- 配置Nginx限流规则
```nginx
根据IP地址限制速度
1) 第一个参数 $binary_remote_addr
binary_目的是缩写内存占用,remote_addr表示通过IP地址来限流
2) 第二个参数 zone=iplimit:20m
iplimit是一块内存区域(记录访问频率信息),20m是指这块内存区域的大小
3) 第三个参数 rate=1r/s
比如100r/m,标识访问的限流频率
limit_req_zone $binary_remote_addr zone=iplimit:20m rate=1r/s;
根据服务器级别做限流
limit_req_zone $server_name zone=serverlimit:10m rate=100r/s;
基于连接数的配置
limit_conn_zone $binary_remote_addr zone=perip:20m; limit_conn_zone $server_name zone=perserver:20m;
server {server_name www.imooc-training.com;location /access-limit/ {proxy_pass http://127.0.0.1:10086/;# 基于IP地址的限制# 1) 第一个参数zone=iplimit => 引用limit_req_zone中的zone变量# 2) 第二个参数burst=2,设置一个大小为2的缓冲区域,当大量请求到来。# 请求数量超过限流频率时,将其放入缓冲区域# 3) 第三个参数nodelay=> 缓冲区满了以后,直接返回503异常limit_req zone=iplimit burst=2 nodelay;# 基于服务器级别的限制# 通常情况下,server级别的限流速率是最大的limit_req zone=serverlimit burst=100 nodelay;# 每个server最多保持100个连接limit_conn perserver 100;# 每个IP地址最多保持5个连接limit_conn perip 5;# 异常情况,返回504(默认是503)limit_req_status 504;limit_conn_status 504;}# 下载限制速度location /download/ {# 100m之后限制limit_rate_after 100m;limit_rate 256k;}}
- 基于Nginx的连接数限制和单机限流- 配置单机限流(类似IP限流)```nginx# 根据服务器级别做限流limit_req_zone $server_name zone=serverlimit:10m rate=100r/s;
- 添加Controller方法(耗时接口)
@GetMapping("/nginx-conn")public String nginxConn(@RequestParam(defaultValue = "0") int secs) {try {Thread.sleep(1000 * secs);} catch (Exception e) {}return "success";}
- 配置nginx连接数限流规则
# 基于连接数的配置limit_conn_zone $binary_remote_addr zone=perip:20m;limit_conn_zone $server_name zone=perserver:20m;
- IP限流与Conn限流共同作用
基于Redis+Lua的分布式限流
- Lua基本用法和介绍
- Lua特点
- 嵌入式开发
- 插件开发
- 完美集成
- Redis内置Lua解释器
- 执行过程原子性
- 脚本预编译
- 基本用法
- Hello Lua
- 一个简易脚本(Lua入门级语法) ```lua — 模拟限流(假的)
- Lua特点
— 用作限流的Key local key = ‘My Key’
— 限流的最大阈值=2 local limit = 2
— 当前流量大小 local currentLimit = 0
— 是否超出限流标准 if currentLimit + 1 > limit then print ‘reject’ return false else print ‘accept’ return true end ```
- Redis预加载Lua
- 在Redis中执行Lua脚本
- Lua脚本预导入Redis
电商项目改造-客户端分布式限流
- 基于Redis+Lua实现限流
- 编写Lua限流脚本
- spring-data-redis组件集成Lua和Redis
- DefaultRedisScript加载Lua脚本
- RedisTemplate配置(调用Redis)
- 在Controller中添加测试方法验证限流效果
- 自定义注解封装限流逻辑
- 基于Aspect创建自定义注解
- 配置限流规则的切面
- 为目标方法添加@AccessLimit注解,验证效果
