Semaphore 信号量也是 Java 中的一个同步器,与 CountDownLatch 和 CycleBarrier 不同的是,它内部的计数器是递增的,并且在一开始初始化 Semaphore 时可以指定一个初始值,但是并不需要知道需要同步的线程个数,而是在需要同步的地方调用 acquire 方法时指定需要同步的线程个数

案例介绍

同样下面的例子也是在主线程中开启两个子线程让它们执行,等所有子线程执行完毕后主线程再继续向下运行。

  1. public class SemaphoreTest {
  2. // 创建一个 Semaphore 实例
  3. private static Semaphore semaphore = new Semaphore(0);
  4. public static void mainString[] args throws InterruptedException {
  5. ExecutorService executorService = Executors.newFixedThreadPool(2);
  6. // 将线程 A 添加到线程池
  7. executorService.submit(new Runnable() {
  8. public void run() {
  9. try {
  10. System.out.println(Thread.currentThread() + over」);
  11. semaphore.release();
  12. } catch Exception e {
  13. e.printStackTrace();
  14. }
  15. }
  16. });
  17. // 将线程 B 添加到线程池
  18. executorService.submit(new Runnable() {
  19. public void run() {
  20. try {
  21. System.out.println(Thread.currentThread() + over」);
  22. semaphore.release();
  23. } catch Exception e {
  24. e.printStackTrace();
  25. }
  26. }
  27. });
  28. // 等待子线程执行完毕,返回
  29. semaphore.acquire(2);
  30. System.out.println(「all child thread over 」);
  31. //关闭线程池
  32. executorService.shutdown();
  33. }
  34. }

输出结果如下。

信号量 Semaphore 原理探究 - 图1

如上代码首先创建了一个信号量实例,构造函数的入参为 0,说明当前信号量计数器的值为 0。然后 main 函数向线程池添加两个线程任务,在每个线程内部调用信号量的 release 方法,这相当于让计数器值递增 1。最后在 main 线程里面调用信号量的 acquire 方法,传参为 2 说明调用 acquire 方法的线程会一直阻塞,直到信号量的计数变为 2 才会返回。看到这里也就明白了,如果构造 Semaphore 时传递的参数为N,并在M个线程中调用了该信号量的 release 方法,那么在调用 acquire 使M个线程同步时传递的参数应该是M+N

下面举个例子来模拟 CyclicBarrier 复用的功能,代码如下。

  1. public class SemaphoreTest2 {
  2. // 创建一个 Semaphore 实例
  3. private static volatile Semaphore semaphore = new Semaphore(0);
  4. public static void mainString[] args throws InterruptedException {
  5. ExecutorService executorService = Executors.newFixedThreadPool(2);
  6. // 将线程 A 添加到线程池
  7. executorService.submit(new Runnable() {
  8. public void run() {
  9. try {
  10. System.out.println(Thread.currentThread() + A task over」);
  11. semaphore.release();
  12. } catch Exception e {
  13. e.printStackTrace();
  14. }
  15. }
  16. });
  17. // 将线程 B 添加到线程池
  18. executorService.submit(new Runnable() {
  19. public void run() {
  20. try {
  21. System.out.println(Thread.currentThread() + A task over」);
  22. semaphore.release();
  23. } catch Exception e {
  24. e.printStackTrace();
  25. }
  26. }
  27. });
  28. // (1)等待子线程执行任务 A 完毕,返回
  29. semaphore.acquire(2);
  30. // 将线程 c 添加到线程池
  31. executorService.submit(new Runnable() {
  32. public void run() {
  33. try {
  34. System.out.println(Thread.currentThread() + B task over」);
  35. semaphore.release();
  36. } catch Exception e {
  37. e.printStackTrace();
  38. }
  39. }
  40. });
  41. // 将线程 d 添加到线程池
  42. executorService.submit(new Runnable() {
  43. public void run() {
  44. try {
  45. System.out.println(Thread.currentThread() + B task over」);
  46. semaphore.release();
  47. } catch Exception e {
  48. e.printStackTrace();
  49. }
  50. }
  51. });
  52. // (2)等待子线程执行 B 完毕,返回
  53. semaphore.acquire(2);
  54. System.out.println(「task is over」);
  55. // 关闭线程池
  56. executorService.shutdown();
  57. }
  58. }

输出结果为

信号量 Semaphore 原理探究 - 图2

如上代码首先将线程 A 和线程 B 加入到线程池。主线程执行代码(1)后被阻塞。线程 A 和线程 B 调用 release 方法后信号量的值变为了 2,这时候主线程的 aquire 方法会在获取到 2 个信号量后返回(返回后当前信号量值为 0)。然后主线程添加线程 C 和线程 D 到线程池,之后主线程执行代码(2)后被阻塞(因为主线程要获取 2 个信号量,而当前信号量个数为 0)。当线程 C 和线程 D 执行完 release 方法后,主线程才返回。从本例子可以看出,Semaphore 在某种程度上实现了 CyclicBarrier 的复用功能。

实现原理探究

为了能够一览 Semaphore 的内部结构,首先看下 Semaphore 的类图,如图 10-3 所示。

信号量 Semaphore 原理探究 - 图3

图 10-3

由该类图可知,Semaphore 还是使用 AQS 实现的。Sync 只是对 AQS 的一个修饰,并且 Sync 有两个实现类,用来指定获取信号量时是否采用公平策略。例如,下面的代码在创建 Semaphore 时会使用一个变量指定是否使用公平策略。

  1. public Semaphore(int permits) {
  2. sync = new NonfairSync(permits);
  3. }
  4. public Semaphore(int permits, boolean fair) {
  5. sync = fair ? new FairSync(permits) : new
  6. NonfairSync(permits);
  7. }
  8. Sync(int permits) {
  9. setState(permits);
  10. }

在如上代码中,Semaphore 默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造 Semaphore 对象。另外,如 CountDownLatch 构造函数传递的初始化信号量个数 permits 被赋给了 AQS 的 state 状态变量一样,这里 AQS 的 state 值也表示当前持有的信号量个数。

下面来看 Semaphore 实现的主要方法。

1.void acquire() 方法

当前线程调用该方法的目的是希望获取一个信号量资源。如果当前信号量个数大于 0,则当前信号量的计数会减 1,然后该方法直接返回。否则如果当前信号量个数等于 0,则当前线程会被放入 AQS 的阻塞队列。当其他线程调用了当前线程的 interrupt()方法中断了当前线程时,则当前线程会抛出 InterruptedException 异常返回。下面看下代码实现。

  1. public void acquire() throws InterruptedException {
  2. //传递参数为 1,说明要获取 1 个信号量资源
  3. sync.acquireSharedInterruptibly(1);
  4. }
  5. public final void acquireSharedInterruptiblyint arg
  6. throws InterruptedException {
  7. //(1)如果线程被中断,则抛出中断异常
  8. if Thread.interrupted())
  9. throw new InterruptedException();
  10. //(2)否则调用 Sync 子类方法尝试获取,这里根据构造函数确定使用公平策略
  11. if tryAcquireShared(arg) < 0
  12. //如果获取失败则放入阻塞队列。然后再次尝试,如果失败则调用 park 方法挂起当前线程
  13. doAcquireSharedInterruptiblyarg);
  14. }

由如上代码可知,acquire()在内部调用了 Sync 的 acquireSharedInterruptibly 方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。尝试获取信号量资源的 AQS 的方法 tryAcquireShared 是由 Sync 的子类实现的,所以这里分别从两方面来讨论。先讨论非公平策略 NonfairSync 类的 tryAcquireShared 方法,代码如下。

  1. protected int tryAcquireSharedint acquires {
  2. return nonfairTryAcquireSharedacquires);
  3. }
  4. final int nonfairTryAcquireSharedint acquires {
  5. for (; ; {
  6. //获取当前信号量值
  7. int available = getState();
  8. //计算当前剩余值
  9. int remaining = available - acquires
  10. //如果当前剩余值小于 0 或者 CAS 设置成功则返回
  11. if remaining < 0 ||
  12. compareAndSetState(available, remaining))
  13. return remaining
  14. }
  15. }

如上代码先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining),如果剩余值小于 0 则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入 AQS 的阻塞队列而被挂起。如果剩余值大于 0,则使用 CAS 操作设置当前信号量值为剩余值,然后返回剩余值。

另外,由于 NonFairSync 是非公平获取的,也就是说先调用 aquire 方法获取信号量的线程不一定比后来者先获取到信号量。考虑下面场景,如果线程 A 先调用了 aquire()方法获取信号量,但是当前信号量个数为 0,那么线程 A 会被放入 AQS 的阻塞队列。过一段时间后线程 C 调用了 release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程 A 就会被激活,然后获取该信号量,但是假如线程 C 释放信号量后,线程 C 调用了 aquire 方法,那么线程 C 就会和线程 A 去竞争这个信号量资源。如果采用非公平策略,由 nonfairTryAcquireShared 的代码可知,线程 C 完全可以在线程 A 被激活前,或者激活后先于线程 A 获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。下面看公平性的 FairSync 类是如何保证公平性的。

  1. protected int tryAcquireShared(int acquires) {
  2. for (; ; ) {
  3. if (hasQueuedPredecessors())
  4. return -1;
  5. int available = getState();
  6. int remaining = available - acquires;
  7. if (remaining < 0 ||
  8. compareAndSetState(available, remaining))
  9. return remaining;
  10. }
  11. }

可见公平性还是靠 hasQueuedPredecessors 这个函数来保证的。前面章节讲过,公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入 AQS 阻塞队列,否则就去获取。

2.void acquire(int permits)方法

该方法与 acquire()方法不同,后者只需要获取一个信号量值,而前者则获取 permits 个

  1. public void acquire(int permits) throws InterruptedException {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.acquireSharedInterruptibly(permits);
  4. }

3.void acquireUninterruptibly() 方法

该方法与 acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly 获取资源时(包含被阻塞后),其他线程调用了当前线程的 interrupt()方法设置了当前线程的中断标志,此时当前线程并不会抛出 InterruptedException 异常而返回。

  1. public void acquireUninterruptibly() {
  2. sync.acquireShared(1);
  3. }

4.void acquireUninterruptibly(int permits)方法

该方法与 acquire(int permits)方法的不同之处在于,该方法对中断不响应。

  1. public void acquireUninterruptibly(int permits) {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.acquireShared(permits);
  4. }

5.void release() 方法

该方法的作用是把当前 Semaphore 对象的信号量值增加 1,如果当前有线程因为调用 aquire 方法被阻塞而被放入了 AQS 的阻塞队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活,激活的线程会尝试获取刚增加的信号量,下面看代码实现。

  1. public void release() {
  2. //(1)arg=1
  3. sync.releaseShared(1);
  4. }
  5. public final boolean releaseSharedint arg {
  6. //(2)尝试释放资源
  7. if tryReleaseShared(arg)) {
  8. //(3)资源释放成功则调用 park 方法唤醒 AQS 队列里面最先挂起的线程
  9. doReleaseShared();
  10. return true
  11. }
  12. return false
  13. }
  14. protected final boolean tryReleaseSharedint releases {
  15. for (; ; {
  16. //(4)获取当前信号量值
  17. int current = getState();
  18. //(5)将当前信号量值增加 releases,这里为增加 1
  19. int next = current + releases
  20. if next < current // 移除处理
  21. throw new Error(「Maximum permit count exceeded」);
  22. //(6)使用 CAS 保证更新信号量值的原子性
  23. if compareAndSetState(current, next))
  24. return true
  25. }
  26. }

由代码 release()->sync.releaseShared(1)可知,release 方法每次只会对信号量值增加 1,tryReleaseShared 方法是无限循环,使用 CAS 保证了 release 方法对信号量递增 1 的原子性操作。tryReleaseShared 方法增加信号量值成功后会执行代码(3),即调用 AQS 的方法来激活因为调用 aquire 方法而被阻塞的线程。

6.void release(int permits)方法

该方法与不带参数的 release 方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加 permits,而后者每次增加 1。

  1. public void release(int permits) {
  2. if (permits < 0) throw new IllegalArgumentException();
  3. sync.releaseShared(permits);
  4. }

另外可以看到,这里的 sync.releaseShared 是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用 CAS 去更新信号量的值而不会被阻塞。

小结

本节首先通过案例介绍了 Semaphore 的使用方法,Semaphore 完全可以达到 CountDownLatch 的效果,但是 Semaphore 的计数器是不可以自动重置的,不过通过变相地改变 aquire 方法的参数还是可以实现 CycleBarrier 的功能的。然后介绍了 Semaphore 的源码实现,Semaphore 也是使用 AQS 实现的,并且获取信号量时有公平策略和非公平策略之分。

总结

本章介绍了并发包中关于线程协作的一些重要类。

首先 CountDownLatch 通过计数器提供了更灵活的控制,只要检测到计数器值为 0,就可以往下执行,这相比使用 join 必须等待线程执行完毕后主线程才会继续向下运行更灵活。

另外,CyclicBarrier 也可以达到 CountDownLatch 的效果,但是后者在计数器值变为 0 后,就不能再被复用,而前者则可以使用 reset 方法重置后复用,前者对同一个算法但是输入参数不同的类似场景比较适用。

而 Semaphore 采用了信号量递增的策略,一开始并不需要关心同步的线程个数,等调用 aquire 方法时再指定需要同步的个数,并且提供了获取信号量的公平性策略。

使用本章介绍的类会大大减少你在 Java 中使用 wait、notify 等来实现线程同步的代码量,在日常开发中当需要进行线程同步时使用这些同步类会节省很多代码并且可以保证正确性。