semaphore
限流的方式有很多,从类型上分类,一般可以分为两种:
- 并发数限流
- QPS 限流
并发数限流就是限制同一时刻的最大并发请求数,而 QPS 限流指的是限制一段时间内请求数。semaphore 限流其实属于第一类,通过限制并发数,到达限流的目的。semaphore中文翻译为信号量,它其实是并发领域中一个重要编程模型,几乎所有支持并发编程的语言都支持信号量这个机制。
JDK 并发包下 Semaphore 类就是信号量的实现类,它的模型比较简单,如下图所示:
Semaphore 内部有一个计数器,使用的时候,需要提前初始化。
初始化之后,就可以调用 acquire方法,获取信号量,这时计数器将会减 1。如果此时计数器值小于 0,则会将当前线程阻塞,并且加入到等待队列,否则当前线程继续执行。
执行结束之后,调用 release方法,释放信号量,计数器将会加 1。那如果此时计数器值的小于或等于0,则会唤醒的等待队列一个线程,然后将其移出队列。
并发流量通过 Semaphore进行限流,只有拿到信号量才能继续执行,保证后端资源访问数总是在安全范围。
Semaphore 限流
Semaphore 限流常见使用方式
了解完 Semaphore 基本原理之后,实现一个限流器。
public class ConcurrencyLimit {private Semaphore semaphore;private ConcurrencyLimit() {}public static ConcurrencyLimit create(int permits) {ConcurrencyLimit concurrencyLimit = new ConcurrencyLimit();concurrencyLimit.semaphore = new Semaphore(permits);return concurrencyLimit;}public void acquire() throws InterruptedException {this.semaphore.acquire();}public void release() {this.semaphore.release();}public boolean tryAcquire() {return this.semaphore.tryAcquire();}}
限流器底层直接使用 Semaphore,写个例子实际测试一下:
ConcurrencyLimit limit = ConcurrencyLimit.create(5);ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("limit-%d").build());for (int i = 0; i < 10; i++) {executorService.execute(() -> {try {limit.acquire();System.out.println(Thread.currentThread().getName() + " START");// 模拟内部耗时TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName() + " END");limit.release();}});}
:::tips
这里要注意了,Semaphore的 acquire方法与 release方法,一定要成对出现。
如果调用 acquire,最后别忘了调用 release,可能会导致程序发生假死等诡异的情况。
:::
输出结果如下:
可以看到,同一时刻,最多只有 5 个线程开始执行任务,起到限流了目的。
其实随便搜下 Semaphore限流,可以看到实现方式跟上面差不多。
那这上面的限流实现真的没问题吗?
「可以说有,也可以说没有,这主要还是要看限流器使用场景。」
Semaphore 限流缺陷
如果换一个场景,将这个限流器用在一个 Web 服务,来看下高并发情况下会有什么问题。
@Slf4j@Componentpublic class LimitInterceptor extends HandlerInterceptorAdapter {ConcurrencyLimit concurrencyLimit;public LimitInterceptor() {this.concurrencyLimit = ConcurrencyLimit.create(10);}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {concurrencyLimit.acquire();return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {concurrencyLimit.release();}}
如上代码所示,在 SpringMVC 的拦截器中使用限流器。
任一请求执行的时候,首先将会经过 Interceptor拦截器中 preHandle 方法,在这里面调用获取信号量方法。
当请求逻辑完成之后,内部将会调用拦截器的 afterCompletion,在这里释放信号量。
在服务请求内,休眠 100ms,模拟内部接口耗时。
下面使用压测神器 「jmeter」 同时发起 500 个并发请求,模拟高并发的情况。
压测结果如下图所示:
从报表数据可以看到,虽然内部耗时仅仅只有 100 毫秒,但是接口平均请求耗时已经到达了 「2.4s」,P99 的耗时更是到达了 「4.4s」。
响应时间增长图如下所示:
可以看到并发数越大,接口响应时间也越大。
如果这个限流器真的应用在生产环境,可能刚发布上线的时候,流量比较小,接口响应一切正常。
后面一旦碰到请求数变大,接口响应时间将会拉长,然后客户请求出现大规模的超时。
当压力继续增大,服务端可能就没办法再接受新的请求。
那为什么会这样?
主要是因为 Semaphore#acquire方法如果没有获取到信号量,是会阻塞线程的,然后线程进入等待队列。
默认情况下 Semaphore 使用不公平锁竞争,那在高并发请求下,线程竞争资源比较激烈,有的线程可能运气比较好,直接拿到信号量,那这部分请求接口耗时将会是正常。
但是有部分线程可能运气不佳,直接被阻塞,一直等到最后才能拿到信号量,才能执行。
优化 Semaphore 限流
目前使用的大多数服务,追求的就是一个「低延迟,高吞吐」,那这类服务到达限流线之后,就应该直接拒绝,响应响应错误信息,快速结束请求。
那 Semaphore 实际还提供另一个tryAcquire 方法,这个方法如果拿不到信号量,将会直接返回 false,比较符合这种场景。
下面优化一下上面的限流代码,主要修改一下拦截器内 preHandle 使用的方法。
@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {if (!concurrencyLimit.tryAcquire()) {response.getWriter().println("ERROR");return false;}return true;}
那通过这种方式,可以快速返回错误信息,不用让调用者一直等待。
再使用 jmeter 模拟高并发请求,结果如下:
可以看到这次响应时间就没有上一次那么夸张。
总结
可以使用 Semaphore 快速实现一个限流器,不过使用过程一定注意使用场景,谨慎测试,切勿直接复制乱搬网上的代码。
那像大多数的互联网服务来讲,快速响应才是最重要的,所以限流一定不能使用 Semaphore#acquire阻塞式方法。
而像有些后台离线服务,不追求快速响应,只需要完成即可,那这类可以使用 Semaphore#acquire,将线程阻塞直到完成任务。
