- 幂等性设计的核心思想
- select、update、delete、insert和混合操作的接口幂等性
- delete操作幂等性
- update操作幂等性
- insert操作幂等性
- 混合操作幂等性
- 接口幂等性技术落地
- 分布式限流
- 分布式限流常用算法
- Guava RateLimiter客户端的限流
- 基于Nginx的分布式限流
- 根据IP地址限制速度
- 1) 第一个参数 $binary_remote_addr
- binary_目的是缩写内存占用,remote_addr表示通过IP地址来限流
- 2) 第二个参数 zone=iplimit:20m
- iplimit是一块内存区域(记录访问频率信息),20m是指这块内存区域的大小
- 3) 第三个参数 rate=1r/s
- 比如100r/m,标识访问的限流频率
- 根据服务器级别做限流
- 基于连接数的配置
- 基于Redis+Lua的分布式限流
- 电商项目改造-客户端分布式限流
接口幂等性
- 提交订单按钮如何防止重复提交?
- 表单录入页如何防止重复提交?
- 微服务接口,客户端重试时,会对业务数据产生影响吗?
- 在系统中,一个接口运行多次,与运行一次的效果是一致的
- 什么情况下需要幂等性
- 重复提交、接口重试、前端操作抖动等
- 业务场景:
- 用户多次点击提交订单,后台应只生成一个订单
- 支付时,由于网络问题重发,应该只扣一次钱
- 并不是所有的接口都要求幂等性,要根据业务而定
幂等性设计的核心思想
- 保证幂等性的策略有哪些?
- 幂等性的核心思想:
- 通过唯一的业务单号保证幂等
- 非并发情况下,查询业务单号有没有操作过,没有则执行操作
- 并发的情况下,整个操作过程加锁
select、update、delete、insert和混合操作的接口幂等性
- select操作:不会对业务数据有影响,天然幂等
- delete操作:第一次已经删除,第二次也不会有影响
- update操作:更新操作传入数据版本号,通过乐观锁实现幂等性
- insert操作:没有唯一业务单号,使用Token保证幂等
- 混合操作:找到操作的唯一业务单号,有则可使用分布式锁,没有可以通过Token保证幂等
delete操作幂等性
- 根据唯一业务号删除
- 第一次删除时,已将数据删除
- 第二次再次执行时,由于找不到记录,所以返回的结果是0,对业务数据没有影响。可在删除前进行数据的查询
- 删除操作没有唯一业务号,则要看具体的业务需求
- 例如:删除所有审核未通过的商品
- 第一次执行,将所有未通过审核的商品删除
- 在第二次执行前,又有新的商品未审核通过
- 执行第二次删除操作,将新的未审核通过的商品要不要删除?
- 根据业务需求而定
public int delUser(Integer userId) {
User user = userMapper.selectByPrimaryKey(userId);
if (user!=null){
log.info("用户存在,用户为:"+userId);
return userMapper.deleteByPrimaryKey(userId);
}
log.info("用户不存在存在,用户为:"+userId);
return 0;
}
update操作幂等性
- 根据唯一业务号更新数据的情况
- 用户查询出要修改的数据,系统将数据返回页面,将数据版本号放入隐藏域
- 用户修改数据,点击提交,将版本号一同提交到后台
- 后台使用版本号作为更新条件
update set version=version+1,xxx=${xxx} where id =xxx and version=${version}
- 使用乐观锁与update行锁,保证幂等
<update id="updateUser">
update t_user
<set>
<if test="username != null">
username = #{username,jdbcType=VARCHAR},
</if>
<if test="sex != null">
sex = #{sex,jdbcType=INTEGER},
</if>
<if test="age != null">
age = #{age,jdbcType=INTEGER},
</if>
update_count = update_count + 1,
version = version + 1
</set>
where id = #{id,jdbcType=INTEGER}
and version = #{version,jdbcType=INTEGER}
</update>
- 更新操作没有唯一业务号,可使用Token机制
insert操作幂等性
- 有唯一业务号的insert操作,例如:秒杀,商品ID+用户ID
- 可通过分布式锁,保证接口幂等
- 业务执行完成后,不进行锁释放,让其过期自动释放
public int insertUser(User user) throws Exception {
// 唯一业务号 username作为key
InterProcessMutex lock = new InterProcessMutex(zkClient, "/" + user.getUsername());
boolean isLock = lock.acquire(30, TimeUnit.SECONDS);
if (isLock) {
return userMapper.insertSelective(user);
}
return 0;
}
- 没有唯一业务号的insert操作,例如:用户注册,点击多次
- 使用Token机制,保证幂等性
- 进入到注册页时,后台统一生成Token,返回前台隐藏域中
- 用户在页面点击提交时,将token一同传入后台
- 使用token获取分布式锁,完成insert操作
- 执行成功后,不释放锁,等待过期自动释放
// 进入注册页面生成token,返回前台
@RequestMapping("register")
public String register(ModelMap map){
String token = UUID.randomUUID().toString();
tokenSet.add(token);
map.addAttribute("user",new User());
map.addAttribute("token",token);
return "/user/user-detail";
}
混合操作幂等性
- 混合操作,一个接口包含多个操作
- 同样可以使用Token机制
接口幂等性技术落地
- 订单结算页面获取订单token,放在redis里,且返回前台隐藏域
@ApiOperation(value = "获取订单token", notes = "获取订单token", httpMethod = "POST")
@PostMapping("/getOrderToken")
public IMOOCJSONResult getOrderToken(HttpSession session) {
String token = UUID.randomUUID().toString();
redisOperator.set("ORDER_TOKEN" + session.getId(), token, 600);
return IMOOCJSONResult.ok(token);
}
- 创建订单时,使用分布式锁获取token
@ApiOperation(value = "用户下单", notes = "用户下单", httpMethod = "POST")
@PostMapping("/create")
public IMOOCJSONResult create(
@RequestBody SubmitOrderBO submitOrderBO,
HttpServletRequest request,
HttpServletResponse response) {
// 订单key
String orderTokenKey = "ORDER_TOKEN" + request.getSession().getId();
// 分布式锁key
String lockKey = "LOCK_KEY" + request.getSession().getId();
// 获取分布式锁
RLock lock = redissonClient.getLock(lockKey);
lock.lock(5, TimeUnit.SECONDS);
try {
// 从redis获取token
String orderToken = redisOperator.get(orderTokenKey);
if (StringUtils.isEmpty(orderToken)) {
return IMOOCJSONResult.errorMsg("orderToken不存在!");
}
if (!orderToken.equals(submitOrderBO.getToken())) {
return IMOOCJSONResult.errorMsg("orderToken不正确!");
}
// 获取完 删除redis中的订单token
redisOperator.del(orderTokenKey);
} finally {
try {
// 释放锁
lock.unlock();
} catch (Exception e) {
}
}
if (submitOrderBO.getPayMethod() != PayMethod.WEIXIN.type
&& submitOrderBO.getPayMethod() != PayMethod.ALIPAY.type) {
return IMOOCJSONResult.errorMsg("支付方式不支持!");
}
String shopcartJson = redisOperator.get(FOODIE_SHOPCART + ":" + submitOrderBO.getUserId());
if (StringUtils.isBlank(shopcartJson)) {
return IMOOCJSONResult.errorMsg("购物数据不正确");
}
List<ShopcartBO> shopcartList = JsonUtils.jsonToList(shopcartJson, ShopcartBO.class);
// 1. 创建订单
PlaceOrderBO orderBO = new PlaceOrderBO(submitOrderBO, shopcartList);
OrderVO orderVO = orderService.createOrder(orderBO);
String orderId = orderVO.getOrderId();
// 2. 创建订单以后,移除购物车中已结算(已提交)的商品
/**
* 1001
* 2002 -> 用户购买
* 3003 -> 用户购买
* 4004
*/
// 清理覆盖现有的redis汇总的购物数据
shopcartList.removeAll(orderVO.getToBeRemovedShopcatdList());
redisOperator.set(FOODIE_SHOPCART + ":" + submitOrderBO.getUserId(), JsonUtils.objectToJson(shopcartList));
// 整合redis之后,完善购物车中的已结算商品清除,并且同步到前端的cookie
CookieUtils.setCookie(request, response, FOODIE_SHOPCART, JsonUtils.objectToJson(shopcartList), true);
// order status检查
OrderStatusCheckBO msg = new OrderStatusCheckBO();
msg.setOrderID(orderId);
// 可以采用更短的Delay时间, 在consumer里面重新投递消息
orderStatusProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("x-delay", 3600 * 24 * 1000 + 300 * 1000)
.build()
);
// 3. 向支付中心发送当前订单,用于保存支付中心的订单数据
MerchantOrdersVO merchantOrdersVO = orderVO.getMerchantOrdersVO();
merchantOrdersVO.setReturnUrl(payReturnUrl);
// 为了方便测试购买,所以所有的支付金额都统一改为1分钱
merchantOrdersVO.setAmount(1);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.add("imoocUserId", "imooc");
headers.add("password", "imooc");
HttpEntity<MerchantOrdersVO> entity =
new HttpEntity<>(merchantOrdersVO, headers);
ResponseEntity<IMOOCJSONResult> responseEntity =
restTemplate.postForEntity(paymentUrl,
entity,
IMOOCJSONResult.class);
IMOOCJSONResult paymentResult = responseEntity.getBody();
if (paymentResult.getStatus() != 200) {
logger.error("发送错误:{}", paymentResult.getMsg());
return IMOOCJSONResult.errorMsg("支付中心订单创建失败,请联系管理员!");
}
return IMOOCJSONResult.ok(orderId);
}
分布式限流
- 分布式限流介绍
- 分布式限流几种维度
- QPS和连接数控制
- 传输速率
- 黑白名单
- 分布式环境
- 网关层限流:将限流规则应用在所有流量的入口处
- 中间层限流:限流信息存储在分布式环境中某个中间件里(比如Redis缓存),每个组件都可以从这里获取到当前时刻的流量统计,从而决定是拒绝服务还是放行流量
- 常见方案
- Guava:客户端组件,它的作用范围仅限于“当前”这台服务器,不能对集群以内的其他服务器施加流量控制
- 网关层限流
- Spring Cloud中的Gateway组件的网关层限流
- nginx限流
- 基于IP地址和基于服务器的访问请求限流
- 并发量(连接数)限流
- 下行带宽速率限制
- 中间件限流
- Redis
- 限流组件
- Sentinel
- 技术选型
- 分布式限流几种维度
分布式限流常用算法
- 令牌桶算法
- 令牌 获取到令牌的Request才会被处理,其他Requests要么排队要么被直接丢弃
- 桶 用来装令牌的地方,所有Request都从这个桶里面获取令牌
- 令牌生成
这个流程涉及到令牌生成器和令牌桶,前面我们提到过令牌桶是一个装令牌的地方,既然是个桶那么必然有一个容量,也就是说令牌桶所能容纳的令牌数量是一个固定的数值。
对于令牌生成器来说,它会根据一个预定的速率向桶中添加令牌,比如我们可以配置让它以每秒100个请求的速率发放令牌,或者每分钟50个。注意这里的发放速度是匀速,也就是说这50个令牌并非是在每个时间窗口刚开始的时候一次性发放,而是会在这个时间窗口内匀速发放。
在令牌发放器就是一个水龙头,假如在下面接水的桶子满了,那么自然这个水(令牌)就流到了外面。在令牌发放过程中也一样,令牌桶的容量是有限的,如果当前已经放满了额定容量的令牌,那么新来的令牌就会被丢弃掉。 - 令牌获取
每个访问请求到来后,必须获取到一个令牌才能执行后面的逻辑。假如令牌的数量少,而访问请求较多的情况下,一部分请求自然无法获取到令牌,那么这个时候我们可以设置一个“缓冲队列”来暂存这些多余的令牌。
缓冲队列其实是一个可选的选项,并不是所有应用了令牌桶算法的程序都会实现队列。当有缓存队列存在的情况下,那些暂时没有获取到令牌的请求将被放到这个队列中排队,直到新的令牌产生后,再从队列头部拿出一个请求来匹配令牌。
当队列已满的情况下,这部分访问请求将被丢弃。在实际应用中我们还可以给这个队列加一系列的特效,比如设置队列中请求的存活时间,或者将队列改造为PriorityQueue,根据某种优先级排序,而不是先进先出。
- 漏桶算法
- 漏桶算法的前半段和令牌桶类似,但是操作的对象不同,令牌桶是将令牌放入桶里,而漏桶是将访问请求的数据包放到桶里。同样的是,如果桶满了,那么后面新来的数据包将被丢弃。
- 漏桶算法的后半程是有鲜明特色的,它永远只会以一个恒定的速率将数据包从桶内流出。
- 打个比方,如果设置了漏桶可以存放100个数据包,然后流出速度是1s一个,那么不管数据包以什么速率流入桶里,也不管桶里有多少数据包,漏桶能保证这些数据包永远以1s一个的恒定速度被处理。
- 漏桶 VS 令牌桶的区别
- 根据它们各自的特点不难看出来,这两种算法都有一个“恒定”的速率和“不定”的速率。令牌桶是以恒定速率创建令牌,但是访问请求获取令牌的速率“不定”,反正有多少令牌发多少,令牌没了就干等。而漏桶是以“恒定”的速率处理请求,但是这些请求流入桶的速率是“不定”的
- 从这两个特点来说,漏桶的天然特性决定了它不会发生突发流量,就算每秒1000个请求到来,那么它对后台服务输出的访问速率永远恒定。而令牌桶则不同,其特性可以“预存”一定量的令牌,因此在应对突发流量的时候可以在短时间消耗所有令牌,其突发流量处理效率会比漏桶高,但是导向后台系统的压力也会相应增多
- 滑动窗口
- 上图中黑色的大框就是时间窗口,我们设定窗口时间为5秒,它会随着时间推移向后滑动。我们将窗口内的时间划分为五个小格子,每个格子代表1秒钟,同时这个格子还包含一个计数器,用来计算在当前时间内访问的请求数量。那么这个时间窗口内的总访问量就是所有格子计数器累加后的数值。
- 比如说,我们在每一秒内有5个用户访问,第5秒内有10个用户访问,那么在0到5秒这个时间窗口内访问量就是15。如果我们的接口设置了时间窗口内访问上限是20,那么当时间到第六秒的时候,这个时间窗口内的计数总和就变成了10,因为1秒的格子已经退出了时间窗口,因此在第六秒内可以接收的访问量就是20-10=10个。
- 滑动窗口其实也是一种计算器算法,它有一个显著特点,当时间窗口的跨度越长时,限流效果就越平滑。打个比方,如果当前时间窗口只有两秒,而访问请求全部集中在第一秒的时候,当时间向后滑动一秒后,当前窗口的计数量将发生较大的变化,拉长时间窗口可以降低这种情况的发生概率
Guava RateLimiter客户端的限流
- 单机版限流
- 创建rate-limit子项目,引入依赖项
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
- 非阻塞式的限流方案 ```java // 每秒钟发放2个通行证 RateLimiter limiter = RateLimiter.create(2.0);
// 非阻塞限流 @GetMapping(“/tryAcquire”) public String tryAcquire(Integer count) { if (limiter.tryAcquire(count)) { log.info(“success, rate is {}”, limiter.getRate()); return “success”; } else { log.info(“fail, rate is {}”, limiter.getRate()); return “fail”; } }
// 限定时间的非阻塞限流 @GetMapping(“/tryAcquireWithTimeout”) public String tryAcquireWithTimeout(Integer count, Integer timeout) { if (limiter.tryAcquire(count, timeout, TimeUnit.SECONDS)) { log.info(“success, rate is {}”, limiter.getRate()); return “success”; } else { log.info(“fail, rate is {}”, limiter.getRate()); return “fail”; } } }
- 同步阻塞式的限流方案
```java
// 同步阻塞限流
@GetMapping("/acquire")
public String acquire(Integer count) {
limiter.acquire(count);
log.info("success, rate is {}", limiter.getRate());
return "success";
}
基于Nginx的分布式限流
- 基于Nginx的IP限流
- 添加controller方法
// Nginx专用
// 1. 修改host文件 -> www.alianlyy-training.top = localhost 127.0.0.1
// (127.0.0.1 www.alianlyy-training.top)
// 2. 修改nginx -> 将步骤1中的域名,添加到路由规则当中
// 配置文件地址: /usr/local/nginx/conf/nginx.conf
// 3. 添加配置项:参考resources文件夹下面的nginx.conf
//
// 重新加载nginx(Nginx处于启动) => sudo /usr/local/nginx/sbin/nginx -s reload
@GetMapping("/nginx")
public String nginx() {
log.info("Nginx success");
return "success";
}
- 添加controller方法
- 网关层配置(修改Host文件和nginx.conf文件)
- 修改本地Host文件(vim /etc/hosts)
- 修改nginx.conf文件
- 配置Nginx限流规则
```nginx
根据IP地址限制速度
1) 第一个参数 $binary_remote_addr
binary_目的是缩写内存占用,remote_addr表示通过IP地址来限流
2) 第二个参数 zone=iplimit:20m
iplimit是一块内存区域(记录访问频率信息),20m是指这块内存区域的大小
3) 第三个参数 rate=1r/s
比如100r/m,标识访问的限流频率
limit_req_zone $binary_remote_addr zone=iplimit:20m rate=1r/s;
根据服务器级别做限流
limit_req_zone $server_name zone=serverlimit:10m rate=100r/s;
基于连接数的配置
limit_conn_zone $binary_remote_addr zone=perip:20m; limit_conn_zone $server_name zone=perserver:20m;
server {
server_name www.imooc-training.com;
location /access-limit/ {
proxy_pass http://127.0.0.1:10086/;
# 基于IP地址的限制
# 1) 第一个参数zone=iplimit => 引用limit_req_zone中的zone变量
# 2) 第二个参数burst=2,设置一个大小为2的缓冲区域,当大量请求到来。
# 请求数量超过限流频率时,将其放入缓冲区域
# 3) 第三个参数nodelay=> 缓冲区满了以后,直接返回503异常
limit_req zone=iplimit burst=2 nodelay;
# 基于服务器级别的限制
# 通常情况下,server级别的限流速率是最大的
limit_req zone=serverlimit burst=100 nodelay;
# 每个server最多保持100个连接
limit_conn perserver 100;
# 每个IP地址最多保持5个连接
limit_conn perip 5;
# 异常情况,返回504(默认是503)
limit_req_status 504;
limit_conn_status 504;
}
# 下载限制速度
location /download/ {
# 100m之后限制
limit_rate_after 100m;
limit_rate 256k;
}
}
- 基于Nginx的连接数限制和单机限流
- 配置单机限流(类似IP限流)
```nginx
# 根据服务器级别做限流
limit_req_zone $server_name zone=serverlimit:10m rate=100r/s;
- 添加Controller方法(耗时接口)
@GetMapping("/nginx-conn")
public String nginxConn(@RequestParam(defaultValue = "0") int secs) {
try {
Thread.sleep(1000 * secs);
} catch (Exception e) {
}
return "success";
}
- 配置nginx连接数限流规则
# 基于连接数的配置
limit_conn_zone $binary_remote_addr zone=perip:20m;
limit_conn_zone $server_name zone=perserver:20m;
- IP限流与Conn限流共同作用
基于Redis+Lua的分布式限流
- Lua基本用法和介绍
- Lua特点
- 嵌入式开发
- 插件开发
- 完美集成
- Redis内置Lua解释器
- 执行过程原子性
- 脚本预编译
- 基本用法
- Hello Lua
- 一个简易脚本(Lua入门级语法) ```lua — 模拟限流(假的)
- Lua特点
— 用作限流的Key local key = ‘My Key’
— 限流的最大阈值=2 local limit = 2
— 当前流量大小 local currentLimit = 0
— 是否超出限流标准 if currentLimit + 1 > limit then print ‘reject’ return false else print ‘accept’ return true end ```
- Redis预加载Lua
- 在Redis中执行Lua脚本
- Lua脚本预导入Redis
电商项目改造-客户端分布式限流
- 基于Redis+Lua实现限流
- 编写Lua限流脚本
- spring-data-redis组件集成Lua和Redis
- DefaultRedisScript加载Lua脚本
- RedisTemplate配置(调用Redis)
- 在Controller中添加测试方法验证限流效果
- 自定义注解封装限流逻辑
- 基于Aspect创建自定义注解
- 配置限流规则的切面
- 为目标方法添加@AccessLimit注解,验证效果