[toc]

AQS 知识总结

AQS,全称 Abstract Queue Synchronizer,中文名称“同步阻塞队列”,在java.util.concurrent.locks包下面,是一个抽象的可以实现阻塞线程、排队控制、唤醒线程等操作的同步器基础框架类,AQS 可以实现排它锁、共享锁、条件锁、计数器等相关功能。

AQS 维护着一个同步队列,队列节点其实就是一个工作线程,每个节点都维护着其前驱节点与后继节点,节点中的 waitStatus 字段标识着节点是否被取消或是正在等待一个条件等。

如果用一句话来总结 AQS 的功能,那就是 AQS 必须要保证整个队列入队和出队操作的原子性,同时要保证入队的线程在休眠后必须能够被唤醒

在 AQS 的实现中,任何节点获取到资源时都会被设置成头节点,头节点的所有后继节点都是需要陷入休眠的节点,所以头节点必须要唤醒后继节点。后继节点是由其前驱节点唤醒的,所以 AQS 的管理中,一个节点如果需要陷入休眠,那么它必须将自己链接到一个能够唤醒它的前驱节点,通俗点讲,就是找个“好爹”,而每个节点在释放锁时,也要忠实的履行职责,唤醒后继节点。

AQS 是一个抽象的可以实现阻塞线程、排队控制、唤醒线程等操作的同步器基础框架类,现在再来理解这句话,阻塞线程、排队控制、唤醒线程,AQS 并没有实现获取锁的核心逻辑,获取锁的真正逻辑是用户实现的,AQS 只是帮我们管理线程,没得到锁的线程进行休眠,并且要保证休眠的线程能够被唤醒,可以说这就是整个 AQS 要做的事情。

  1. static final class Node {
  2. // 标志线程已被中断
  3. static final int CANCELLED = 1;
  4. // 标志节点在释放时,必须唤醒后继节点
  5. static final int SIGNAL = -1;
  6. // 标志节点正在等待一个条件,用以实现条件队列
  7. static final int CONDITION = -2;
  8. // 指示下一个 acquireShared 应该无条件地传播,即下一个节点 waitStatus 应该为 SIGNAL
  9. static final int PROPAGATE = -3;
  10. // waitStatus 为上述 4 种值,同时可能为 0,代表无意义
  11. volatile int waitStatus;
  12. volatile Node prev;
  13. volatile Node next;
  14. volatile Thread thread;
  15. // 为 null 代表独占模式,否则代表共享模式
  16. Node nextWaiter;
  17. }

独占模式

原理

AQS 队列与其他队列并无两样,也是个先进先出的队列,因此新的节点入队时会被放置在队尾,如果节点获取锁(也称资源)失败,节点将会陷入休眠,在休眠时节点会为自己找一个“好爹”以负责唤醒自己(节点会修改前驱节点的 ws = signal)

释放锁时,如果有必要,节点必须唤醒后继节点,让后续节点醒过来继续尝试获取锁。

在一开始时,AQS 的 head 是一个伪头节点(假定已经初始化),直到某个节点成功获取锁才会被设置成头节点。

获取资源分析

acquire 方法是获取锁的入口方法:

  1. public final void acquire(int arg) {
  2. // 如果尝试获取锁不成功,那么进入 acquireQueued 堵塞,不停的尝试获取锁,如果 acquireQueued 返回 true,则线程自我中断,线程销毁
  3. // 如果尝试获取锁成功,则不必堵塞或中断,线程继续运行;或者 acquireQueued 返回 false 同样如此
  4. // 目标: tryAcquire(arg) 返回 true,或 acquireQueued 返回 false,否则线程将中断
  5. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. selfInterrupt();
  7. }

tryAcquire 方法是我们自己实现的核心方法,acquire 方法会至少调用一次 tryAcquire 方法,这也意味着新加入的节点可能在第一次 tryAcquire 就成功了,那么这个新节点将不会入队而是直接运行,新节点直接抢占运行,而那些在等待队列中的线程却没得到运行,发生不公平的现象,AQS 提供了 hasQueuedPredecessors() 方法以判断当前线程是否是等待时间最久的线程,以实现公平的锁。

如果所有线程一次 tryAcquire 就成功了,那么没有排队的线程,AQS 也就没有什么事情可以做了。

如果第一次 tryAcquire 失败,节点会通过 addWaiter 方法被加入至队列尾部,然后执行 acquireQueued 方法,该方法是不停堵塞并尝试获取锁的核心方法

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean interrupted = false;
  3. try {
  4. // 无线循环,直到 获取锁 或 休眠成功
  5. for (; ; ) {
  6. // 获取上一个节点,如果上一个节点就是队头,并且当前节点成功获取到锁,则设置当前节点为头
  7. final Node p = node.predecessor();
  8. if (p == head && tryAcquire(arg)) {
  9. setHead(node);
  10. // 由于是独占模式,当前节点获取锁成功,那么上一个节点一定释放了锁,可以光荣退休
  11. p.next = null;
  12. // 返回 false,这将导致线程开始工作
  13. return interrupted;
  14. }
  15. // 获取锁失败,休眠等待唤醒
  16. // 否则查看自己是否能够休眠,如果能够休眠则进行休眠
  17. if (shouldParkAfterFailedAcquire(p, node))
  18. interrupted |= parkAndCheckInterrupt();
  19. }
  20. } catch (Throwable t) {
  21. cancelAcquire(node);
  22. if (interrupted)
  23. selfInterrupt();
  24. throw t;
  25. }
  26. }

shouldParkAfterFailedAcquire(p, node) 是节点是否能够陷入休眠的核心方法:

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus;
  3. // 如果前一个节点会通知自己,那么自己可以放心地睡
  4. if (ws == Node.SIGNAL) // ws == 1
  5. return true;
  6. if (ws > 0) {
  7. // 如果前节点死掉了,则重新链接到一个新的存活节点(找一个“好爹”)
  8. do {
  9. node.prev = pred = pred.prev;
  10. } while (pred.waitStatus > 0);
  11. pred.next = node;
  12. } else {
  13. // 否则前驱节点必须要承担起唤醒后续节点的重任
  14. pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
  15. }
  16. // 否则,自己不能休眠,没有节点会唤醒自己,必须要在 acquireQueued 方法内不停尝试 获取锁 以及 休眠
  17. return false;
  18. }

可以发现在 shouldParkAfterFailedAcquire(p, node) 方法中,节点只有在前节点保证会唤醒自己的情况下才会陷入休眠,如果前节点死了,节点会遍历直至找到一个正常的节点链接上去;如果前节点正常,但是 ws != Node.SIGNAL,那么 AQS 会将前节点的 ws 设置为 Node.SIGNAL,然后继续重试。

释放资源分析

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. // 在独占模式下,必须要唤醒后继节点
  5. // h 是可能为 null 的,这说明队列为空,那么也没有队列可以唤醒了
  6. // h.waitStatus 也是可能为 0 的,获取锁时,如果线程第二次就成功了,那么将不会陷入休眠,也不会设置自身状态,而直接成为队头,这种情况下,h.waitStatus == 0,这说明没有后续节点找到自己当“爹”
  7. if (h != null && h.waitStatus != 0)
  8. unparkSuccessor(h);
  9. return true;
  10. }
  11. return false;
  12. }

unparkSuccessor 是唤醒线程的核心方法,当线程被唤醒时,线程从 acquireQueued 醒过来,会继续进入循环尝试获取锁。

这就是 AQS 管理独占模式队列的逻辑:能不管就不管,如果没办法获取锁,AQS 不得不管了,那么就保证你陷入休眠时有人唤醒,保证某些“承担大任”的节点出队时要唤醒后继节点。

共享模式

原理

共享模式允许多个节点共享资源,这也意味着队列中可能会有多个正在获取资源的节点,共享模式中节点释放资源并不会把节点移出队列,因为它们可能还会释放部分资源。

共享模式中陷入休眠的逻辑与独占模式并无两样,但共享模式下在尝试获取资源时,如果发现还有资源可用,共享模式下会唤醒头节点的后继节点,让后继节点继续尝试获取资源。

当节点获取到资源时,仍然会把节点设置为头节点,头节点必须要唤醒后继节点。

当节点释放资源时,共享模式中可能会有非头节点释放资源,这种情况下也必须要唤醒头节点的后继节点

获取资源分析

  1. public final void acquireShared(int arg) {
  2. // 如果为 -1 表示失败了,则进入同步队列堵塞获取锁
  3. // 如果成功了,不用入队
  4. if (tryAcquireShared(arg) < 0)
  5. doAcquireShared(arg);
  6. }

tryAcquireShared(arg) 是具体获取资源的方法,这个方法的返回值为:

  1. ret = -1:获取资源失败。
  2. ret = 0:获取资源成功,但没有其他资源可用。
  3. ret = 1:获取资源成功,并且还有资源可用。

来看 doAcquireShared 方法:

  1. private void doAcquireShared(int arg) {
  2. // 标记为共享节点添加到队列
  3. final Node node = addWaiter(Node.SHARED);
  4. boolean interrupted = false;
  5. try {
  6. for (; ; ) {
  7. final Node p = node.predecessor();
  8. if (p == head) {
  9. int r = tryAcquireShared(arg);
  10. /**
  11. * r = -1: 失败
  12. * r = 0:成功,但没有其他资源了,不允许其他共享者进入了
  13. * r = 1:成功,还有资源,允许其他共享者进入
  14. */
  15. if (r >= 0) {
  16. // 这是我们第一次见这个函数
  17. setHeadAndPropagate(node, r);
  18. p.next = null; // help GC
  19. // 注意,这里返回了,因此不会堵塞
  20. return;
  21. }
  22. }
  23. // 堵塞等待唤醒
  24. if (shouldParkAfterFailedAcquire(p, node))
  25. interrupted |= parkAndCheckInterrupt();
  26. }
  27. } catch (Throwable t) {
  28. cancelAcquire(node);
  29. throw t;
  30. } finally {
  31. if (interrupted)
  32. selfInterrupt();
  33. }
  34. }

可以发送与独占模式还是比较相似的,只有 setHead 变成了 setHeadAndPropagate ,来看看这个函数:

  1. private void setHeadAndPropagate(Node node, int r) {
  2. // 注意!!!这里 h 是旧头部
  3. Node h = head;
  4. // 设置 node 为新头部
  5. setHead(node);
  6. // r > 0,说明还有资源可用,所以尝试唤醒休眠线程,这个很好理解
  7. // h.waitStatus 是旧头部的状态,h.waitStatus 不可能为 SIGNAL,因为当为 SIGNAL 时,后继线程应该正在休眠;一旦 h 唤醒后继节点后,h 自身也会通过 CAS 将 ws 设置为别的状态,总之,不可能为 SIGNAL
  8. // h.waitStatus 只可能为 PROPAGATE,为啥为 PROPAGATE 也要唤醒节点?我们后面再讲
  9. if (r > 0 || h.waitStatus < 0) {
  10. Node s = node.next;
  11. if (s != null && !s.isShared()) {
  12. return;
  13. }
  14. // 因为还有资源可用,所以尝试唤醒头节点的后继节点
  15. doReleaseShared();
  16. }
  17. }

当资源可用时或者旧头部的状态为 PROPAGATE 时,此时需要继续唤醒后继节点,这就是这个方法的逻辑。

释放资源分析

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

在释放资源时,一旦释放成功,则必会进入 doReleaseShared() 方法:

  1. private void doReleaseShared() {
  2. for (; ; ) {
  3. // 找到当前头部节点,尝试唤醒后继节点
  4. Node h = head;
  5. if (h != null && h != tail) {
  6. int ws = h.waitStatus;
  7. // 如果需要唤醒后继节点的话,那么尝试将自身状态设置为 0,并唤醒后继节点
  8. if (ws == Node.SIGNAL) {
  9. if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue;
  10. unparkSuccessor(h);
  11. }
  12. // 否则就将自己状态设置为 Node.PROPAGATE
  13. else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
  14. continue; // loop on failed CAS
  15. }
  16. if (h == head) // loop if head changed
  17. break;
  18. }
  19. }

doReleaseShared() 方法会尝试释放头节点的后继节点,而只要 tryReleaseShared 成功,就一定会执行 doReleaseShared() 方法,也就是说任何一个节点释放资源时,都会保证去尝试唤醒一个正在休眠的节点

到这里,共享模式已经可以实现了,那 Node.PROPAGATE 字段有何意义呢?

Node.PROPAGATE 意义

Node.PROPAGATE 字段的出现是为了解决一个 Bug JDK-6801020

本节摘抄至 AQS源码深入分析之共享模式-你知道为什么AQS中要有PROPAGATE这个状态吗?_雕爷的架构之路-CSDN博客_aqs propagate

来看看离现在非常久远的Java 5u22中的该处代码是如何实现的:

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. setHead(node);
  3. if (propagate > 0 && node.waitStatus != 0) {
  4. Node s = node.next;
  5. if (s == null || s.isShared())
  6. unparkSuccessor(node);
  7. }
  8. }
  9. public final boolean releaseShared(int arg) {
  10. if (tryReleaseShared(arg)) {
  11. Node h = head;
  12. if (h != null && h.waitStatus != 0)
  13. unparkSuccessor(h);
  14. return true;
  15. }
  16. return false;
  17. }

可以看到,早期版本的实现相比于现在的实现来说简单了很多,总结起来最主要的区别有以下几个:

在setHeadAndPropagate方法中,早期版本对节点waitStatus状态的判断只是!=0,而现在改为了<0;
早期版本的releaseShared方法中的执行逻辑和独占锁下的release方法是一样的,而现在将具体的唤醒逻辑写在了doReleaseShared方法里面,和setHeadAndPropagate方法共同调用。
而可能出现bug的测试代码如下:

  1. import java.util.concurrent.Semaphore;
  2. public class TestSemaphore {
  3. private static Semaphore sem = new Semaphore(0);
  4. private static class Thread1 extends Thread {
  5. @Override
  6. public void run() {
  7. sem.acquireUninterruptibly();
  8. }
  9. }
  10. private static class Thread2 extends Thread {
  11. @Override
  12. public void run() {
  13. sem.release();
  14. }
  15. }
  16. public static void main(String[] args) throws InterruptedException {
  17. for (int i = 0; i < 10000000; i++) {
  18. Thread t1 = new Thread1();
  19. Thread t2 = new Thread1();
  20. Thread t3 = new Thread2();
  21. Thread t4 = new Thread2();
  22. t1.start();
  23. t2.start();
  24. t3.start();
  25. t4.start();
  26. t1.join();
  27. t2.join();
  28. t3.join();
  29. t4.join();
  30. System.out.println(i);
  31. }
  32. }
  33. }

其实上面所做的操作无非就是创建了四个线程:t1和t2用于获取信号量,而t3和t4用于释放信号量,其中的10000000次for循环是为了放大出现bug的几率,join操作是为了阻塞主线程。现在就可以说出出现bug的现象了:也就是这里可能会出现线程被hang住的情况发生。

可以想象这样一种场景:假如说当前CLH队列中有一个空节点和两个被阻塞的节点(t1和t2想要获取信号量但获取不到被阻塞在CLH队列中(state初始为0)):head->t1->t2(tail)。

  • 时刻1:t3调用release->releaseShared->tryReleaseShared,将state+1变为1,同时发现此时的head节点不为null并且waitStatus为-1,于是继续调用unparkSuccessor方法,在该方法中会将head的waitStatus改为0;
  • 时刻2:t1被上面t3调用的unparkSuccessor方法所唤醒,调用了tryAcquireShared,将state-1又变为了0。注意,此时还没有调用接下来的setHeadAndPropagate方法;
  • 时刻3:t4调用release->releaseShared->tryReleaseShared,将state+1变为1,同时发现此时的head节点虽然不为null,但是waitStatus为0,所以就不会执行unparkSuccessor方法;
  • 时刻4:t1执行setHeadAndPropagate->setHead,将头节点置为自己。但在此时propagate也就是剩余的state已经为0了(propagate是在时刻2时通过传参的方式传进来的,那个时候-1后剩余的state是0),所以也不会执行unparkSuccessor方法。

至此可以发现一轮循环走完后,CLH队列中的t2线程永远不会被唤醒,主线程也就永远处在阻塞中,这里也就出现了bug。那么来看一下现在的AQS代码在引入了PROPAGATE状态后,在面对同样的场景下是如何解决这个bug的:

  • 时刻1:t3调用release->releaseShared->tryReleaseShared,将state+1变为1,继续调用doReleaseShared方法,将head的waitStatus改为0,同时调用unparkSuccessor方法;
  • 时刻2:t1被上面t3调用的unparkSuccessor方法所唤醒,调用了tryAcquireShared,将state-1又变为了0。注意,此时还没有调用接下来的setHeadAndPropagate方法;
  • 时刻3:t4调用release->releaseShared->tryReleaseShared,将state+1变为1,同时继续调用doReleaseShared方法,此时会将head的waitStatus改为PROPAGATE;
  • 时刻4:t1执行setHeadAndPropagate->setHead,将新的head节点置为自己。虽然此时propagate依旧是0,但是“h.waitStatus < 0”这个条件是满足的(h现在是PROPAGATE状态),同时下一个节点也就是t2也是共享节点,所以会执行doReleaseShared方法,将新的head节点(t1)的waitStatus改为0,同时调用unparkSuccessor方法,此时也就会唤醒t2了。

至此就可以看出,在引入了PROPAGATE状态后,可以有效避免在高并发场景下可能出现的、线程没有被成功唤醒的情况出现。

条件队列

原理

条件队列由 AQS 内部类 ConditionObject 维护,关于条件变量介绍与使用可参考我的其他博客:线程间同步方式

条件变量是一组等待唤醒的逻辑,在 AQS的实现中,等待的逻辑其实就是将线程加入由 ConditionObject 维护的条件队列中,由于 ConditionObject 内部并没有保证同步,所以等待时必须保证持有锁,AQS 保证线程在进入休眠时会释放锁。

唤醒的逻辑也很清晰,在 AQS 中,唤醒线程其实就是将由 ConditionObject 维护的条件队列中的头节点移至 AQS 中的同步等待队列中,唤醒该节点,然后节点就会进入 acquireQueued 方法,与独占模式中争抢资源的逻辑一致。

等待

  1. public final void await() throws InterruptedException {
  2. // 将节点添加至 条件队列
  3. Node node = addConditionWaiter();
  4. // 尝试释放自己持有的锁,如果未持有锁会抛出异常
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. // 如果不在同步等待队列的话(即还没被唤醒,如果被唤醒了会被移到等待队列中)
  8. while (!isOnSyncQueue(node)) {
  9. // 没有信号唤醒,那就休眠等待
  10. LockSupport.park(this);
  11. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  12. break;
  13. }
  14. // 被唤醒,以被添加到等待队列
  15. // 进入 acquireQueued,与独占模式逻辑相同,不停 争抢锁 或 休眠
  16. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  17. interruptMode = REINTERRUPT;
  18. if (interruptMode != 0)
  19. reportInterruptAfterWait(interruptMode);
  20. }

唤醒

  1. private void doSignal(Node first) {
  2. do {
  3. if ( (firstWaiter = first.nextWaiter) == null)
  4. lastWaiter = null;
  5. first.nextWaiter = null;
  6. // 将队列中的头节点(等待时间最长的)作为参数,调用 transferForSignal
  7. } while (!transferForSignal(first) && (first = firstWaiter) != null);
  8. }
  1. final boolean transferForSignal(Node node) {
  2. // 如果无法设置,那么线程肯定被取消了,因为等待中的队列 ws 只可能是 CONDITION
  3. if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
  4. return false;
  5. // 将节点加入同步等待队列,这个返回的是前一个节点
  6. Node p = enq(node);
  7. int ws = p.waitStatus;
  8. // 将前一个节点设置为 SIGNAL,如果设置成功,自己可以放心休眠
  9. if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
  10. LockSupport.unpark(node.thread);
  11. return true;
  12. }

实践 —— 代码实现

可重入锁实现

  1. public class MyReentrantLock {
  2. private AbstractQueuedSynchronizer sync;
  3. /**
  4. * 实现不公平、可重入锁的 AQS
  5. */
  6. private class UnFairSync extends AbstractQueuedSynchronizer {
  7. @Override
  8. protected boolean tryAcquire(int arg) {
  9. int state = getState();
  10. // 如果资源可用,或持有锁的是当前线程,则尝试封锁资源
  11. if (state == 0 || getExclusiveOwnerThread() == Thread.currentThread()) {
  12. if (compareAndSetState(state, state + arg)) {
  13. setExclusiveOwnerThread(Thread.currentThread());
  14. return true;
  15. }
  16. }
  17. return false;
  18. }
  19. @Override
  20. protected boolean tryRelease(int arg) {
  21. int state = getState();
  22. return compareAndSetState(state, state - arg);
  23. }
  24. }
  25. /**
  26. * 实现公平、可重入锁的 AQS
  27. */
  28. private class FairSync extends AbstractQueuedSynchronizer {
  29. @Override
  30. protected boolean tryAcquire(int arg) {
  31. // 如果当前线程不是等待队列中的第一个线程,则不让其站有锁
  32. // 这是公平的锁,只有等待队列中等待时间最长的能够拥有锁
  33. if (getFirstQueuedThread() != Thread.currentThread()) {
  34. return false;
  35. }
  36. int state = getState();
  37. // 如果资源可用,或持有锁的是当前线程,则尝试封锁资源
  38. // 如果资源可用,或持有锁的是当前线程,则尝试封锁资源
  39. if (state == 0 || getExclusiveOwnerThread() == Thread.currentThread()) {
  40. if (compareAndSetState(state, state + arg)) {
  41. setExclusiveOwnerThread(Thread.currentThread());
  42. return true;
  43. }
  44. }
  45. return false;
  46. }
  47. @Override
  48. protected boolean tryRelease(int arg) {
  49. int state = getState();
  50. return compareAndSetState(state, state - arg);
  51. }
  52. }
  53. public MyReentrantLock(boolean fair) {
  54. this.sync = fair ? new FairSync() : new UnFairSync();
  55. }
  56. public MyReentrantLock() {
  57. this(false); //默认不公平锁
  58. }
  59. public void lock() {
  60. sync.acquire(1);
  61. }
  62. public void unLock() {
  63. sync.release(1);
  64. }
  65. }

信号量实现

  1. public class MySemaphore {
  2. AbstractQueuedSynchronizer sync;
  3. public MySemaphore(int totalResource) {
  4. sync = new Sync(totalResource);
  5. }
  6. private class Sync extends AbstractQueuedSynchronizer {
  7. public Sync(int s) {
  8. setState(s);
  9. }
  10. @Override
  11. protected int tryAcquireShared(int arg) {
  12. int state = getState();
  13. System.out.println("state = " + state);
  14. // 如果资源够的话,则获取资源
  15. if (state >= arg && compareAndSetState(state, state - arg)) {
  16. // 如果获取资源之后还有资源的话,返回 1,否则返回 0
  17. return state - arg > 0 ? 1 : 0;
  18. }
  19. // 获取资源失败
  20. return -1;
  21. }
  22. @Override
  23. protected boolean tryReleaseShared(int arg) {
  24. int state = getState();
  25. return compareAndSetState(state, state + arg);
  26. }
  27. }
  28. public void acquire(int resource) {
  29. sync.acquireShared(resource);
  30. }
  31. public void release(int resource) {
  32. sync.releaseShared(resource);
  33. }
  34. }

计数器实现

  1. public class MyCountDownLatch {
  2. private AbstractQueuedSynchronizer sync;
  3. private class Sync extends AbstractQueuedSynchronizer {
  4. public Sync(int n) {
  5. // 初始化任务数
  6. setState(n);
  7. }
  8. @Override
  9. protected int tryAcquireShared(int arg) {
  10. int state = getState();
  11. // 等于 0 时代表任务已全部完成,返回 1,允许等待的线程 await
  12. // 大于 0 时,说明还有任务,允许其他线程领取任务,await 的需要继续等待
  13. // 不可能小于 0
  14. return state == 0 ? 1 : -1;
  15. }
  16. @Override
  17. protected boolean tryReleaseShared(int arg) {
  18. int state = getState();
  19. if (state > 0) {
  20. // 尝试将任务数减少 1
  21. return compareAndSetState(state, state - 1);
  22. }
  23. return false;
  24. }
  25. }
  26. public MyCountDownLatch(int n) {
  27. sync = new Sync(n);
  28. }
  29. public void countDown() {
  30. sync.releaseShared(1);
  31. }
  32. public void await() {
  33. sync.acquireShared(1);
  34. }
  35. }

测试代码

  1. public class Main {
  2. public static void main(String[] args) throws InterruptedException {
  3. test1();// 测试可重入锁
  4. test2();// 测试信号量
  5. test3();// 计数器
  6. }
  7. synchronized static void countDown(MyCountDownLatch cdl, int id) {
  8. cdl.countDown();
  9. System.out.println("线程" + id + " 完成一个任务");
  10. }
  11. static void test3() {
  12. int N = 10;
  13. MyCountDownLatch cdl = new MyCountDownLatch(N);
  14. System.out.println("总任务数:" + N);
  15. for (int i = 0; i < N; i++) {
  16. final int id = i;
  17. new Thread(()-> {
  18. countDown(cdl, id);
  19. }).start();
  20. }
  21. cdl.await();
  22. System.out.println("任务全部完成,继续运行!");
  23. }
  24. static void test2() throws InterruptedException {
  25. MySemaphore sem = new MySemaphore(5);
  26. for (int i = 1; i <= 10; i++) {
  27. int id = i;
  28. new Thread(() -> {
  29. sem.acquire(1);
  30. System.out.println("线程" + id + "获取 1 个资源");
  31. }).start();
  32. }
  33. // 等待资源全部被获取
  34. Thread.sleep(100);
  35. // 释放 3 个资源
  36. System.out.println("\n主线程释放 3 个资源\n");
  37. sem.release(3);
  38. // 等待资源全部被获取
  39. Thread.sleep(100);
  40. // 释放 3 个资源
  41. System.out.println("\n主线程释放 2 个资源\n");
  42. sem.release(2);
  43. }
  44. static void test1() throws InterruptedException {
  45. MyReentrantLock lock = new MyReentrantLock();
  46. for (int i = 1; i <= 100; i++) {
  47. int id = i;
  48. new Thread(() -> {
  49. lock.lock();
  50. System.out.println("线程" + id + " 持有锁");
  51. System.out.println("线程" + id + " 释放锁");
  52. System.out.println();
  53. lock.unLock();
  54. }).start();
  55. }
  56. }
  57. }