封面:详解AQS家族的成员:Semaphore.png

王有志,一个分享硬核Java技术的互金摸鱼侠
加入Java人的提桶跑路群:共同富裕的Java人

今天我们来聊一聊AQS家族中另一个重要成员Semaphore,我只收集到了一道关于Semaphore的面试题,问了问“是什么”和“如何实现的”:

  • 什么是Semaphore?它是如何实现的?

按照我们的惯例,依旧是按照“是什么”,“怎么用”和“如何实现的”这3步来分析Semaphore。

Semaphore的使用

Semaphore直译过来是信号量,是计算机科学中非常Old School的处理同步与互斥的机制与互斥锁不同的是它允许指定数量的线程或进程访问共享资源
Semaphore处理同步与互斥的机制和我们平时过地铁站的闸机非常相似。刷卡打开闸机(acquire操作),通过后(访问临界区)闸机关闭(release操作),后面的人才能够继续刷卡,而在前一个人通过前,后面的人只能排队等候(队列机制)。当然,地铁站不可能只有一个闸机,拥有几个闸机,就允许几个人同时通过。
图1:地铁闸机.jpeg
信号量也是这样的,通过构造函数定义许可数量,使用时申请许可,处理完业务逻辑后释放许可:

  1. // 信号量中定义1个许可
  2. Semaphore semaphore = new Semaphore(1);
  3. // 申请许可
  4. semaphore.acquire();
  5. ......
  6. // 释放许可
  7. semaphore.release();

当我们为Semaphore定义一个许可时,它和互斥锁相同,同一时间只允许一个线程进入临界区。但是当我们定义了多个许可时,它与互斥锁的差异就体现出来了:

  1. Semaphore semaphore = new Semaphore(3);
  2. for(int i = 1; i < 5; i++) {
  3. int finalI = i;
  4. new Thread(()-> {
  5. try {
  6. semaphore.acquire();
  7. System.out.println("第[" + finalI + "]个线程获取到semaphore");
  8. TimeUnit.SECONDS.sleep(10);
  9. semaphore.release();
  10. } catch (InterruptedException e) {
  11. throw new RuntimeException(e);
  12. }
  13. }).start();
  14. }

执行这段代码可以看到,同一时间3个线程都进入了临界区,只有第4个线程被挡在了临界区外。

Semaphore的实现原理

还记得在《AQS的今生,构建出JUC的基础》中提到的同步状态吗?我们当时说它是某些同步器的计数器:

AQS中,state不仅用作表示同步状态,也是某些同步器实现的计数器,如:Semaphore中允许通过的线程数量,ReentrantLock中可重入特性的实现,都依赖于state作为计数器的特性。

先来看Semaphore与AQS的关系:
图2:Semaphore类图.png
与ReentrantLock一样,Semaphore内部实现了继承自AQS的同步器抽象类Sync,并有FairSync和NonfairSync两个实现类。接下来我们就通过剖析Semaphore的源码,来验证我们之前的说法。

构造方法

Semaphore提供了两个构造方法:

  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }
  4. public Semaphore(int permits, boolean fair) {
  5. sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  6. }

可以看到Semaphore和ReentrantLock的设计思路是一致的,Semaphore内部也实现了两个同步器FairSync和NonfairSync,分别实现公平模式和非公平模式,而Semaphore的构造本质上是构造同步器的实现。我们以非公平模式的NonfairSync的实现为例:

  1. public class Semaphore implements java.io.Serializable {
  2. static final class NonfairSync extends Sync {
  3. NonfairSync(int permits) {
  4. super(permits);
  5. }
  6. }
  7. abstract static class Sync extends AbstractQueuedSynchronizer {
  8. Sync(int permits) {
  9. setState(permits);
  10. }
  11. }
  12. }
  13. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  14. protected final void setState(int newState) {
  15. state = newState;
  16. }
  17. }

追根溯源,构造器的参数permits最终还是回归到了AQS的state身上,借助了state作为计数器的特性来实现Semaphore的功能。

acquire方法

现在我们已经为Semaphore设置了一定数量的许可(permits),接下来我们就需要通过Semaphore#acquire方法获取许可,进入Semaphore所“守护”的临界区:

  1. public class Semaphore implements java.io.Serializable {
  2. public void acquire() throws InterruptedException {
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. }
  6. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  7. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  8. if (Thread.interrupted()) {
  9. throw new InterruptedException();
  10. }
  11. if (tryAcquireShared(arg) < 0) {
  12. doAcquireSharedInterruptibly(arg);
  13. }
  14. }
  15. }

这两步和ReentrantLock非常相似,先通过tryAcquireShared尝试直接获取许可,失败后通过doAcquireSharedInterruptibly加入到等待队列中。
Semaphore中直接获取许可的逻辑非常简单:

  1. static final class NonfairSync extends Sync {
  2. protected int tryAcquireShared(int acquires) {
  3. return nonfairTryAcquireShared(acquires);
  4. }
  5. }
  6. abstract static class Sync extends AbstractQueuedSynchronizer {
  7. final int nonfairTryAcquireShared(int acquires) {
  8. for (;;) {
  9. // 获取可用许可数量
  10. int available = getState();
  11. // 计算许可数量
  12. int remaining = available - acquires;
  13. if (remaining < 0 || compareAndSetState(available, remaining)) {
  14. return remaining;
  15. }
  16. }
  17. }
  18. }

首先是获取并减少可用许可的数量,当许可数量小于0时返回一个负数,或通过CAS更新许可数量成功后,返回一个正数。此时doAcquireSharedInterruptibly会将当前的申请Semaphore许可的线程添加到AQS的等待队列中。

  1. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  2. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  3. // 创建共享模式的等待节点
  4. final Node node = addWaiter(Node.SHARED);
  5. try {
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. // 再次尝试获取许可,并返回剩余许可数量
  10. int r = tryAcquireShared(arg);
  11. if (r >= 0) {
  12. // 获取成功,更新头节点
  13. setHeadAndPropagate(node, r);
  14. p.next = null;
  15. return;
  16. }
  17. }
  18. // 获取失败进入等待状态
  19. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
  20. throw new InterruptedException();
  21. }
  22. }
  23. } catch (Throwable t) {
  24. cancelAcquire(node);
  25. throw t;
  26. }
  27. }
  28. }

Semaphore的使用的doAcquireSharedInterruptibly与ReentrantLock使用的acquireQueued方法核心逻辑一直,但是有细微的实现差别:

  • 创建节点使用Node.SHARED模式;
  • 更新头节点使用了setHeadAndPropagate方法。

    1. private void setHeadAndPropagate(Node node, int propagate) {
    2. Node h = head;
    3. setHead(node);
    4. // 是否要唤醒等待中的节点
    5. if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    6. Node s = node.next;
    7. if (s == null || s.isShared()) {
    8. // 唤醒等待中的节点
    9. doReleaseShared();
    10. }
    11. }
    12. }

    我们知道在ReentrantLock中执行acquireQueued,当成功获取锁后,只需要执行setHead(node)即可,那么为什么Semaphore还要再进行唤醒?

假设有3个许可的Semaphore同时有T1,T2,T3和T4总计4个线程竞争:

  • 它们同时进入nonfairTryAcquireShared方法,假设只有T1通过compareAndSetState(available, remaining)成功修改有效的许可数量,T1进入临界区;
  • T2,T3和T4进入doAcquireSharedInterruptibly方法,通过addWaiter(Node.SHARED)构建出AQS的等待队列(参考AQS的今生中关于addWaiter方法的分析);
  • 假设T2成为了头节点的直接后继节点,T2再次执行tryAcquireShared尝试获取许可,T3和T4执行parkAndCheckInterrupt;
  • T2成功获取许可并进入临界区,此时Semaphore剩余1个许可,而T3和T4处于暂停状态中。

这种场景中,只有两个许可产生了作用,显然不符合我们对的初衷,因此在执行setHeadAndPropagate更新头节点时,判断剩余许可的数量,当数量大于0时继续唤醒后继节点。
Tips

  • Semaphore在获取许可的流程与ReentrantLock加锁的过程高度相似~~
  • 下文分析doReleaseShared是如何唤醒等待中节点的。

    release方法

    Semaphore的release方法就非常简单了:
  1. public class Semaphore implements java.io.Serializable {
  2. public void release() {
  3. sync.releaseShared(1);
  4. }
  5. abstract static class Sync extends AbstractQueuedSynchronizer {
  6. protected final boolean tryReleaseShared(int releases) {
  7. for (;;) {
  8. int current = getState();
  9. // 计算许可数量
  10. int next = current + releases;
  11. if (next < current) {
  12. throw new Error("Maximum permit count exceeded");
  13. }
  14. // 通过CAS更新许可数量
  15. if (compareAndSetState(current, next)) {
  16. return true;
  17. }
  18. }
  19. }
  20. }
  21. }
  22. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  23. public final boolean releaseShared(int arg) {
  24. if (tryReleaseShared(arg)) {
  25. doReleaseShared();
  26. return true;
  27. }
  28. return false;
  29. }
  30. private void doReleaseShared() {
  31. for (;;) {
  32. Node h = head;
  33. // 判断AQS的等待队列是否为空
  34. if (h != null && h != tail) {
  35. int ws = h.waitStatus;
  36. // 判断当前节点是否处于待唤醒的状态
  37. if (ws == Node.SIGNAL) {
  38. if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
  39. continue;
  40. }
  41. unparkSuccessor(h);
  42. } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) {
  43. // 状态为0时,更新节点的状态为无条件传播
  44. continue;
  45. }
  46. }
  47. if (h == head) {
  48. break;
  49. }
  50. }
  51. }
  52. }

我们可以看到Semaphore的release方法分了两部分:

  • tryReleaseShared方法更新Semaphore的有效许可数量;
  • doReleaseShared唤醒处于等待中的节点。

唤醒的逻辑并不复杂,依旧是对节点状态waitStatus的判断,来确定是否需要执行unparkSuccessor,当状态为ws == 0,会将节点的状态更新为Node#PROPAGAT,即无条件传播。
Tips:与ReentrantLock所不同的是,Semaphore并不支持Node#CONDITION状态,同样的ReentrantLock也不支持Node#PROPAGATE状态。

结语

关于Semaphore的内容到这里就结束了,今天我们只具体分析了非公平模式下核心方法的实现,至于公平模式的实现,以及其它方法的实现,就留个大家自行探索了。


如果本文对你有帮助的话,还请多多点赞支持。如果文章中出现任何错误,还请批评指正。最后欢迎大家关注分享硬核Java技术的金融摸鱼侠王有志,我们下次再见!