一、什么是限流?

先打个比喻,2013年刚毕业,去广州上班,工作地点在广州东站的中石油大厦,租房地点位于地铁三号线—龙归地铁线,三号线的首发站是白云机场站,三号线的终点是广州南站(高铁站),全程大概需要2个小时,广州的地铁算是建设的比较早了。刚开始的时候,龙归这个地方租房的人不是很多,坐地铁就比较自由。大概到2015年底,不知道是什么原因,很多人突然搬到了这里,上班早高峰时,大家挤地铁就比较困难。主要是坐地铁的人超级多,乘客搭乘容易拥堵,考虑到整个线路的运营,主要为后续站点预留位置(首发站要是上满了,就只能拒载了,中途站点乘客无法上车)方便大家的出行,然后地铁工作人员都开始限行了。
具体怎么实行的呢?早高峰实行,6:30~8:30,其余时间不限行。地铁出口有好几个,但他们只保留一个入口,其余出口,只出不进。在这个入口外围用限行栏弯弯曲曲的围成一个通道,所有要搭地铁的乘客都在里面排队,每隔10min中放行一批人(人数有限),队列中的人,只能等下一次放行窗口才能进入地铁站,人流量特别多的其他站点也是这么限行的。
上面提到的限行,就跟我们今天的主题限流类似了,单位时间内限制用户的请求次数量(也就是qps)。如果超出这个数量限制,那么响应失败,或者阻塞到能响应的窗口。

二、为什么要限流?

在上面的例子中,如果不限流会怎么样?对于当前站点来说,大家同时在入口匝机那里抢入,容易造成混乱发生,导致坐地铁的效率会被降低。对其他站点来说,龙归这个站点上的人越多,那么后续站点上的人可就少了(系统不可用),相当影响大家的出行体验。

常识是在开发高并发、高可用系统时,有三把利器来保护我们的系统,缓存、限流、降级。限流可以保护我们的系统资源,也可以保证系统的正常运行。

三、怎么实现限流?

3.1 计数器法
3.2 滑动窗口法
3.3 漏桶法
3.4 令牌桶法
令牌桶算法是以固定的速率向桶中放入令牌(桶一开始是空的,并且桶的容量是固定的,当放入桶的token数量达到了桶的容量,多出的令牌直接丢弃),每当有用户请求过来时,就从桶中取出一个令牌给用户,用户就能正常向系统发送请求,如果用户请求时,桶中没有令牌的话,那用户请求就会被限制。
令牌桶算法跟漏桶法相比,它支持流量的突发。

四、单机限流

本文主要讲述令牌桶的使用,具体的实现以后再做分析。最经典的是guava给我们提供的RateLimitor工具。
如何引入guava的工具包到项目类路径下就不多说了,直接看RateLimitor类。

  1. public abstract class RateLimiter {
  2. public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
  3. checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
  4. return create(
  5. permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
  6. }
  7. @CanIgnoreReturnValue
  8. public double acquire(int permits) {
  9. long microsToWait = reserve(permits);
  10. stopwatch.sleepMicrosUninterruptibly(microsToWait);
  11. return 1.0 * microsToWait / SECONDS.toMicros(1L);
  12. }
  13. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  14. long timeoutMicros = max(unit.toMicros(timeout), 0);
  15. checkPermits(permits);
  16. long microsToWait;
  17. synchronized (mutex()) {
  18. long nowMicros = stopwatch.readMicros();
  19. if (!canAcquire(nowMicros, timeoutMicros)) {
  20. return false;
  21. } else {
  22. microsToWait = reserveAndGetWaitLength(permits, nowMicros);
  23. }
  24. }
  25. stopwatch.sleepMicrosUninterruptibly(microsToWait);
  26. return true;
  27. }
  28. }

说明,
1 create用来初始化一个限流器。
2 acquire用来获取许可(permit),如果桶中没有令牌可以取出,那么acquire将阻塞到桶中有令牌可以取出为止。
3 tryAcquire用来尝试获取令牌,如果可以从桶中获取到令牌,则直接返回;如果当时桶中没有令牌可取时,那么如果在超时范围内可以取到令牌,那么会阻塞到获取令牌后立即返回。

4.1 acquire
一次取出1个许可

  1. @Test
  2. public void acquire() {
  3. //平均1秒钟生成一个访问许可,在500ms时生产速率达到最大值
  4. RateLimiter limiter = RateLimiter.create(2.0D, 500, TimeUnit.MILLISECONDS);
  5. LOGGER.info("start to tryAcquire permit.");
  6. for (int i = 0; i < 10; i +=1) {
  7. //阻塞,直至能获取到许可为止,返回为了获取到许可而休眠的时间
  8. double permit = limiter.acquire();
  9. LOGGER.info("{} acquire, time: {}.", i, permit);
  10. }
  11. System.out.println(limiter.toString());
  12. }

运行结果
RateLimitor-2.jpg

一次取出多个许可

  1. @Test
  2. public void acquireMore() {
  3. //平均1秒钟生成一个访问许可,在500ms时生产速率达到最大值
  4. RateLimiter limiter = RateLimiter.create(1.0D, 500, TimeUnit.MILLISECONDS);
  5. LOGGER.info("start to acquire permit.");
  6. for (int i = 0; i < 10; i +=1) {
  7. //阻塞,一次获取多个许可,直至能获取到许可为止,返回为了获取到许可而休眠的时间
  8. double permit = limiter.acquire(2);
  9. LOGGER.info("{} acquire, time: {}.", i, permit);
  10. }
  11. System.out.println(limiter.toString());
  12. }

结果
RateLimitor_2.jpg

4.2 tryAcquire

  1. @Test
  2. public void tryAcquire() {
  3. //平均1秒钟生成一个访问许可,在500ms时生产速率达到最大值
  4. RateLimiter limiter = RateLimiter.create(5.0D, 500, TimeUnit.MILLISECONDS);
  5. LOGGER.info("start to tryAcquire permit.");
  6. for (int i = 0; i < 20; i +=1) {
  7. //非阻塞,尝试获取访问许可,立即返回
  8. boolean permitFlag = limiter.tryAcquire();
  9. LOGGER.info("{} tryAcquire: {}", i, permitFlag);
  10. try {
  11. TimeUnit.MILLISECONDS.sleep(100);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. LOGGER.info("end to tryAcquire permit.");
  17. System.out.println(limiter);
  18. }

结果
RateLimitor-1.jpg

4.3 tryAcquire(int permits, int timeout, TimeUnit unit)
这个方法在超时时间内,可能是阻塞的。

  1. @Test
  2. public void tryAcquireBlock1() {
  3. RateLimiter limiter = RateLimiter.create(10.0D, 10, TimeUnit.MILLISECONDS);
  4. LOGGER.info("start to tryAcquire blockly.");
  5. Runnable task = () -> {
  6. Thread t = Thread.currentThread();
  7. for(int i = 0; i < 100 ; i +=1) {
  8. boolean acquire = limiter.tryAcquire(1,50, TimeUnit.MILLISECONDS);
  9. String info = String.format("%s, %s tryAcquire: %s.", t.getName(), i, acquire);
  10. if(acquire) {
  11. LOGGER.info(info);
  12. }
  13. //任务内部每10ms就去争抢一次许可
  14. try {
  15. TimeUnit.MILLISECONDS.sleep(10);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. };
  21. Thread t1 = new Thread(task, "try-1");
  22. t1.start();
  23. Thread t2 = new Thread(task, "try-2");
  24. t2.start();
  25. Thread t3 = new Thread(task, "try-3");
  26. t3.start();
  27. try {
  28. TimeUnit.SECONDS.sleep(1);
  29. System.out.println(limiter);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }

结果
RateLimiter-4.jpg

五、分布式限流

https://blog.wangqi.love/articles/Java/限流技术总结.html

http://tech.dianwoda.com/2017/09/11/talk-about-rate-limit/
http://moguhu.com/article/detail?articleId=73

http://moguhu.com/article/detail?articleId=75

https://my.oschina.net/editorial-story/blog/2999730