Semaphore(信号量) 又是一个并发包里的常用组件。它是一个计数信号量,包含一组许可证,可以用acquire方法阻塞,直到获取一个许可证,可以用release方法释放一个许可证。Semaphore只是对可获取的数量进行维护,并没有真正的创建许可证对象。

使用举例

简单介绍一下Semaphore的使用,举一个LeetCode的题为例

https://leetcode-cn.com/problems/print-in-order/ 我们提供了一个类: public class Foo {   public void one() { print(“one”); }   public void two() { print(“two”); }   public void three() { print(“three”); } } 三个不同的线程将会共用一个 Foo 实例。   线程 A 将会调用 one() 方法   线程 B 将会调用 two() 方法   线程 C 将会调用 three() 方法 请设计修改程序,以确保 two() 方法在 one() 方法之后被执行,three() 方法在 two() 方法之后被执行。

这道题有很多种解法,这里用信号量来实现一种解法

  1. public class Foo {
  2. //既然有三个方法,可以定义一个有3个许可证的信号量,而且必须得是非公平的
  3. private Semaphore semaphore = new Semaphore(3);
  4. public void first(Runnable printFirst) throws InterruptedException {
  5. for(;;) {
  6. //自旋阻塞,当信号量可用凭证为3时,获取一个凭证
  7. if (semaphore.availablePermits() == 3) {
  8. //初始化凭证数量就是3,那么一开始肯定先进入了这个分支中
  9. printFirst.run();
  10. //获取一个凭证,可用凭证数量变为2
  11. semaphore.acquire();
  12. break;
  13. }
  14. }
  15. }
  16. public void second(Runnable printSecond) throws InterruptedException {
  17. for(;;) {
  18. //当first获取了一个凭证后可用数量减少,
  19. //那么此时second可以进入条件分支
  20. if (semaphore.availablePermits() == 2) {
  21. printSecond.run();
  22. semaphore.acquire();
  23. break;
  24. }
  25. }
  26. }
  27. public void third(Runnable printThird) throws InterruptedException {
  28. for(;;) {
  29. //同上
  30. if (semaphore.availablePermits() == 1) {
  31. printThird.run();
  32. semaphore.acquire();
  33. break;
  34. }
  35. }
  36. }
  37. }

上文代码中所使用的acquire方法是阻塞的,若不想阻塞可以使用tryAcquire

事实上Semaphore对于凭证数量的管理也是借鉴了AQS的status属性实现的。

常用的几个方法的源码实现

acquire

  1. public void acquire() throws InterruptedException {
  2. sync.acquireSharedInterruptibly(1);
  3. }

直接调用了AQS的acquireSharedInterruptibly方法

  1. public final void acquireSharedInterruptibly(int arg)
  2. throws InterruptedException {
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. if (tryAcquireShared(arg) < 0)
  6. doAcquireSharedInterruptibly(arg);
  7. }

acquireSharedInterruptibly方法里首先判断线程是不是被中断了,是的话抛出异常。然后调用了tryAcquireShared方法来尝试获取一个信号量,如果返回值小于0,表示获取失败,将会调用doAcquireSharedInterruptibly方法使当前线程进入队列进行等待。由于这一步存在等待操作,所以方法是阻塞的

tryAcquireShared方法是AQS留给子类实现的模板方法,在Semaphore中有公平和非公平的两种实现,以非公平为例,具体的实现是在Semaphore的内部类Sync的nonfairTryAcquireShared方法中

  1. final int nonfairTryAcquireShared(int acquires) {
  2. for (;;) {
  3. int available = getState();
  4. int remaining = available - acquires;
  5. if (remaining < 0 ||
  6. compareAndSetState(available, remaining))
  7. return remaining;
  8. }
  9. }

在Semaphore中,AQS的status用来表示当前剩余可用的凭证数量

先把需要获取的数量减下去,然后尝试用CAS的方式去设置。

如果需要获取的数量超过了当前剩余的凭证数量,则方法返回值将小于0,如果设置成功,那么返回值将大于0。在acquireSharedInterruptibly方法分析我们已经提到了,返回值小于0的话会进入doAcquireSharedInterruptibly方法,将当前线程构造成一个node然后进入AQS的队列进行等待。简单的说就是获取失败,则线程阻塞等待。

release

  1. public void release() { sync.releaseShared(1); }

release方法中同样的,直接调用了AQS的releaseShared方法

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

在AQS的releaseShared方法中,首先调用了模板方法tryReleaseShared来对sttaus进行加操作,如果成功,调用doReleaseShared方法做锁释放的后续操作,例如唤醒后续节点等。

tryReleaseShared方法的实现也是在Semaphore的内部类Sync中

  1. protected final boolean tryReleaseShared(int releases) {
  2. for (;;) {
  3. int current = getState();
  4. int next = current + releases;
  5. if (next < current) // overflow
  6. throw new Error("Maximum permit count exceeded");
  7. if (compareAndSetState(current, next))
  8. return true;
  9. }
  10. }

tryReleaseShared方法中,首先将当前的凭证数加上释放的数,然后通过CAS去设置。设置成功则返回true,否则将一直等待直到成功。成功后,如上文所说,会调用doReleaseShared方法进行一些后续的操作。

drainPermits

这个方法的作用是让当前线程获取剩余的所有许可证。实现也很简单,通过CAS将status设置成0

  1. final int drainPermits() {
  2. for (;;) {
  3. int current = getState();
  4. if (current == 0 || compareAndSetState(current, 0))
  5. return current;
  6. }
  7. }

tryAcquire

tryAcquire与acquire方法相对应,是acquire的非阻塞实现,调用的方法和acquire一样,唯一不同的是,调用失败以后不会将当前线程加入到队列中等待,而是直接返回

  1. public boolean tryAcquire() {
  2. return sync.nonfairTryAcquireShared(1) >= 0;
  3. }

搬运自我的简书 https://www.jianshu.com/p/f914eea354e9