1. 概述

主要介绍一下使用AQS实现的并发工具类,包括以下几个:

  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • Exchanger

2. CountDownLatch

CountDownLatch,翻译一下,倒数门闩,再翻译一下,倒数结束,门闩才会打开。该工具类主要用于让一个或者多个线程等待其他线程结束。CountDownLatch主要有三个方法,如下所示:

  1. public CountDownLatch(int count) 构造方法,指定倒数的数量,该数量不可被重置。
  2. public void countDown() 倒数,即倒数的数量-1
  3. public void await() 阻塞等待倒数结束

2.1 使用

以下为CountDownLatch的使用代码,main线程在调用了await方法之后会被阻塞,等待其他线程调用countDown方法使计数器减一,直至减到0。

  1. @Test
  2. @SuppressWarnings("all")
  3. public void testCountDownLatch() throws InterruptedException {
  4. CountDownLatch countDownLatch = new CountDownLatch(4);
  5. for (int i = 0; i < 4; i++) {
  6. new Thread(() -> {
  7. try {
  8. // doBusiness
  9. Thread.sleep(2000);
  10. System.out.println(Thread.currentThread().getName() + " 释放");
  11. countDownLatch.countDown();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }).start();
  16. }
  17. System.out.println(Thread.currentThread().getName() + " 阻塞等待其他线程");
  18. countDownLatch.await();
  19. System.out.println(Thread.currentThread().getName() + " 阻塞结束");
  20. }

运行结果:

  1. main 阻塞等待其他线程
  2. Thread-1 释放
  3. Thread-3 释放
  4. Thread-2 释放
  5. Thread-0 释放
  6. main 阻塞结束

2.2 实现

CountDownLatch的实现主要依赖其静态内部类Sync,该类继承了AQS并且实现了共享模式。代码如下所示:

  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 4982264981922014374L;
  3. /**
  4. * 设置同步状态
  5. */
  6. Sync(int count) {
  7. setState(count);
  8. }
  9. /**
  10. * 获取同步状态
  11. */
  12. int getCount() {
  13. return getState();
  14. }
  15. /**
  16. * 获取同步状态,即CountDownLatch的await方法,当当前同步状态不为0时,阻塞等待
  17. */
  18. protected int tryAcquireShared(int acquires) {
  19. return (getState() == 0) ? 1 : -1;
  20. }
  21. /**
  22. * 释放同步状态,调用countDown方法时,会将同步状态减一
  23. * 当同步状态==0时,该方法返回true,AQS会唤醒等待队列的后继线程(即调用CountDownLatch.await方法而阻塞的线程)
  24. */
  25. protected boolean tryReleaseShared(int releases) {
  26. // Decrement count; signal when transition to zero
  27. for (;;) {
  28. int c = getState();
  29. if (c == 0)
  30. return false;
  31. int nextc = c-1;
  32. if (compareAndSetState(c, nextc))
  33. return nextc == 0;
  34. }
  35. }
  36. }

3. CyclicBarrier

CyclicBarrier,翻译一下,可循环使用的屏障。该工具类主要做的事情:让线程到达屏障时(即调用await方法)被阻塞,直到最后一个线程到达屏障,屏障才会被打开,被阻塞的线程才会继续运行。
ps:屏障拦截的线程数量可被重置
CyclicBarrier主要提供了以下方法:

  1. public CyclicBarrier(int parties) 构造方法,指定屏障拦截的线程数量
  2. public CyclicBarrierNote(int parties, Runnable barrierAction)
  3. 构造方法,指定屏障拦截的线程数量。最后一个到达屏障的线程会先执行barrierAction的逻辑,再继续自己的工作。
  4. public int await() 到达屏障,阻塞等待其他线程到达屏障

3.1 使用

  1. @Test
  2. public void testCyclicBarrier() throws BrokenBarrierException, InterruptedException {
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
  4. System.out.println(Thread.currentThread().getName() + " 执行barrierAction");
  5. });
  6. new Thread(() -> {
  7. try {
  8. Thread.sleep(1000);
  9. System.out.println(Thread.currentThread().getName() + " 到达屏障");
  10. cyclicBarrier.await();
  11. System.out.println(Thread.currentThread().getName() + " 执行业务");
  12. } catch (InterruptedException | BrokenBarrierException e) {
  13. e.printStackTrace();
  14. }
  15. }).start();
  16. System.out.println(Thread.currentThread().getName() + " 到达屏障");
  17. cyclicBarrier.await();
  18. System.out.println("屏障结束");
  19. }

运行结果:

  1. main 到达屏障
  2. Thread-0 到达屏障
  3. Thread-0 执行barrierAction
  4. Thread-0 执行业务
  5. 屏障结束

3.2 实现

CyclicBarrier的实现主要依赖于ReentrantLock以及Condition来实现的,具体看一下dowait方法,代码如下所示:

  1. private int dowait(boolean timed, long nanos)
  2. throws InterruptedException, BrokenBarrierException,
  3. TimeoutException {
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. final Generation g = generation;
  8. // 判断当前是否被打破了
  9. if (g.broken)
  10. throw new BrokenBarrierException();
  11. // 判断当前是否被中断,被中断则打破屏障
  12. if (Thread.interrupted()) {
  13. breakBarrier();
  14. throw new InterruptedException();
  15. }
  16. int index = --count;
  17. // 如果是最后一个到达的线程,唤醒所有线程
  18. if (index == 0) { // tripped
  19. boolean ranAction = false;
  20. try {
  21. final Runnable command = barrierCommand;
  22. if (command != null)
  23. command.run();
  24. ranAction = true;
  25. // 产生新一代,唤醒所有线程
  26. nextGeneration();
  27. return 0;
  28. } finally {
  29. // 执行barrierCommand.run失败了,打破屏障
  30. if (!ranAction)
  31. breakBarrier();
  32. }
  33. }
  34. // loop until tripped, broken, interrupted, or timed out
  35. // 如果不是最后一个到达的线程,循环等待
  36. for (;;) {
  37. try {
  38. // 等待 和 超时等待 两种情况
  39. if (!timed)
  40. trip.await();
  41. else if (nanos > 0L)
  42. nanos = trip.awaitNanos(nanos);
  43. } catch (InterruptedException ie) {
  44. // g == generation && ! g.broken 说明此时当前代没有被别的线程打破屏障
  45. if (g == generation && ! g.broken) {
  46. breakBarrier();
  47. throw ie;
  48. }
  49. // 当前线程被中断了,但是此时应该是别的线程打破了屏障
  50. else {
  51. // We're about to finish waiting even if we had not
  52. // been interrupted, so this interrupt is deemed to
  53. // "belong" to subsequent execution.
  54. Thread.currentThread().interrupt();
  55. }
  56. }
  57. // 别的线程打破了屏障,抛出异常
  58. if (g.broken)
  59. throw new BrokenBarrierException();
  60. // 代 已经被别的线程更新
  61. if (g != generation)
  62. return index;
  63. // 等待超时了,打破屏障
  64. if (timed && nanos <= 0L) {
  65. breakBarrier();
  66. throw new TimeoutException();
  67. }
  68. }
  69. } finally {
  70. lock.unlock();
  71. }
  72. }

4. Semaphore

Semaphore,翻译一下,信号量。Semaphore是用来控制同时访问特定资源的线程数量。举个🌰,卫生间有三个位置,决定当前仅有三个人能进去,其他人则会在门口进行排队,只有当里面的人出来,外面的人才能够进去。

Semaphore 支持公平和非公平模式,默认非公平模式。

  • 公平模式无论是否有许可,都会判断是否线程在排队,如果有线程排队,获取线程立即失败,进入排队;
  • 非公平模式无论许可是否充足,直接尝试获取许可。

Semaphore主要有以下四个方法:

  1. public Semaphore(int permits) 定义可以访问共享资源的许可证数量
  2. public void acquire() 请求获取访问许可
  3. public void release() 是否访问许可

4.1 使用

  1. @Test
  2. @SuppressWarnings("all")
  3. public void testSemaphore() throws Exception{
  4. Semaphore restRoom = new Semaphore(2);
  5. for (int i = 0; i < 3; i++) {
  6. Thread thread = new Thread(() -> {
  7. try {
  8. System.out.println(Thread.currentThread().getId() + "想上厕所");
  9. Thread.sleep(500);
  10. restRoom.acquire(1);
  11. System.out.println(Thread.currentThread().getId() + "进入厕所");
  12. Thread.sleep(new Random().nextInt(2000));
  13. System.out.println(Thread.currentThread().getId() + "上完厕所");
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. } finally {
  17. restRoom.release();
  18. }
  19. });
  20. thread.start();
  21. }
  22. Thread.sleep(10000);
  23. }

运行结果:

  1. 10想上厕所
  2. 11想上厕所
  3. 12想上厕所
  4. 11进入厕所
  5. 10进入厕所
  6. 11上完厕所
  7. 12进入厕所
  8. 12上完厕所
  9. 10上完厕所

4.2 实现

Semaphore的实现主要依赖于其内部类Sync,具体代码如下所示:

  1. abstract static class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 1192457210091910933L;
  3. /**
  4. * 构造方法,设置总许可数量
  5. */
  6. Sync(int permits) {
  7. setState(permits);
  8. }
  9. /**
  10. * 获取剩余许可数量
  11. */
  12. final int getPermits() {
  13. return getState();
  14. }
  15. /**
  16. * 非公平获取许可
  17. */
  18. final int nonfairTryAcquireShared(int acquires) {
  19. for (;;) {
  20. int available = getState();
  21. int remaining = available - acquires;
  22. if (remaining < 0 ||
  23. compareAndSetState(available, remaining))
  24. return remaining;
  25. }
  26. }
  27. /**
  28. * 自旋,释放许可
  29. */
  30. protected final boolean tryReleaseShared(int releases) {
  31. for (;;) {
  32. int current = getState();
  33. int next = current + releases;
  34. if (next < current) // overflow
  35. throw new Error("Maximum permit count exceeded");
  36. if (compareAndSetState(current, next))
  37. return true;
  38. }
  39. }
  40. /**
  41. * 自旋,减少许可数量
  42. */
  43. final void reducePermits(int reductions) {
  44. for (;;) {
  45. int current = getState();
  46. int next = current - reductions;
  47. if (next > current) // underflow
  48. throw new Error("Permit count underflow");
  49. if (compareAndSetState(current, next))
  50. return;
  51. }
  52. }
  53. /**
  54. * 丢弃所有许可
  55. */
  56. final int drainPermits() {
  57. for (;;) {
  58. int current = getState();
  59. if (current == 0 || compareAndSetState(current, 0))
  60. return current;
  61. }
  62. }
  63. }
  64. /**
  65. * 非公平模式
  66. */
  67. static final class NonfairSync extends Sync {
  68. private static final long serialVersionUID = -2694183684443567898L;
  69. NonfairSync(int permits) {
  70. super(permits);
  71. }
  72. protected int tryAcquireShared(int acquires) {
  73. return nonfairTryAcquireShared(acquires);
  74. }
  75. }
  76. /**
  77. * 公平模式
  78. */
  79. static final class FairSync extends Sync {
  80. private static final long serialVersionUID = 2014338818796000944L;
  81. FairSync(int permits) {
  82. super(permits);
  83. }
  84. protected int tryAcquireShared(int acquires) {
  85. for (;;) {
  86. // 公平非公平的区别主要就在这,公平模式时会判断当前队列是否有线程在排队,有就获取失败
  87. if (hasQueuedPredecessors())
  88. return -1;
  89. int available = getState();
  90. int remaining = available - acquires;
  91. if (remaining < 0 ||
  92. compareAndSetState(available, remaining))
  93. return remaining;
  94. }
  95. }
  96. }