Java Semaphore
Semaphore 信号量限流 - 图1

semaphore

限流的方式有很多,从类型上分类,一般可以分为两种:

  • 并发数限流
  • QPS 限流

并发数限流就是限制同一时刻的最大并发请求数,而 QPS 限流指的是限制一段时间内请求数。
semaphore 限流其实属于第一类,通过限制并发数,到达限流的目的。
semaphore中文翻译为信号量,它其实是并发领域中一个重要编程模型,几乎所有支持并发编程的语言都支持信号量这个机制。
JDK 并发包下 Semaphore 类就是信号量的实现类,它的模型比较简单,如下图所示:
Semaphore 信号量限流 - 图2
Semaphore 内部有一个计数器,使用的时候,需要提前初始化。
初始化之后,就可以调用 acquire方法,获取信号量,这时计数器将会减 1。如果此时计数器值小于 0,则会将当前线程阻塞,并且加入到等待队列,否则当前线程继续执行。
执行结束之后,调用 release方法,释放信号量,计数器将会加 1。那如果此时计数器值的小于或等于0,则会唤醒的等待队列一个线程,然后将其移出队列。
并发流量通过 Semaphore进行限流,只有拿到信号量才能继续执行,保证后端资源访问数总是在安全范围。
Semaphore 信号量限流 - 图3

Semaphore 限流

Semaphore 限流常见使用方式

了解完 Semaphore 基本原理之后,实现一个限流器。

  1. public class ConcurrencyLimit {
  2. private Semaphore semaphore;
  3. private ConcurrencyLimit() {
  4. }
  5. public static ConcurrencyLimit create(int permits) {
  6. ConcurrencyLimit concurrencyLimit = new ConcurrencyLimit();
  7. concurrencyLimit.semaphore = new Semaphore(permits);
  8. return concurrencyLimit;
  9. }
  10. public void acquire() throws InterruptedException {
  11. this.semaphore.acquire();
  12. }
  13. public void release() {
  14. this.semaphore.release();
  15. }
  16. public boolean tryAcquire() {
  17. return this.semaphore.tryAcquire();
  18. }
  19. }

限流器底层直接使用 Semaphore,写个例子实际测试一下:

  1. ConcurrencyLimit limit = ConcurrencyLimit.create(5);
  2. ExecutorService executorService = Executors.newCachedThreadPool(
  3. new ThreadFactoryBuilder()
  4. .setNameFormat("limit-%d")
  5. .build());
  6. for (int i = 0; i < 10; i++) {
  7. executorService.execute(() -> {
  8. try {
  9. limit.acquire();
  10. System.out.println(Thread.currentThread().getName() + " START");
  11. // 模拟内部耗时
  12. TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. } finally {
  16. System.out.println(Thread.currentThread().getName() + " END");
  17. limit.release();
  18. }
  19. });
  20. }

:::tips 这里要注意了,Semaphoreacquire方法与 release方法,一定要成对出现。
如果调用 acquire,最后别忘了调用 release,可能会导致程序发生假死等诡异的情况。 ::: 输出结果如下:
Semaphore 信号量限流 - 图4
可以看到,同一时刻,最多只有 5 个线程开始执行任务,起到限流了目的。
其实随便搜下 Semaphore限流,可以看到实现方式跟上面差不多。
那这上面的限流实现真的没问题吗?
「可以说有,也可以说没有,这主要还是要看限流器使用场景。」

Semaphore 限流缺陷

如果换一个场景,将这个限流器用在一个 Web 服务,来看下高并发情况下会有什么问题。

  1. @Slf4j
  2. @Component
  3. public class LimitInterceptor extends HandlerInterceptorAdapter {
  4. ConcurrencyLimit concurrencyLimit;
  5. public LimitInterceptor() {
  6. this.concurrencyLimit = ConcurrencyLimit.create(10);
  7. }
  8. @Override
  9. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  10. concurrencyLimit.acquire();
  11. return true;
  12. }
  13. @Override
  14. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
  15. concurrencyLimit.release();
  16. }
  17. }

如上代码所示,在 SpringMVC 的拦截器中使用限流器。
任一请求执行的时候,首先将会经过 Interceptor拦截器中 preHandle 方法,在这里面调用获取信号量方法。
当请求逻辑完成之后,内部将会调用拦截器的 afterCompletion,在这里释放信号量。
在服务请求内,休眠 100ms,模拟内部接口耗时。
Semaphore 信号量限流 - 图5
下面使用压测神器 「jmeter」 同时发起 500 个并发请求,模拟高并发的情况。
Semaphore 信号量限流 - 图6
压测结果如下图所示:
Semaphore 信号量限流 - 图7
从报表数据可以看到,虽然内部耗时仅仅只有 100 毫秒,但是接口平均请求耗时已经到达了 「2.4s」,P99 的耗时更是到达了 「4.4s」
响应时间增长图如下所示:
Semaphore 信号量限流 - 图8
可以看到并发数越大,接口响应时间也越大。
如果这个限流器真的应用在生产环境,可能刚发布上线的时候,流量比较小,接口响应一切正常。
后面一旦碰到请求数变大,接口响应时间将会拉长,然后客户请求出现大规模的超时。
当压力继续增大,服务端可能就没办法再接受新的请求。


那为什么会这样?
主要是因为 Semaphore#acquire方法如果没有获取到信号量,是会阻塞线程的,然后线程进入等待队列。
默认情况下 Semaphore 使用不公平锁竞争,那在高并发请求下,线程竞争资源比较激烈,有的线程可能运气比较好,直接拿到信号量,那这部分请求接口耗时将会是正常。
但是有部分线程可能运气不佳,直接被阻塞,一直等到最后才能拿到信号量,才能执行。

优化 Semaphore 限流

目前使用的大多数服务,追求的就是一个「低延迟,高吞吐」,那这类服务到达限流线之后,就应该直接拒绝,响应响应错误信息,快速结束请求。
Semaphore 实际还提供另一个tryAcquire 方法,这个方法如果拿不到信号量,将会直接返回 false,比较符合这种场景。
下面优化一下上面的限流代码,主要修改一下拦截器内 preHandle 使用的方法。

  1. @Override
  2. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
  3. if (!concurrencyLimit.tryAcquire()) {
  4. response.getWriter().println("ERROR");
  5. return false;
  6. }
  7. return true;
  8. }

那通过这种方式,可以快速返回错误信息,不用让调用者一直等待。
再使用 jmeter 模拟高并发请求,结果如下:
Semaphore 信号量限流 - 图9
可以看到这次响应时间就没有上一次那么夸张。

总结

可以使用 Semaphore 快速实现一个限流器,不过使用过程一定注意使用场景,谨慎测试,切勿直接复制乱搬网上的代码。
那像大多数的互联网服务来讲,快速响应才是最重要的,所以限流一定不能使用 Semaphore#acquire阻塞式方法。
而像有些后台离线服务,不追求快速响应,只需要完成即可,那这类可以使用 Semaphore#acquire,将线程阻塞直到完成任务。