Java

认识 Semaphore

Semaphore 是什么

Semaphore 一般译作信号量,它也是一种线程同步工具,主要用于多个线程对共享资源进行并行操作的一种工具类。它代表了一种许可的概念,是否允许多线程对同一资源进行操作的许可,使用 Semaphore 可以控制并发访问资源的线程个数。

Semaphore 的使用场景

Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的数量,当连接到达了限制数量后,后面的线程只能排队等前面的线程释放数据库连接后才能获得数据库连接。
再比如交通公路上的红绿灯,绿灯亮起时只能让 100 辆车通过,红灯亮起不允许车辆通过。
再比如停车场的场景中,一个停车场有有限数量的车位,同时能够容纳多少台车,车位满了之后只有等里面的车离开停车场外面的车才可以进入。

Semaphore 使用

下面就来模拟一下停车场的业务场景:在进入停车场之前会有一个提示牌,上面显示着停车位还有多少,当车位为 0 时,不能进入停车场,当车位不为 0 时,才会允许车辆进入停车场。所以停车场有几个关键因素:停车场车位的总容量,当一辆车进入时,停车场车位的总容量 - 1,当一辆车离开时,总容量 + 1,停车场车位不足时,车辆只能在停车场外等待。

  1. public class CarParking {
  2. private static Semaphore semaphore = new Semaphore(10);
  3. public static void main(String[] args){
  4. for(int i = 0;i< 100;i++){
  5. Thread thread = new Thread(new Runnable() {
  6. @Override
  7. public void run() {
  8. System.out.println("欢迎 " + Thread.currentThread().getName() + " 来到停车场");
  9. // 判断是否允许停车
  10. if(semaphore.availablePermits() == 0) {
  11. System.out.println("车位不足,请耐心等待");
  12. }
  13. try {
  14. // 尝试获取
  15. semaphore.acquire();
  16. System.out.println(Thread.currentThread().getName() + " 进入停车场");
  17. Thread.sleep(new Random().nextInt(10000));// 模拟车辆在停车场停留的时间
  18. System.out.println(Thread.currentThread().getName() + " 驶出停车场");
  19. semaphore.release();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }, i + "号车");
  25. thread.start();
  26. }
  27. }
  28. }

在上面这段代码中,给出了 Semaphore 的初始容量,也就是只有 10 个车位,用这 10 个车位来控制 100 辆车的流量,所以结果和预想的很相似,即大部分车都在等待状态。但是同时仍允许一些车驶入停车场,驶入停车场的车辆,就会 semaphore.acquire 占用一个车位,驶出停车场时,就会 semaphore.release 让出一个车位,让后面的车再次驶入。

Semaphore 信号量的模型

上面代码虽然比较简单,但是却能让我们了解到一个信号量模型的五脏六腑。下面是一个信号量的模型:
2021-05-15-21-17-34-636779.png
来解释一下 Semaphore ,Semaphore 有一个初始容量,这个初始容量就是 Semaphore 所能够允许的信号量。在调用 Semaphore 中的 acquire 方法后,Semaphore 的容量 -1,相对的在调用 release 方法后,Semaphore 的容量 + 1,在这个过程中,计数器一直在监控 Semaphore 数量的变化,等到流量超过 Semaphore 的容量后,多余的流量就会放入等待队列中进行排队等待。等到 Semaphore 的容量允许后,方可重新进入。
Semaphore 所控制的流量其实就是一个个的线程,因为并发工具最主要的研究对象就是线程。
它的工作流程如下
2021-05-15-21-17-34-746180.png
这幅图应该很好理解吧,这里就不再过多解释。

Semaphore 深入理解

在了解 Semaphore 的基本使用和 Semaphore 的模型后,下面还是得从源码来和看一看Semaphore 的种种细节问题!

Semaphore 基本属性

Semaphore 中只有一个属性

  1. private final Sync sync;

Sync 是 Semaphore 的同步实现,Semaphore 保证线程安全性的方式和 ReentrantLock 、CountDownLatch 类似,都是继承于 AQS 的实现。同样的,这个 Sync 也是继承于AbstractQueuedSynchronizer的一个变量,也就是说,聊 Semaphore 也绕不开 AQS,所以说 AQS 真的太重要了。

Semaphore 的公平性和非公平性

那么进入 Sync 内部看看它实现了哪些方法

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 1192457210091910933L;
  3. Sync(int permits) {
  4. setState(permits);
  5. }
  6. final int getPermits() {
  7. return getState();
  8. }
  9. final int nonfairTryAcquireShared(int acquires) {
  10. for (;;) {
  11. int available = getState();
  12. int remaining = available - acquires;
  13. if (remaining < 0 ||
  14. compareAndSetState(available, remaining))
  15. return remaining;
  16. }
  17. }
  18. protected final boolean tryReleaseShared(int releases) {
  19. for (;;) {
  20. int current = getState();
  21. int next = current + releases;
  22. if (next < current) // overflow
  23. throw new Error("Maximum permit count exceeded");
  24. if (compareAndSetState(current, next))
  25. return true;
  26. }
  27. }
  28. final void reducePermits(int reductions) {
  29. for (;;) {
  30. int current = getState();
  31. int next = current - reductions;
  32. if (next > current) // underflow
  33. throw new Error("Permit count underflow");
  34. if (compareAndSetState(current, next))
  35. return;
  36. }
  37. }
  38. final int drainPermits() {
  39. for (;;) {
  40. int current = getState();
  41. if (current == 0 || compareAndSetState(current, 0))
  42. return current;
  43. }
  44. }
  45. }

首先是 Sync 的初始化,内部调用了setState并传递了 permits,AQS 中的 State 其实就是同步状态的值,而 Semaphore 的这个 permits 就是代表了许可的数量。
getPermits 其实就是调用了 getState 方法获取了一下线程同步状态值。后面的 nonfairTryAcquireShared 方法其实是在 Semaphore 中构造了 NonfairSync 中的 tryAcquireShared 调用的
2021-05-15-21-17-34-854878.png
这里需要提及一下什么是NonfairSync,除了 NonfairSync 是不是还有 FairSync 呢?查阅 JDK 源码发现确实有。
那么这里的 FairSyncNonfairSync 都代表了什么?为什么会有这两个类呢?
事实上,Semaphore 就像 ReentrantLock 一样,也存在“公平”和”不公平”两种,默认情况下 Semaphore 是一种不公平的信号量
2021-05-15-21-17-34-969722.png
Semaphore 的不公平意味着它不会保证线程获得许可的顺序,Semaphore 会在线程等待之前为调用 acquire 的线程分配一个许可,拥有这个许可的线程会自动将自己置于线程等待队列的头部。
当这个参数为 true 时,Semaphore 确保任何调用 acquire 的方法,都会按照先入先出的顺序来获取许可。

  1. final int nonfairTryAcquireShared(int acquires) {
  2. for (;;) {
  3. // 获取同步状态值
  4. int available = getState();
  5. // state 的值 - 当前线程需要获取的信号量(通常默认是 -1),只有
  6. // remaining > 0 才表示可以获取。
  7. int remaining = available - acquires;
  8. // 先判断是否小于 0 ,如果小于 0 则表示无法获取,如果是正数
  9. // 就需要使用 CAS 判断内存值和同步状态值是否一致,然后更新为同步状态值 - 1
  10. if (remaining < 0 ||
  11. compareAndSetState(available, remaining))
  12. return remaining;
  13. }
  14. }

2021-05-15-21-17-35-065465.png
从上面这幅源码对比图可以看到,NonfairSyncFairSync 最大的区别就在于tryAcquireShared方法的区别。
NonfairSync 版本中,是不会管当前等待队列中是否有排队许可的,它会直接判断信号许可量和 CAS 方法的可行性。
FairSync 版本中,它首先会判断是否有许可进行排队,如果有的话就直接获取失败。
上面说公平性和非公平性的区别一直针对的是 acquire 方法来说的,怎么现在他们两个主要的区别在于tryAcquireShared方法呢?
进入到acquire方法一探究竟
2021-05-15-21-17-35-183169.png
可以看到,在 acquire 方法中,会调用 tryAcquireShared 方法,根据其返回值判断是否调用doAcquireSharedInterruptibly方法
这里需要注意下,acquire 方法具有阻塞性,而 tryAcquire 方法不具有阻塞性。
这也就是说,调用 acquire 方法如果获取不到许可,那么 Semaphore 会阻塞,直到有可用的许可。而 tryAcquire 方法如果获取不到许可会直接返回 false
这里还需要注意下acquireUninterruptibly方法,其他 acquire 的相关方法要么是非阻塞,要么是阻塞可中断,而 acquireUninterruptibly 方法不仅在没有许可的情况下执着的等待,而且也不会中断,使用这个方法时需要注意,这个方法很容易在出现大规模线程阻塞而导致 Java 进程出现假死的情况。
有获取许可相对应的就有释放许可,但是释放许可不会区分到底是公平释放还是非公平释放。不管方式如何都是释放一个许可给 Semaphore ,同样的 Semaphore 中的许可数量会增加。
2021-05-15-21-17-35-268033.png
在上图中调用 tryReleaseShared 判断是否能进行释放后,再会调用 AQS 中的releasedShared方法进行释放。
2021-05-15-21-17-35-355651.png
上面这个释放流程只是释放一个许可,除此之外,还可以释放多个许可

  1. public void release(int permits) {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.releaseShared(permits);
  4. }

后面这个 releaseShared 的释放流程和上面的释放流程一致。

其他 Semaphore 方法

除了上面基本的 acquirerelease 相关方法外,也要了解一下 Semaphore 的其他方法。Semaphore 的其他方法比较少,只有下面这几个
drainPermits:获取并退还所有立即可用的许可,其实相当于使用 CAS 方法把内存值置为 0
reducePermits:和nonfairTryAcquireShared方法类似,只不过 nonfairTryAcquireShared 是使用 CAS 使内存值 + 1,而 reducePermits 是使内存值 - 1 。
isFair:对 Semaphore 许可的争夺是采用公平还是非公平的方式,对应到内部的实现就是 FairSync 和 NonfairSync。
hasQueuedThreads:当前是否有线程由于要获取 Semaphore 许可而进入阻塞。
getQueuedThreads:返回一个包含了等待获取许可的线程集合。
getQueueLength:获取正在排队而进入阻塞状态的线程个数。