如何限流?在工作中是怎么做的?说一下具体的实现?
什么是限流
限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流方法
计数器
实现方式
控制单位时间内的请求数量。
import java.util.concurrent.atomic.AtomicInteger;public class Counter {/*** 最大访问数量*/private final int limit = 10;/*** 访问时间差*/private final long timeout = 1000;/*** 请求时间*/private long time;/*** 当前计数器*/private AtomicInteger reqCount = new AtomicInteger(0);public boolean limit() {long now = System.currentTimeMillis();if (now < time + timeout) {// 单位时间内reqCount.addAndGet(1);return reqCount.get() <= limit;} else {// 超出单位时间time = now;reqCount = new AtomicInteger(0);return true;}}}
劣势:
假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 n-1 (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 2n - 1 个。
设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值。
滑动窗口
实现方式
滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位,把一分钟分成若干等分(6 份,每份 10 秒),在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1。当等分数量越大限流统计就越详细。
package com.example.demo1.service;import java.util.Iterator;import java.util.Random;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.stream.IntStream;public class TimeWindow {private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();/*** 间隔秒数*/private int seconds;/*** 最大限流*/private int max;public TimeWindow(int max, int seconds) {this.seconds = seconds;this.max = max;/*** 永续线程执行清理queue 任务*/new Thread(() -> {while (true) {try {// 等待 间隔秒数-1 执行清理操作Thread.sleep((seconds - 1) * 1000L);} catch (InterruptedException e) {e.printStackTrace();}clean();}}).start();}public static void main(String[] args) throws Exception {final TimeWindow timeWindow = new TimeWindow(10, 1);// 测试3个线程IntStream.range(0, 3).forEach((i) -> {new Thread(() -> {while (true) {try {Thread.sleep(new Random().nextInt(20) * 100);} catch (InterruptedException e) {e.printStackTrace();}timeWindow.take();}}).start();});}/*** 获取令牌,并且添加时间*/public void take() {long start = System.currentTimeMillis();try {int size = sizeOfValid();if (size > max) {System.err.println("超限");}synchronized (queue) {if (sizeOfValid() > max) {System.err.println("超限");System.err.println("queue中有 " + queue.size() + " 最大数量 " + max);}this.queue.offer(System.currentTimeMillis());}System.out.println("queue中有 " + queue.size() + " 最大数量 " + max);}}public int sizeOfValid() {Iterator<Long> it = queue.iterator();Long ms = System.currentTimeMillis() - seconds * 1000;int count = 0;while (it.hasNext()) {long t = it.next();if (t > ms) {// 在当前的统计时间范围内count++;}}return count;}/*** 清理过期的时间*/public void clean() {Long c = System.currentTimeMillis() - seconds * 1000;Long tl = null;while ((tl = queue.peek()) != null && tl < c) {System.out.println("清理数据");queue.poll();}}}
Leaky Bucket 漏桶
实现方式
规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。
public class LeakBucket {/*** 时间*/private long time;/*** 总量*/private Double total;/*** 水流出去的速度*/private Double rate;/*** 当前总量*/private Double nowSize;public boolean limit() {long now = System.currentTimeMillis();nowSize = Math.max(0, (nowSize - (now - time) * rate));time = now;if ((nowSize + 1) < total) {nowSize++;return true;} else {return false;}}}
Token Bucket 令牌桶
实现方式
规定固定容量的桶, token 以固定速度往桶内填充, 当桶满时 token 不会被继续放入, 每过来一个请求把 token 从桶中移除, 如果桶中没有 token 不能请求。
public class TokenBucket {/*** 时间*/private long time;/*** 总量*/private Double total;/*** token 放入速度*/private Double rate;/*** 当前总量*/private Double nowSize;public boolean limit() {long now = System.currentTimeMillis();nowSize = Math.min(total, nowSize + (now - time) * rate);time = now;if (nowSize < 1) {// 桶里没有tokenreturn false;} else {// 存在tokennowSize -= 1;return true;}}}
工作中的使用
spring cloud gateway
- spring cloud gateway 默认使用 redis 进行限流,笔者一般只是修改修改参数属于拿来即用,并没有去从头实现上述那些算法。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis-reactive</artifactId></dependency>
spring:cloud:gateway:routes:- id: requestratelimiter_routeuri: lb://pigx-upmsorder: 10000predicates:- Path=/admin/**filters:- name: RequestRateLimiterargs:redis-rate-limiter.replenishRate: 1 # 令牌桶的容积redis-rate-limiter.burstCapacity: 3 # 流速 每秒key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean- StripPrefix=1
@BeanKeyResolver remoteAddrKeyResolver() {return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());}
sentinel
- 通过配置来控制每个 url 的流量
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId></dependency>
spring:cloud:nacos:discovery:server-addr: localhost:8848sentinel:transport:dashboard: localhost:8080port: 8720datasource:ds:nacos:server-addr: localhost:8848dataId: spring-cloud-sentinel-nacosgroupId: DEFAULT_GROUPrule-type: flownamespace: xxxxxxxx
- 配置内容在 nacos 上进行编辑
[{"resource": "/hello","limitApp": "default","grade": 1,"count": 1,"strategy": 0,"controlBehavior": 0,"clusterMode": false}]
- resource:资源名,即限流规则的作用对象。
- limitApp:流控针对的调用来源,若为 default 则不区分调用来源。
- grade:限流阈值类型,QPS 或线程数模式,0 代表根据并发数量来限流,1 代表根据 QPS 来进行流量控制。
- count:限流阈值
- strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口
- controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式)
- clusterMode:是否为集群模式
总结
sentinel 和 spring cloud gateway 两个框架都是很好的限流框架, 但是在我使用中还没有将spring-cloud-alibaba接入到项目中进行使用, 所以我会选择spring cloud gateway, 当接入完整的或者接入 Nacos 项目使用 setinel 会有更加好的体验.
