Guava 是 Google 开源的 Java 类库,提供了一个工具类 RateLimiter。假设我们有一个线程池,它每秒只能处理两个任务,如果提交的任务过快,可能导致系统不稳定,这个时候就需要用到限流。

在下面代码中,我们创建了一个流速为 2 个请求/秒的限流器。直观地看,2 个请求/秒指的是每秒最多允许 2 个请求通过限流器,其实在 Guava 中,流速是一种匀速的概念,2 个请求/秒等价于 1 个请求/500 毫秒。在向线程池提交任务之前,调用 acquire() 方法就能起到限流的作用。通过示例代码的执行结果,任务提交到线程池的时间间隔基本上稳定在 500 毫秒。

  1. // 限流器流速:2个请求/秒
  2. RateLimiter limiter = RateLimiter.create(2.0);
  3. // 执行任务的线程池
  4. ExecutorService es = Executors.newFixedThreadPool(1);
  5. // 记录上一次执行时间
  6. prev = System.nanoTime();
  7. // 测试执行20次
  8. for (int i = 0; i < 20; i++) {
  9. // 限流器限流
  10. limiter.acquire();
  11. es.execute(() -> {
  12. long cur = System.nanoTime();
  13. System.out.println((cur - prev) / 1000_000);
  14. prev = cur;
  15. });
  16. }
  17. 输出结果:
  18. ...
  19. 500
  20. 499
  21. 499
  22. 500
  23. 499

令牌桶算法

Guava 的限流器采用的是令牌桶算法,其核心是要想通过限流器,必须拿到令牌。也就是说,只要我们能够限制发放令牌的速率,那么就能控制流速了。令牌桶算法的详细描述如下:

  • 令牌以固定的速率添加到令牌桶中,假设限流的速率是 r/ 秒,则令牌每 1/r 秒会添加一个;
  • 假设令牌桶的容量是 b ,如果令牌桶已满,则新的令牌会被丢弃;
  • 请求能够通过限流器的前提是令牌桶中有令牌。

这个算法中,令牌桶的容量 b 该怎么理解呢?b 其实是 burst 的简写,意义是限流器允许的最大突发流量。比如 b=10,而且令牌桶中的令牌已满,此时限流器允许 10 个请求同时通过限流器,当然只是突发流量而已,这 10 个请求会带走 10 个令牌,所以后续的流量只能按照速率 r 通过限流器。

令牌桶这个算法,如何用 Java 实现呢?很可能你的直觉会告诉你生产者 - 消费者模式:一个生产者线程定时向阻塞队列中添加令牌,而试图通过限流器的线程则作为消费者线程,只有从阻塞队列中获取到令牌,才允许通过限流器。这个算法看上去非常完美,但实际情况却是使用限流的场景大部分都是高并发场景,而且系统压力已经临近极限了,此时这个实现就有问题了。

问题就出在定时器上,在高并发场景下,当系统压力已经临近极限时,定时器的精度误差会非常大,同时定时器本身会创建调度线程,也会对系统的性能产生影响。那下面我们就来看看 Guava 是如何实现的?

Guava 实现令牌桶算法

Guava 实现令牌桶算法非常简单,其关键是记录并动态计算下一令牌发放的时间。下面以一个最简单的场景来介绍该算法的执行过程。假设令牌桶的容量为 b=1,限流速率 r=1 个请求/秒,如果当前令牌桶中没有令牌,下一个令牌的发放时间是在第 3 秒,而在第 2 秒时有一个线程 T1 请求令牌,此时该如何处理呢?
image.png
对于这个请求令牌的线程而言,很显然需要等待 1 秒,因为 1 秒以后(第 3 秒)它就能拿到令牌了。此时需要注意的是,下一个令牌发放的时间也要增加 1 秒,为什么呢?因为第 3 秒发放的令牌已经被线程 T1 预占了。处理之后如下图所示。
image.png
假设 T1 在预占了第 3 秒的令牌之后,马上又有一个线程 T2 请求令牌,如下图所示。
image.png
很显然,由于下一个令牌产生的时间是第 4 秒,所以线程 T2 要等待两秒的时间,才能获取到令牌,同时由于 T2 预占了第 4 秒的令牌,所以下一令牌产生时间还要增加 1 秒,完全处理之后,如下图所示。
image.png
上面线程 T1、T2 都是在下一令牌产生时间之前请求令牌,如果线程在下一令牌产生时间之后请求令牌会如何呢?假设在线程 T1 请求令牌之后的 5 秒,也就是第 7 秒,线程 T3 请求令牌,如下图所示。
image.png
由于在第 5 秒已经产生了一个令牌,所以此时线程 T3 可以直接拿到令牌,而无需等待。在第 7 秒,实际上限流器能够产生 3 个令牌,第 5、6、7 秒各产生一个令牌。由于我们假设令牌桶的容量是 1,所以第 6、7 秒产生的令牌就丢弃了,其实等价地你也可以认为是保留的第 7 秒的令牌,丢弃的第 5、6 秒的令牌,也就是说第 7 秒的令牌被线程 T3 占有了,于是下一令牌的的产生时间应该是第 8 秒,如下图所示。
image.png
通过上面地分析发现,我们只需要记录一个下一令牌产生的时间,并动态更新它,就能够轻松完成限流功能。我们可以将上面的这个算法代码化,示例代码如下所示,依然假设令牌桶的容量是 1。关键是 reserve() 方法,这个方法会为请求令牌的线程预分配令牌,同时返回该线程能够获取令牌的时间。

其实现逻辑就是上面提到的:如果线程请求令牌的时间在下一令牌产生时间之后,那么该线程立刻就能够获取令牌;反之,如果请求时间在下一令牌产生时间之前,那么该线程是在下一令牌产生的时间获取令牌。由于此时下一令牌已经被该线程预占,所以下一令牌产生的时间需要加上 1 秒。

  1. public class SimpleLimiter {
  2. // 下一令牌产生时间
  3. private long next = System.nanoTime();
  4. // 发放令牌间隔:纳秒
  5. private long interval = 1000_000_000;
  6. // 预占令牌,返回能够获取令牌的时间
  7. private synchronized long reserve(long now) {
  8. // 请求时间在下一令牌产生时间之后,重新计算下一令牌产生时间
  9. if (now > next) {
  10. next = now;
  11. }
  12. long at = next;
  13. // 设置下一令牌产生时间
  14. next += interval;
  15. return Math.max(at, 0L);
  16. }
  17. // 申请令牌
  18. public void acquire() {
  19. // 申请令牌时的时间
  20. long now = System.nanoTime();
  21. // 预占令牌
  22. long at = reserve(now);
  23. long waitTime = max(at-now, 0);
  24. // 按照条件等待
  25. if (waitTime > 0) {
  26. try {
  27. TimeUnit.NANOSECONDS.sleep(waitTime);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }

如果令牌桶的容量大于 1 又该如何处理呢?按照令牌桶算法,令牌要首先从令牌桶中出,所以我们需要按需计算令牌桶中的数量,当有线程请求令牌时,先从令牌桶中出。

具体的代码实现如下所示。我们增加了一个 resync() 方法,在这个方法中,如果线程请求令牌的时间在下一令牌产生时间之后,会重新计算令牌桶中的令牌数,新产生的令牌的计算公式是:(now-next)/interval,你可对照上面的示意图来理解。reserve() 方法中,则增加了先从令牌桶中出令牌的逻辑,不过需要注意的是,如果令牌是从令牌桶中出的,那么 next 就无需增加一个 interval 了。

  1. public class SimpleLimiter {
  2. // 当前令牌桶中的令牌数量
  3. private long storedPermits = 0;
  4. // 令牌桶的容量
  5. private long maxPermits = 3;
  6. // 下一令牌产生时间
  7. private long next = System.nanoTime();
  8. // 发放令牌间隔:纳秒
  9. private long interval = 1000_000_000;
  10. // 请求时间在下一令牌产生时间之后,则重新计算令牌桶中的令牌数,并将下一个令牌发放时间重置为当前时间
  11. private void resync(long now) {
  12. if (now > next) {
  13. // 新产生的令牌数
  14. long newPermits = (now-next)/interval;
  15. // 新令牌增加到令牌桶
  16. storedPermits = min(maxPermits, storedPermits + newPermits);
  17. // 将下一个令牌发放时间重置为当前时间
  18. next = now;
  19. }
  20. }
  21. // 预占令牌,返回能够获取令牌的时间
  22. private synchronized long reserve(long now){
  23. resync(now);
  24. // 能够获取令牌的时间
  25. long at = next;
  26. // 令牌桶中能提供的令牌
  27. long fb = min(1, storedPermits);
  28. // 令牌净需求:首先减掉令牌桶中的令牌
  29. long nr = 1 - fb;
  30. // 重新计算下一令牌产生时间
  31. next = next + nr * interval;
  32. // 重新计算令牌桶中的令牌
  33. this.storedPermits -= fb;
  34. return at;
  35. }
  36. // 申请令牌
  37. public void acquire() {
  38. long now = System.nanoTime();
  39. long at = reserve(now);
  40. long waitTime = max(at-now, 0);
  41. if (waitTime > 0) {
  42. try {
  43. TimeUnit.NANOSECONDS.sleep(waitTime);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. }