前言

限流是保护高并发系统的三把利器之一,另外两个是缓存和降级。限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形。

常用的限流方式和场景有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数,Java的Semaphore也可以实现)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

比如说,我们需要限制方法被调用的并发数不能超过100(同一时间并发数),则我们可以用信号量Semaphore实现。可如果我们要限制方法在一段时间内平均被调用次数不超过100,则需要使用RateLimiter

限流的基础算法

我们先来讲解一下几个限流相关的基本算法:计数器算法、漏桶算法和令牌桶算法。

计数器算法 - 并发量限流

原理:通过限制系统的并发调用程度来限流

实现方式:

  • 确定方法的最大并发量MAX,每次进入方法前计数器+1,将结果和最大并发量MAX比较,如果大于等于MAX,则直接返回;如果小于MAX,则继续执行;退出方法后计数器-1。
  • 或者使用 Semaphore

比如限制服务的并发访问数是100,而服务处理的平均耗时是10毫秒,那么1秒内,该服务平均能提供( 1000 / 10 ) * 100 = 10000 次

优缺点:并发量限流一般用于对于服务资源有严格限制的场景,但是某个服务在业务高峰期和低峰期的并发量很难评估,这给并发阈值的设置带来了困难,需要我们花时间 根据线上业务的监控数据来逐步对并发阈值进行调优。

计数器算法 - 访问量限流

计数器算法是限流算法里最简单也是最容易实现的一种算法。

原理:通过限制单位时间窗口内的请求数量来限流

实现方式:
比如我们限制一个接口每分钟调用不能超过1000次,我们可以这样设计,定义2个变量,一个是访问次数,一个是记录访问次数计算的开始时间,如果记录时间和当前时间比较大于1分钟,则重新计时,如果在一分钟以内,我们把访问次数加一。

下面是个简单的示意图
限流算法 - 图1

  1. private volatile long startTime;
  2. // 访问计数器
  3. private static final AtomicInteger COUNT = new AtomicInteger();
  4. // QPS limit
  5. private static final int LIMIT = 1000;
  6. public boolean acquire() {
  7. long currentTime = System.currentTimeMillis();
  8. // 超过1秒钟,则重新计数
  9. if(currentTime - startTime >= 1000) {
  10. startTime = currentTime;
  11. COUNT.set(0);
  12. }
  13. // 超过qps返回false, 否则count自增
  14. if(COUNT.get() > LIMIT) {
  15. return false;
  16. } else {
  17. COUNT.getAndIncrement();
  18. return true;
  19. }
  20. }

优缺点:
对于访问量限流这种限流方式,实现简单,适用于大多数场景,阈值可以通过服务端来动态配置,甚至可以当做业务开关来使用,但也有一定的局限性,因为我们的阈值是通过分析单位时间段内调用量来设置的,如果它在单位时间段的前几秒就被流量突刺消耗完了,将导致该时间段内剩余的时间内该服务“拒绝服务”,可以将这种现象称为“突刺现象”。

漏桶算法

为了消除”突刺现象”,可以采用漏桶算法实现限流,漏桶算法这个名字就很形象,算法内部有一个容器,类似生活用到的漏斗,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出。不管上面流量多大,下面流出的速度始终保持不变。

图片.png

从上图中,我们可以看到,就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。

漏桶算法的实现往往依赖于队列,请求到达如果队列未满则直接放入队列,然后有一个处理器按照固定频率从队列头取出请求进行处理。如果请求量大,则会导致队列满,那么新来的请求就会被抛弃。

不管服务调用方多么不稳定,通过漏桶算法进行限流,每10毫秒处理一次请求。因为处理的速度是固定的,请求进来的速度是未知的,可能突然进来很多请求,没来得及处理的请求就先放在桶里,既然是个桶,肯定是有容量上限,如果桶满了,那么新进来的请求就丢弃。

限流算法 - 图3

实现:

LinkedBlockingDeque + ThreadPoolExecutor

令牌桶算法

令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。如果获取不到,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。

图片.png

令牌桶和漏桶对比

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
  • 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;
  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;

    Guava RateLimiter

Guava [ˈɡwɑːvə]是Java领域优秀的开源项目,它包含了Google在Java项目中使用一些核心库,包含集合(Collections),缓存(Caching),并发编程库(Concurrency),常用注解(Common annotations),String操作,I/O操作方面的众多非常实用的函数。

maven

  1. <dependency>
  2. <groupId>com.google.guava</groupId>
  3. <artifactId>guava</artifactId>
  4. <version>30.0-jre</version>
  5. </dependency>

Guava的RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty) 和 平滑预热限流(SmoothWarmingUp)实现。

图片.png

RateLimiter的类图如上所示,其中RateLimiter是入口类,它提供了两套工厂方法来创建出两个子类。这很符合《Effective Java》中的用静态工厂方法代替构造函数的建议,毕竟该书的作者也正是Guava库的主要维护者,二者配合”食用”更佳。

  1. // RateLimiter提供了两个工厂方法,最终会调用下面两个函数,生成RateLimiter的两个子类。
  2. static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  3. RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
  4. rateLimiter.setRate(permitsPerSecond);
  5. return rateLimiter;
  6. }
  7. static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit,
  8. double coldFactor) {
  9. RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
  10. rateLimiter.setRate(permitsPerSecond);
  11. return rateLimiter;
  12. }

平滑突发限流(PTC限流默认模型)

使用RateLimiter的静态方法创建一个限流器,设置每秒放置的令牌数为5个。返回的RateLimiter对象可以保证1秒内不会给超过5个令牌,并且以固定速率进行放置,达到平滑输出的效果。

  1. private void log(Object o) {
  2. System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()) + " " + o);
  3. }
  4. public void testSmoothBursty() {
  5. RateLimiter r = RateLimiter.create(5);
  6. while (true) {
  7. // acquire 方法会返回等待的时间, 单位为s
  8. // time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
  9. log("get 1 tokens: " + r.acquire() + "s");
  10. }
  11. /**
  12. * output: 基本上都是0.2s执行一次,符合一秒发放5个令牌的设定。
  13. * 2020-12-31 15:16:38:302 get 1 tokens: 0.0s
  14. * 2020-12-31 15:16:38:456 get 1 tokens: 0.142193s
  15. * 2020-12-31 15:16:38:646 get 1 tokens: 0.189866s
  16. * 2020-12-31 15:16:38:848 get 1 tokens: 0.198998s
  17. * 2020-12-31 15:16:39:047 get 1 tokens: 0.197115s
  18. * 2020-12-31 15:16:39:246 get 1 tokens: 0.197827s
  19. * 2020-12-31 15:16:39:447 get 1 tokens: 0.199343s
  20. */
  21. }

RateLimiter使用令牌桶算法,会进行令牌的累积,如果获取令牌的频率比较低,则不会等待,直接获取令牌。同时也可以看出,令牌是有上限的。

  1. public void testSmoothBursty2() {
  2. RateLimiter r = RateLimiter.create(2);
  3. while (true) {
  4. log("get 1 tokens: " + r.acquire(1) + "s");
  5. try {
  6. Thread.sleep(2000);
  7. } catch (Exception e) {}
  8. log("get 1 tokens: " + r.acquire(1) + "s");
  9. log("get 1 tokens: " + r.acquire(1) + "s");
  10. log("get 1 tokens: " + r.acquire(1) + "s");
  11. log("get 1 tokens: " + r.acquire(1) + "s");
  12. log("get 1 tokens: " + r.acquire(1) + "s");
  13. log("get 1 tokens: " + r.acquire(1) + "s");
  14. log("get 1 tokens: " + r.acquire(1) + "s");
  15. log("end");
  16. break;
  17. /**
  18. * output:
  19. * 2020-12-31 15:21:50:852 get 1 tokens: 0.0s
  20. * 2020-12-31 15:21:52:856 get 1 tokens: 0.0s
  21. * 2020-12-31 15:21:52:856 get 1 tokens: 0.0s
  22. * 2020-12-31 15:21:52:856 get 1 tokens: 0.0s
  23. * 2020-12-31 15:21:53:362 get 1 tokens: 0.498762s
  24. * 2020-12-31 15:21:53:856 get 1 tokens: 0.492606s
  25. * 2020-12-31 15:21:54:360 get 1 tokens: 0.49916s
  26. * 2020-12-31 15:21:54:858 get 1 tokens: 0.494751s
  27. * 2020-12-31 15:21:54:859 end
  28. */
  29. }
  30. }

RateLimiter由于会累积令牌,所以可以应对突发流量。在下面代码中,有一个请求会直接请求5个令牌,但是由于此时令牌桶中有累积的令牌,足以快速响应。

RateLimiter在没有足够令牌发放时,采用滞后处理的方式,也就是前一个请求获取令牌所需等待的时间由下一次请求来承受,也就是代替前一个请求进行等待。

  1. public void testSmoothBursty3() {
  2. RateLimiter r = RateLimiter.create(5);
  3. while (true) {
  4. log("get 5 tokens: " + r.acquire(5) + "s");
  5. log("get 1 tokens: " + r.acquire(1) + "s");
  6. log("get 1 tokens: " + r.acquire(1) + "s");
  7. log("get 1 tokens: " + r.acquire(1) + "s");
  8. log("end");
  9. /**
  10. * output:
  11. * 2020-12-31 15:23:38:353 get 5 tokens: 0.0s
  12. * 2020-12-31 15:23:39:307 get 1 tokens: 0.944744s 滞后效应,需要替前一个请求进行等待
  13. * 2020-12-31 15:23:39:503 get 1 tokens: 0.191202s
  14. * 2020-12-31 15:23:39:699 get 1 tokens: 0.195371s
  15. * 2020-12-31 15:23:39:699 end
  16. * 2020-12-31 15:23:39:903 get 5 tokens: 0.199419s
  17. * 2020-12-31 15:23:40:904 get 1 tokens: 0.995646s 滞后效应,需要替前一个请求进行等待
  18. * 2020-12-31 15:23:41:101 get 1 tokens: 0.1936s
  19. * 2020-12-31 15:23:41:305 get 1 tokens: 0.197227s
  20. */
  21. }
  22. }

平滑预热限流

RateLimiterSmoothWarmingUp是带有预热期的平滑限流,它启动后会有一段预热期,逐步将分发频率提升到配置的速率。

比如下面代码中的例子,创建一个平均分发令牌速率为2,预热期为3秒钟。由于设置了预热时间是3秒,令牌桶一开始并不会0.5秒发一个令牌,而是形成一个平滑线性下降的坡度,频率越来越高,在3秒钟之内达到原本设置的频率,以后就以固定的频率输出。这种功能适合系统刚启动需要一点时间来“热身”的场景。

  1. public void testSmoothwarmingUp() {
  2. RateLimiter r = RateLimiter.create(2, 3, TimeUnit.SECONDS);
  3. while (true) {
  4. log("get 1 tokens: " + r.acquire(1) + "s");
  5. log("get 1 tokens: " + r.acquire(1) + "s");
  6. log("get 1 tokens: " + r.acquire(1) + "s");
  7. log("get 1 tokens: " + r.acquire(1) + "s");
  8. log("end");
  9. /**
  10. * output:
  11. * 2020-12-31 15:28:21:683 get 1 tokens: 0.0s
  12. * 2020-12-31 15:28:22:982 get 1 tokens: 1.291541s
  13. * 2020-12-31 15:28:23:981 get 1 tokens: 0.992956s
  14. * 2020-12-31 15:28:24:644 get 1 tokens: 0.660037s 上边三次获取的时间相加正好为3秒
  15. * 2020-12-31 15:28:24:644 end
  16. * 2020-12-31 15:28:25:147 get 1 tokens: 0.496783s 正常速率0.5秒一个令牌
  17. * 2020-12-31 15:28:25:647 get 1 tokens: 0.493721s
  18. * 2020-12-31 15:28:26:142 get 1 tokens: 0.494212s
  19. * 2020-12-31 15:28:26:648 get 1 tokens: 0.499116s
  20. */
  21. }
  22. }

源码分析

看完了RateLimiter的基本使用示例后,我们来学习一下它的实现原理。先了解一下几个比较重要的成员变量的含义。

  1. //SmoothRateLimiter.java
  2. //当前存储令牌数
  3. double storedPermits;
  4. //最大存储令牌数
  5. double maxPermits;
  6. //添加令牌时间间隔
  7. double stableIntervalMicros;
  8. /**
  9. * 下一次请求可以获取令牌的起始时间
  10. * 由于RateLimiter允许预消费,上次请求预消费令牌后
  11. * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
  12. */
  13. private long nextFreeTicketMicros = 0L;

平滑突发限流

RateLimiter的原理就是每次调用acquire时用当前时间和nextFreeTicketMicros进行比较,根据二者的间隔和添加单位令牌的时间间隔stableIntervalMicros来刷新存储令牌数storedPermits。然后acquire会进行休眠,直到nextFreeTicketMicros

acquire函数如下所示,它会调用reserve函数计算获取目标令牌数所需等待的时间,然后使用SleepStopwatch进行休眠,最后返回等待时间。

  1. public double acquire(int permits) {
  2. // 计算获取令牌所需等待的时间
  3. long microsToWait = reserve(permits);
  4. // 进行线程sleep
  5. stopwatch.sleepMicrosUninterruptibly(microsToWait);
  6. return 1.0 * microsToWait / SECONDS.toMicros(1L);
  7. }
  8. final long reserve(int permits) {
  9. checkPermits(permits);
  10. // 由于涉及并发操作,所以使用synchronized进行并发操作
  11. synchronized (mutex()) {
  12. return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  13. }
  14. }
  15. final long reserveAndGetWaitLength(int permits, long nowMicros) {
  16. // 计算从当前时间开始,能够获取到目标数量令牌时的时间
  17. long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  18. // 两个时间相减,获得需要等待的时间
  19. return max(momentAvailable - nowMicros, 0);
  20. }


reserveEarliestAvailable是刷新令牌数和下次获取令牌时间nextFreeTicketMicros的关键函数。它有三个步骤,一是调用resync函数增加令牌数,二是计算预支付令牌所需额外等待的时间,三是更新下次获取令牌时间nextFreeTicketMicros和存储令牌数storedPermits

这里涉及RateLimiter的一个特性,也就是可以预先支付令牌,并且所需等待的时间在下次获取令牌时再实际执行。详细的代码逻辑的解释请看注释。

  1. final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  2. // 刷新令牌数,相当于每次acquire时在根据时间进行令牌的刷新
  3. resync(nowMicros);
  4. long returnValue = nextFreeTicketMicros;
  5. // 获取当前已有的令牌数和需要获取的目标令牌数进行比较,计算出可以目前即可得到的令牌数。
  6. double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  7. // freshPermits是需要预先支付的令牌,也就是目标令牌数减去目前即可得到的令牌数
  8. double freshPermits = requiredPermits - storedPermitsToSpend;
  9. // 因为会突然涌入大量请求,而现有令牌数又不够用,因此会预先支付一定的令牌数
  10. // waitMicros即是产生预先支付令牌的数量时间,则将下次要添加令牌的时间应该计算时间加上watiMicros
  11. long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
  12. + (long) (freshPermits * stableIntervalMicros);
  13. // storedPermitsToWaitTime在SmoothWarmingUp和SmoothBuresty的实现不同,用于实现预热缓冲期
  14. // SmoothBuresty的storedPermitsToWaitTime直接返回0,所以watiMicros就是预先支付的令牌所需等待的时间
  15. try {
  16. // 更新nextFreeTicketMicros,本次预先支付的令牌所需等待的时间让下一次请求来实际等待。
  17. this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
  18. } catch (ArithmeticException e) {
  19. this.nextFreeTicketMicros = Long.MAX_VALUE;
  20. }
  21. // 更新令牌数,最低数量为0
  22. this.storedPermits -= storedPermitsToSpend;
  23. // 返回旧的nextFreeTicketMicros数值,无需为预支付的令牌多加等待时间。
  24. return returnValue;
  25. }
  26. // SmoothBurest
  27. long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  28. return 0L;
  29. }


resync函数用于增加存储令牌,核心逻辑就是(nowMicros - nextFreeTicketMicros) / stableIntervalMicros。当前时间大于nextFreeTicketMicros时进行刷新,否则直接返回。

  1. void resync(long nowMicros) {
  2. // 当前时间晚于nextFreeTicketMicros,所以刷新令牌和nextFreeTicketMicros
  3. if (nowMicros > nextFreeTicketMicros) {
  4. // coolDownIntervalMicros函数获取每机秒生成一个令牌,SmoothWarmingUp和SmoothBuresty的实现不同
  5. // SmoothBuresty的coolDownIntervalMicros直接返回stableIntervalMicros
  6. // 当前时间减去要更新令牌的时间获取时间间隔,再除以添加令牌时间间隔获取这段时间内要添加的令牌数
  7. storedPermits = min(maxPermits,
  8. storedPermits
  9. + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
  10. nextFreeTicketMicros = nowMicros;
  11. }
  12. // 如果当前时间早于nextFreeTicketMicros,则获取令牌的线程要一直等待到nextFreeTicketMicros,该线程获取令牌所需
  13. // 额外等待的时间由下一次获取的线程来代替等待。
  14. }
  15. double coolDownIntervalMicros() {
  16. return stableIntervalMicros;
  17. }

下面我们举个例子,让大家更好的理解resyncreserveEarliestAvailable函数的逻辑。

比如RateLimiterstableIntervalMicros为500,也就是1秒发两个令牌,storedPermits为0,nextFreeTicketMicros为1553918495748。线程一acquire(2),当前时间为1553918496248,首先resync函数计算,(1553918496248 - 1553918495748)/500 = 1,所以当前可获取令牌数为1,但是由于可以预支付,所以nextFreeTicketMicros= nextFreeTicketMicro + 1 * 500 = 1553918496748。线程一无需等待。

紧接着,线程二也来acquire(2),首先resync函数发现当前时间早于nextFreeTicketMicros,所以无法增加令牌数,所以需要预支付2个令牌,nextFreeTicketMicros= nextFreeTicketMicro + 2 * 500 = 1553918497748。线程二需要等待1553918496748时刻,也就是线程一获取时计算的nextFreeTicketMicros时刻。同样的,线程三获取令牌时也需要等待到线程二计算的nextFreeTicketMicros时刻。

平滑预热限流

上述就是平滑突发限流RateLimiter的实现,下面我们来看一下加上预热缓冲期的实现原理。

SmoothWarmingUp实现预热缓冲的关键在于其分发令牌的速率会随时间和令牌数而改变,速率会先慢后快。表现形式如下图所示,令牌刷新的时间间隔由长逐渐变短。等存储令牌数从maxPermits到达thresholdPermits时,发放令牌的时间价格也由coldInterval降低到了正常的stableInterval。
图片.png

SmoothWarmingUp的相关代码如下所示,相关的逻辑都写在注释中。

  1. // SmoothWarmingUp,等待时间就是计算上图中梯形或者正方形的面积。
  2. long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  3. /**
  4. * 当前permits超出阈值的部分
  5. */
  6. double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  7. long micros = 0;
  8. /**
  9. * 如果当前存储的令牌数超出thresholdPermits
  10. */
  11. if (availablePermitsAboveThreshold > 0.0) {
  12. /**
  13. * 在阈值右侧并且需要被消耗的令牌数量
  14. */
  15. double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
  16. /**
  17. * 梯形的面积
  18. *
  19. * 高 * (顶 * 底) / 2
  20. *
  21. * 高是 permitsAboveThresholdToTake 也就是右侧需要消费的令牌数
  22. * 底 较长 permitsToTime(availablePermitsAboveThreshold)
  23. * 顶 较短 permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)
  24. */
  25. micros = (long) (permitsAboveThresholdToTake
  26. * (permitsToTime(availablePermitsAboveThreshold)
  27. + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);
  28. /**
  29. * 减去已经获取的在阈值右侧的令牌数
  30. */
  31. permitsToTake -= permitsAboveThresholdToTake;
  32. }
  33. /**
  34. * 平稳时期的面积,正好是长乘宽
  35. */
  36. micros += (stableIntervalMicros * permitsToTake);
  37. return micros;
  38. }
  39. double coolDownIntervalMicros() {
  40. /**
  41. * 每秒增加的令牌数为 warmup时间/maxPermits. 这样的话,在warmuptime时间内,就就增张的令牌数量
  42. * 为 maxPermits
  43. */
  44. return warmupPeriodMicros / maxPermits;
  45. }

分布式限流

**RateLimiter**只能用于单机的限流,如果想要集群限流,则需要引入**redis**或者阿里开源的**sentinel**中间件。

分布式限流最关键的是要将限流服务做成原子化,而解决方案可以使使用redis+lua或者nginx+lua技术进行实现,通过这两种技术可以实现的高并发和高性能。

首先我们来使用redis+lua实现时间窗内某个接口的请求数限流,实现了该功能后可以改造为限流总并发/请求数和限制总资源数。Lua本身就是一种编程语言,也可以使用它实现复杂的漏桶或令牌桶算法。

redis+lua实现中的lua脚本(计数器算法):

  1. local key = KEYS[1] --限流KEY(一秒一个)
  2. local limit = tonumber(ARGV[1]) --限流大小
  3. local current = tonumber(redis.call('get', key) or "0")
  4. if current + 1 > limit then --如果超出限流大小
  5. return 0
  6. else --请求数+1,并设置1秒过期
  7. redis.call("INCRBY", key, "1")
  8. redis.call("expire", key, "1")
  9. return 1
  10. end

如上操作因是在一个lua脚本中,又因Redis是单线程模型,因此是线程安全的。

如下是Java中判断是否需要限流的代码:

  1. public static boolean acquire() throws Exception {
  2. String luaScript = Files.toString(new File("limit.lua"), Charset.defaultCharset());
  3. Jedis jedis = new Jedis("192.168.147.52", 6379);
  4. String key = "ip:" + System.currentTimeMillis() / 1000; //此处将当前时间戳取秒数
  5. String limit = "3"; //限流大小
  6. return ((Long) jedis.eval(luaScript, Lists.newArrayList(key), Lists.newArrayList(limit))) == 1;
  7. }

因为Redis的限制(Lua中有写操作不能使用带随机性质的读操作,如TIME)不能在Redis Lua中使用TIME获取时间戳,因此只好从应用获取然后传入,在某些极端情况下(机器时钟不准的情况下),限流会存在一些小问题。

使用Nginx+Lua实现的Lua脚本:

  1. local locks = require "resty.lock"
  2. local function acquire()
  3. local lock =locks:new("locks")
  4. local elapsed, err =lock:lock("limit_key") --互斥锁
  5. local limit_counter =ngx.shared.limit_counter --计数器
  6. local key = "ip:" ..os.time()
  7. local limit = 5 --限流大小
  8. local current =limit_counter:get(key)
  9. if current ~= nil and current + 1> limit then --如果超出限流大小
  10. lock:unlock()
  11. return 0
  12. end
  13. if current == nil then
  14. limit_counter:set(key, 1, 1) --第一次需要设置过期时间,设置key的值为1,过期时间为1
  15. else
  16. limit_counter:incr(key, 1) --第二次开始加1即可
  17. end
  18. lock:unlock()
  19. return 1
  20. end
  21. ngx.print(acquire())

实现中我们需要使用lua-resty-lock互斥锁模块来解决原子性问题(在实际工程中使用时请考虑获取锁的超时问题),并使用ngx.shared.DICT共享字典来实现计数器。如果需要限流则返回0,否则返回1。使用时需要先定义两个共享字典(分别用来存放锁和计数器数据):

nginx配置代码

  1. http {
  2. ……
  3. lua_shared_dict locks 10m;
  4. lua_shared_dict limit_counter 10m;
  5. }

有人会纠结如果应用并发量非常大那么redis或者nginx是不是能抗得住;不过这个问题要从多方面考虑:你的流量是不是真的有这么大,是不是可以通过一致性哈希将分布式限流进行分片,是不是可以当并发量太大降级为应用级限流;对策非常多,可以根据实际情况调节;像在京东使用Redis+Lua来限流抢购流量,一般流量是没有问题的。

对于分布式限流目前遇到的场景是业务上的限流,而不是流量入口的限流;流量入口限流应该在接入层完成,而接入层一般使用Nginx。

参考