一、什么是限流?
先打个比喻,2013年刚毕业,去广州上班,工作地点在广州东站的中石油大厦,租房地点位于地铁三号线—龙归地铁线,三号线的首发站是白云机场站,三号线的终点是广州南站(高铁站),全程大概需要2个小时,广州的地铁算是建设的比较早了。刚开始的时候,龙归这个地方租房的人不是很多,坐地铁就比较自由。大概到2015年底,不知道是什么原因,很多人突然搬到了这里,上班早高峰时,大家挤地铁就比较困难。主要是坐地铁的人超级多,乘客搭乘容易拥堵,考虑到整个线路的运营,主要为后续站点预留位置(首发站要是上满了,就只能拒载了,中途站点乘客无法上车)方便大家的出行,然后地铁工作人员都开始限行了。
具体怎么实行的呢?早高峰实行,6:30~8:30,其余时间不限行。地铁出口有好几个,但他们只保留一个入口,其余出口,只出不进。在这个入口外围用限行栏弯弯曲曲的围成一个通道,所有要搭地铁的乘客都在里面排队,每隔10min中放行一批人(人数有限),队列中的人,只能等下一次放行窗口才能进入地铁站,人流量特别多的其他站点也是这么限行的。
上面提到的限行,就跟我们今天的主题限流类似了,单位时间内限制用户的请求次数量(也就是qps)。如果超出这个数量限制,那么响应失败,或者阻塞到能响应的窗口。
二、为什么要限流?
在上面的例子中,如果不限流会怎么样?对于当前站点来说,大家同时在入口匝机那里抢入,容易造成混乱发生,导致坐地铁的效率会被降低。对其他站点来说,龙归这个站点上的人越多,那么后续站点上的人可就少了(系统不可用),相当影响大家的出行体验。
常识是在开发高并发、高可用系统时,有三把利器来保护我们的系统,缓存、限流、降级。限流可以保护我们的系统资源,也可以保证系统的正常运行。
三、怎么实现限流?
3.1 计数器法
3.2 滑动窗口法
3.3 漏桶法
3.4 令牌桶法
令牌桶算法是以固定的速率向桶中放入令牌(桶一开始是空的,并且桶的容量是固定的,当放入桶的token数量达到了桶的容量,多出的令牌直接丢弃),每当有用户请求过来时,就从桶中取出一个令牌给用户,用户就能正常向系统发送请求,如果用户请求时,桶中没有令牌的话,那用户请求就会被限制。
令牌桶算法跟漏桶法相比,它支持流量的突发。
四、单机限流
本文主要讲述令牌桶的使用,具体的实现以后再做分析。最经典的是guava给我们提供的RateLimitor工具。
如何引入guava的工具包到项目类路径下就不多说了,直接看RateLimitor类。
public abstract class RateLimiter {
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(
permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
}
说明,
1 create用来初始化一个限流器。
2 acquire用来获取许可(permit),如果桶中没有令牌可以取出,那么acquire将阻塞到桶中有令牌可以取出为止。
3 tryAcquire用来尝试获取令牌,如果可以从桶中获取到令牌,则直接返回;如果当时桶中没有令牌可取时,那么如果在超时范围内可以取到令牌,那么会阻塞到获取令牌后立即返回。
4.1 acquire
一次取出1个许可
@Test
public void acquire() {
//平均1秒钟生成一个访问许可,在500ms时生产速率达到最大值
RateLimiter limiter = RateLimiter.create(2.0D, 500, TimeUnit.MILLISECONDS);
LOGGER.info("start to tryAcquire permit.");
for (int i = 0; i < 10; i +=1) {
//阻塞,直至能获取到许可为止,返回为了获取到许可而休眠的时间
double permit = limiter.acquire();
LOGGER.info("{} acquire, time: {}.", i, permit);
}
System.out.println(limiter.toString());
}
运行结果
一次取出多个许可
@Test
public void acquireMore() {
//平均1秒钟生成一个访问许可,在500ms时生产速率达到最大值
RateLimiter limiter = RateLimiter.create(1.0D, 500, TimeUnit.MILLISECONDS);
LOGGER.info("start to acquire permit.");
for (int i = 0; i < 10; i +=1) {
//阻塞,一次获取多个许可,直至能获取到许可为止,返回为了获取到许可而休眠的时间
double permit = limiter.acquire(2);
LOGGER.info("{} acquire, time: {}.", i, permit);
}
System.out.println(limiter.toString());
}
结果
4.2 tryAcquire
@Test
public void tryAcquire() {
//平均1秒钟生成一个访问许可,在500ms时生产速率达到最大值
RateLimiter limiter = RateLimiter.create(5.0D, 500, TimeUnit.MILLISECONDS);
LOGGER.info("start to tryAcquire permit.");
for (int i = 0; i < 20; i +=1) {
//非阻塞,尝试获取访问许可,立即返回
boolean permitFlag = limiter.tryAcquire();
LOGGER.info("{} tryAcquire: {}", i, permitFlag);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.info("end to tryAcquire permit.");
System.out.println(limiter);
}
结果
4.3 tryAcquire(int permits, int timeout, TimeUnit unit)
这个方法在超时时间内,可能是阻塞的。
@Test
public void tryAcquireBlock1() {
RateLimiter limiter = RateLimiter.create(10.0D, 10, TimeUnit.MILLISECONDS);
LOGGER.info("start to tryAcquire blockly.");
Runnable task = () -> {
Thread t = Thread.currentThread();
for(int i = 0; i < 100 ; i +=1) {
boolean acquire = limiter.tryAcquire(1,50, TimeUnit.MILLISECONDS);
String info = String.format("%s, %s tryAcquire: %s.", t.getName(), i, acquire);
if(acquire) {
LOGGER.info(info);
}
//任务内部每10ms就去争抢一次许可
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread t1 = new Thread(task, "try-1");
t1.start();
Thread t2 = new Thread(task, "try-2");
t2.start();
Thread t3 = new Thread(task, "try-3");
t3.start();
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(limiter);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
五、分布式限流
https://blog.wangqi.love/articles/Java/限流技术总结.html
http://tech.dianwoda.com/2017/09/11/talk-about-rate-limit/
http://moguhu.com/article/detail?articleId=73