AQS

Java并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类—AbstractQueuedSynchronizer,简称AQS。AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的Semaphore,ReentrantLock,ReentrantReadWriteLock 等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

AQS 原理概览

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig、Landin and Hagersten):CLH队列是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

看个AQS(AbstractQueuedSynchronizer)原理图:

Java并发-03-AQS解剖 - 图1

  1. private volatile int state; // 共享变量,使用volatile修饰保证线程可见性

状态信息通过 protected 类型的getState()setState()compareAndSetState()进行操作

  1. // 返回同步状态的当前值
  2. protected final int getState() {
  3. return state;
  4. }
  5. // 设置同步状态的值
  6. protected final void setState(int newState) {
  7. state = newState;
  8. }
  9. // 原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
  10. protected final boolean compareAndSetState(int expect, int update) {
  11. return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  12. }

AQS 对资源的共享方式

  • Exclusive(独占):只有一个线程能执行,如 ReentrantLock。又可分为公平锁和非公平锁:
    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。

ReentrantReadWriteLock 可以看成是组合式,因为 ReentrantReadWriteLock 也就是读写锁允许多个线程同时对某一资源进行读。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可(tryReleaseShared()tryRelease()tryAcquire()tryAcquireShared()),至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

AQS底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样:

  1. 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

  1. isHeldExclusively() // 该线程是否正在独占资源。只有用到Condition才需要去实现它。
  2. tryAcquire(int) // 独占方式。尝试获取资源,成功则返回true,失败则返回false。
  3. tryRelease(int) // 独占方式。尝试释放资源,成功则返回true,失败则返回false。
  4. tryAcquireShared(int) // 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数
  5. // 表示成功,且有剩余资源。
  6. tryReleaseShared(int) // 共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以 ReentrantLock 为例,state 初始化为0,表示未锁定状态。A线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1。此后,其他线程再 tryAcquire() 时就会失败,直到A线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以 CountDownLatch 以例,state 初始化为N(注意N要与线程个数一致)。每个线程执行完后 countDown() 一次,state 会 CAS 减1。等到所有子线程都执行完后(即state=0),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后续动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

Node

Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量 waitStatus 则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE(传播)。

  • CANCELLED:值为1。在同步队列中等待的线程等待超时或被中断,处于此状态的结点不会再转为其他状态,尤其此节点里的线程不会再被阻塞。
  • SIGNAL:值为-1。标志着此结点的后继结点是阻塞状态,此结点运行时后续结点需要进行登台,其释放资源或进入CANCELLED状态后需要唤醒后续结点。
  • CONDITION:值为-2。标识的结点处于等待队列中,当其他线程调用了 Condition 的 signal() 方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3。仅存在共享模式中,在共享模式中,表示Node的unpark可以向后传播。(类似读者写者问题中一个写者会阻塞N个读者,前N-1个读者就是PROPAGATE状态)。
  • 0状态:值为0。代表初始化状态。

AQS在判断状态时,可以通过用 waitStatus>0 表示取消状态,而 waitStatus<0 表示有效状态。

同步队列

同步队列中的线程不都是有意义的线程。有一个无意义的哨兵节点。同时按照JDK注释的话来说,这里也可以使用AtomicInteger,但使用原生CAS这样做是为了以后更好的增强性能。

Setup to support compareAndSet. We need to natively implement this here: For the sake of permitting future enhancements, we cannot explicitly subclass AtomicInteger, which would be efficient and useful otherwise. So, as the lesser of evils, we natively implement using hotspot intrinsics API. And while we are at it, we do the same for other CASable fields (which could otherwise be done with atomic field updaters).

Java并发-03-AQS解剖 - 图2

  1. private static final Unsafe unsafe = Unsafe.getUnsafe();
  2. private static final long stateOffset;
  3. private static final long headOffset;
  4. private static final long tailOffset;
  5. private static final long waitStatusOffset;
  6. private static final long nextOffset;
  7. static {
  8. try {
  9. stateOffset = unsafe.objectFieldOffset
  10. (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
  11. headOffset = unsafe.objectFieldOffset
  12. (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
  13. tailOffset = unsafe.objectFieldOffset
  14. (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
  15. waitStatusOffset = unsafe.objectFieldOffset
  16. (Node.class.getDeclaredField("waitStatus"));
  17. nextOffset = unsafe.objectFieldOffset
  18. (Node.class.getDeclaredField("next"));
  19. } catch (Exception ex) { throw new Error(ex); }
  20. }

ReetrantLock

ReetrantLock 是独占模式的AQS的实现,我们下面通过这个类来看看如何实现一个独占的AQS同步器。主要分析两部分:

  • 通用的 acquire-release 模板
  • 公平和非公平的 tryAcquire()tryRelease() 实现。

acquire(int)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是 lock() 的语义,当然不仅仅只限于 lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是 acquire() 的源码:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  3. selfInterrupt();
  4. }

函数流程如下:

  1. tryAcquire():尝试直接去获取资源,如果成功则直接返回;
  2. addWaiter():如果 tryAcquire() 失败,将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued() 使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt(),将中断补上。这个补上中断仅仅是将中断标志位补上。但是此时这个中断标志是没有被清除的,也就是说此时 isInterrupted()方法返回是true。

tryAcquire(int)

Attempts to acquire in exclusive mode. This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it.

This method is always invoked by the thread performing acquire. If this method reports failure, the acquire method may queue the thread, if it is not already queued, until it is signalled by a release from some other thread.

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是 tryLock() 的语义,还是那句话,当然不仅仅只限于 tryLock()。如下是 tryAcquire()的源码:

  1. protected boolean tryAcquire(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

这个方法是我们再自定义同步器的时候需要实现的方法,之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

addWaiter(Node)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点:

  1. private Node addWaiter(Node mode) {
  2. // 以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
  3. Node node = new Node(Thread.currentThread(), mode);
  4. // 尝试快速方式直接放到队尾。
  5. Node pred = tail;
  6. if (pred != null) { // 从enq()方法指导,tail==null时需要先创建一个Head
  7. node.prev = pred;
  8. // 使用CAS操作向末尾插入Node,compareAndSetTail(Node expect, Node update)
  9. if (compareAndSetTail(pred, node)) {
  10. pred.next = node;
  11. return node;
  12. }
  13. }
  14. // 上一步失败则通过enq入队。
  15. enq(node);
  16. return node;
  17. }

enq(Node)

  1. private Node enq(final Node node) {
  2. // CAS"自旋",直到成功加入队尾
  3. for (;;) {
  4. Node t = tail;
  5. if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
  6. // compareAndSetHead(Node update),进一步证实 Node 队列有一个哨兵节点
  7. if (compareAndSetHead(new Node()))
  8. tail = head;
  9. } else { // 正常流程,放入队尾
  10. node.prev = t;
  11. // CAS插入队尾
  12. if (compareAndSetTail(t, node)) {
  13. t.next = node;
  14. return t;
  15. }
  16. }
  17. }
  18. }

acquireQueued(Node, int)

通过 tryAcquire()addWaiter() ,该线程获取资源失败,已经被放入等待队列尾部了。此时线程需要进入等待状态休息,直到其他线程彻底释放资源后唤醒自己。

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true; // 标记是否成功拿到资源
  3. try {
  4. boolean interrupted = false; // 标记等待过程中是否被中断过
  5. // 循环直到获取到资源
  6. for (;;) {
  7. final Node p = node.predecessor(); // 拿到前驱
  8. // 如果前驱是head,即该结点是CLH队列的老二(head节点是哨兵),就去尝试获取资源
  9. if (p == head && tryAcquire(arg)) {
  10. setHead(node); // 拿到资源后,将head指向该结点。所以head所指的当前结点,
  11. // 就是当前获取到资源的那个结点或null。
  12. p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,
  13. // 此时意味着正在获得资源的线程出队了。
  14. failed = false;
  15. return interrupted; // 返回等待过程中是否被中断过
  16. }
  17. // 如果自己可以休息了,就进入waiting状态,直到被unpark()
  18. if (shouldParkAfterFailedAcquire(p, node) &&
  19. parkAndCheckInterrupt())
  20. interrupted = true; // 如果等待过程中被中断过,哪怕只有那么一次,
  21. // 就将interrupted标记为true
  22. }
  23. } finally {
  24. // 如果正常运行会在第16行返回,只有出现异常才能运行到finally块
  25. // 按照JDK注释所说,撤销状态是由于 timeout 和 interrupt,但是此时是 tryAcquire() 抛异常
  26. if (failed)
  27. cancelAcquire(node);
  28. }
  29. }

shouldParkAfterFailedAcquire(Node, Node)

此方法主要用于检查状态,看看自己是否真的可以去休息了,万一队列前边的线程都是 CANCELLED 呢。

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. int ws = pred.waitStatus; // 拿到前驱的状态
  3. if (ws == Node.SIGNAL)
  4. // 前驱结点是SIGNAL,表示当前结点应该被阻塞。所以返回true。
  5. return true;
  6. if (ws > 0) { // 前驱结点大于 0,表示前驱结点都是 CANCELLED 状态。
  7. // 一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
  8. do {
  9. node.prev = pred = pred.prev;
  10. } while (pred.waitStatus > 0);
  11. pred.next = node;
  12. } else {
  13. // 能运行到此,表示前驱的状态为0或者PROPAGATE。
  14. // 前驱结点修改为SIGNAL表示上一次SIGNAL的传播只能到前驱
  15. // 由于新节点的加入会导致前驱为SIGNAL,所以只有末尾的节点和运行的结点为0
  16. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  17. }
  18. // 返回false会进行一次自旋
  19. return false;
  20. }

假设pred一开始指向 CANCELLED 结点。从第11行退出的时候结点连接图会发生如下转变:

Java并发-03-AQS解剖 - 图3
parkAndCheckInterrupt()

如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this); // 调用park()使线程进入waiting状态
  3. return Thread.interrupted(); // 如果被唤醒,查看自己是不是被中断的。
  4. }

park() 会让当前线程进入 waiting 状态。在此状态下,有两种途径可以唤醒该线程:

  • unpark()
  • interrupt()

这里使用 Thread.interrupted() 会清除线程的中断标志位。同时在清除以后,如果线程在等待过程中是被interrupt()唤醒的, selfInterrupt() 会补上一个中断信号。这一点与线程池有密切关系。

cancelAcquire()

  1. private void cancelAcquire(Node node) {
  2. // 忽略空结点
  3. if (node == null) return;
  4. // 释放线程
  5. node.thread = null;
  6. // 跳过前驱为 CANCELLED 的结点
  7. Node pred = node.prev;
  8. while (pred.waitStatus > 0)
  9. node.prev = pred = pred.prev;
  10. // 获得过滤后结点的后继结点
  11. Node predNext = pred.next;
  12. // 设置当前结点的状态
  13. node.waitStatus = Node.CANCELLED;
  14. // If we are the tail, remove ourselves.
  15. // compareAndSetTail(Node expect, Node update)
  16. if (node == tail && compareAndSetTail(node, pred)) {
  17. // compareAndSetNext(Node node, Node expect, Node update)
  18. compareAndSetNext(pred, predNext, null);
  19. } else {
  20. // If successor needs signal, try to set pred's next-link
  21. // so it will get one. Otherwise wake it up to propagate.
  22. int ws; // pred 结点的状态
  23. // 前驱不是head && 前驱线程不空 && ((前驱状态为SIGNAL || 前驱有效且状态替换为SIGNAL成功))
  24. // 只有被CANCELLED的结点和HEAD的thread为null
  25. // 进入CAS的时候,ws为0或者PROPAGATE
  26. // 撤销过程是向前寻找的,而传播过程是向后的,所以pred结点可能已经执行完毕,此时CAS会失败
  27. // 所以当CAS失败的时候,node实际上就是头结点,需要进入unparkSuccessor()
  28. if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL ||
  29. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  30. pred.thread != null) {
  31. // 后继有效的时候
  32. Node next = node.next;
  33. if (next != null && next.waitStatus <= 0)
  34. compareAndSetNext(pred, predNext, next);
  35. } else {
  36. unparkSuccessor(node);
  37. }
  38. node.next = node; // help GC
  39. }
  40. }

先解释一下 predNext,这个指向的是当前有效结点的后一个结点,注意它不是 node。

Java并发-03-AQS解剖 - 图4
node 是 tail 结点

此时会将 pred 的后继结点设置为 null。

Java并发-03-AQS解剖 - 图5Java并发-03-AQS解剖 - 图6Java并发-03-AQS解剖 - 图7
acquireQueued() 分析完之后,我们接下来再回到 acquire(),再贴上它的源码吧:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

再来总结下它的流程吧:

  1. 调用自定义同步器的 tryAcquire() 尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则 addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued() 使线程在等待队列中休息,有机会时(轮到自己,会被 unpark() )会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt() ,将中断信号补上。

release(int)

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是 unlock() 的语义,当然不仅仅只限于 unlock()。下面是release() 的源码:

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head; // 找到头结点
  4. // 从下文的unparkSuccessor()可以看到,表示正在运行的状态会设置为0,所以此时不需要唤醒
  5. if (h != null && h.waitStatus != 0)
  6. unparkSuccessor(h); // 唤醒等待队列里的下一个线程
  7. return true;
  8. }
  9. return false;
  10. }

逻辑并不复杂。它调用 tryRelease() 来释放资源。有一点需要注意的是,它是根据 tryRelease() 的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计 tryRelease() 的时候要明确这一点!

tryRelease(int)

Attempts to set the state to reflect a release in exclusive mode.

This method is always invoked by the thread performing release.

此方法尝试去释放指定量的资源。下面是 tryRelease() 的源码:

  1. protected boolean tryRelease(int arg) {
  2. throw new UnsupportedOperationException();
  3. }

tryAcquire() 一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease() 都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可,也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release() 是根据 tryRelease() 的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

unparkSuccessor(Node)

  1. private void unparkSuccessor(Node node) {
  2. // 这里,node一般为当前线程所在的结点。
  3. int ws = node.waitStatus;
  4. if (ws < 0) // 置零当前线程所在的结点状态,允许失败。
  5. compareAndSetWaitStatus(node, ws, 0);
  6. Node s = node.next; // 找到下一个需要唤醒的结点s
  7. if (s == null || s.waitStatus > 0) { // 如果为空或已取消
  8. s = null;
  9. // 循环是从后向前搜寻,因为 CANCELLED 结点中,next指针被破坏了。
  10. for (Node t = tail; t != null && t != node; t = t.prev)
  11. if (t.waitStatus <= 0) // 从这里可以看出,<=0的结点,都是还有效的结点。
  12. s = t;
  13. }
  14. if (s != null)
  15. LockSupport.unpark(s.thread); // 唤醒
  16. }

小结

release() 方法的目的就是在 Node 队列中找出来第一个应该被释放的结点。但是这个节点不一定是 head.next,但是由于在 cancelAcquire() 中破坏了 next 链,所以采用从后向前的方式遍历。

非公平锁

lock()

lock()

  1. final void lock() {
  2. // compareAndSetState(int expect, int update)
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. acquire(1);
  7. }

setExclusiveOwnerThread()

  1. private transient Thread exclusiveOwnerThread;
  2. protected final void setExclusiveOwnerThread(Thread thread) {
  3. exclusiveOwnerThread = thread;
  4. }

非公平锁就是上来就抢,如果抢到了就将独占锁设置为当前线程。如果抢不到就去 acquire()

tryAcquire()

  1. protected final boolean tryAcquire(int acquires) {
  2. return nonfairTryAcquire(acquires);
  3. }

nonfairTryAcquire()

  1. final boolean nonfairTryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) { // 可能别的线程释放资源,所以如果为0,就去抢资源,抢到返回 true
  5. if (compareAndSetState(0, acquires)) {
  6. setExclusiveOwnerThread(current);
  7. return true;
  8. }
  9. }
  10. // 如果当前线程就是持有锁的线程,就直接加上去
  11. else if (current == getExclusiveOwnerThread()) {
  12. int nextc = c + acquires;
  13. if (nextc < 0) // overflow
  14. throw new Error("Maximum lock count exceeded");
  15. setState(nextc);
  16. return true;
  17. }
  18. return false;
  19. }

unlock()

unlock()

  1. public void unlock() {
  2. sync.release(1);
  3. }

release()

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);
  6. return true;
  7. }
  8. return false;
  9. }

tryRelease()

  1. protected final boolean tryRelease(int releases) {
  2. int c = getState() - releases; // 获取当前资源
  3. if (Thread.currentThread() != getExclusiveOwnerThread())
  4. throw new IllegalMonitorStateException();
  5. boolean free = false;
  6. if (c == 0) { // c=0 表示资源都释放了
  7. free = true;
  8. setExclusiveOwnerThread(null);
  9. }
  10. setState(c);
  11. return free;
  12. }

公平锁

lock()

lock()

  1. final void lock() {
  2. acquire(1);
  3. }

tryAcquire()

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. if (!hasQueuedPredecessors() &&
  6. compareAndSetState(0, acquires)) {
  7. setExclusiveOwnerThread(current);
  8. return true;
  9. }
  10. }
  11. else if (current == getExclusiveOwnerThread()) {
  12. int nextc = c + acquires;
  13. if (nextc < 0)
  14. throw new Error("Maximum lock count exceeded");
  15. setState(nextc);
  16. return true;
  17. }
  18. return false;
  19. }

公平锁个非公平锁的区别就是第5行多了一个条件!hasQueuedPredecessors()

hasQueuedPredecessors()

  1. public final boolean hasQueuedPredecessors() {
  2. // The correctness of this depends on head being initialized
  3. // before tail and on head.next being accurate if the current
  4. // thread is first in queue.
  5. Node t = tail; // Read fields in reverse initialization order
  6. Node h = head;
  7. Node s;
  8. return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
  9. }

这个方法主要就是判断当前时候是否有在 Node 队列中排队的线程,当满足如下条件时无排队线程:

  • 头指针的指向等于尾指针
  • 头指针的后继为空或头指针的后继不空但不是当前线程

Condition

await()

await()

  1. public final void await() throws InterruptedException {
  2. // Object::wait() 和 await() 都需要抛出 InterruptedException
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // 将结点加入到 Condition 队列的末尾
  6. // 每一个Condition都有一个Node队列
  7. Node node = addConditionWaiter();
  8. // 这句表示 await() 操作会导致释放资源。返回释放的资源数
  9. int savedState = fullyRelease(node);
  10. int interruptMode = 0;
  11. // 如果不在 Sync 队列上,就要 park 它。
  12. while (!isOnSyncQueue(node)) {
  13. LockSupport.park(this);
  14. // 这里退出有两种情况:unpark 和 interrupt 。
  15. // 如果是 unpark 时,说明是 signal 了,Node 会被转移到 Sync 队列上,可以退出循环
  16. // 如果是 interrupt,进入 checkInterruptWhileWaiting()
  17. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  18. break;
  19. }
  20. // signal:interruptMode 为 0
  21. // interrupt 在 signal 之前:interruptMode 为 THROW_IT
  22. // interrupt 在 signal 之后:interruptMode 为 REINTERRUPT
  23. // 从acquireQueued出来一定是获得资源的
  24. // 如果在获得资源的时候被中断过,修改interruptMode为REINTERRUPT
  25. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  26. interruptMode = REINTERRUPT;
  27. // 如果在等待的过程中被撤销了,取消 CANCELLED 结点
  28. if (node.nextWaiter != null) // clean up if cancelled
  29. unlinkCancelledWaiters();
  30. if (interruptMode != 0)
  31. reportInterruptAfterWait(interruptMode);
  32. }

addConditionWaiter()

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // 能够进入Condition队列的只有CANCELLED和CONDITION,所以这里是清除 CANCELLED 结点
  4. if (t != null && t.waitStatus != Node.CONDITION) {
  5. unlinkCancelledWaiters();
  6. t = lastWaiter;
  7. }
  8. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  9. if (t == null)
  10. firstWaiter = node;
  11. else
  12. t.nextWaiter = node;
  13. lastWaiter = node;
  14. return node;
  15. }

unlinkCancelledWaiters()

  1. private void unlinkCancelledWaiters() {
  2. Node t = firstWaiter;
  3. Node trail = null;
  4. // 整个的流程就是从前到后把结点过滤一遍,如果是CANCELLED状态结点就从Condition队列移除
  5. while (t != null) {
  6. Node next = t.nextWaiter;
  7. if (t.waitStatus != Node.CONDITION) {
  8. t.nextWaiter = null;
  9. if (trail == null)
  10. firstWaiter = next;
  11. else
  12. trail.nextWaiter = next;
  13. if (next == null)
  14. lastWaiter = trail;
  15. }
  16. else
  17. trail = t;
  18. t = next;
  19. }
  20. }

fullyRelease()

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. int savedState = getState();
  5. // 这个release就是我们之前介绍过EXCLUSIVE模式下的释放资源的方法,其可能产生祖苏
  6. if (release(savedState)) {
  7. failed = false;
  8. return savedState;
  9. } else {
  10. throw new IllegalMonitorStateException();
  11. }
  12. } finally {
  13. // 释放资源失败,就把当前结点标注为CANCELLED结点
  14. if (failed)
  15. node.waitStatus = Node.CANCELLED;
  16. }
  17. }

checkInterruptWhileWaiting()

Checks for interrupt, returning THROW_IE if interrupted before signalled, REINTERRUPT if after signalled, or 0 if not interrupted.

  1. /** Mode meaning to reinterrupt on exit from wait */
  2. private static final int REINTERRUPT = 1;
  3. /** Mode meaning to throw InterruptedException on exit from wait */
  4. private static final int THROW_IE = -1;
  5. private int checkInterruptWhileWaiting(Node node) {
  6. return Thread.interrupted() ?
  7. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
  8. }

transferAfterCancelledWait()

  1. final boolean transferAfterCancelledWait(Node node) {
  2. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  3. enq(node);
  4. return true;
  5. }
  6. while (!isOnSyncQueue(node))
  7. Thread.yield();
  8. return false;
  9. }

当这个方法返回 true 时,返回 THROW_IE,此时有两种情况,一是没有线程和它争抢,也就是没有 signal。二是有 signal,但是当前线程抢夺成功。两种情况都表示 interrupt 在 signal 之前。

reportInterruptAfterWait()

  1. private void reportInterruptAfterWait(int interruptMode)
  2. throws InterruptedException {
  3. if (interruptMode == THROW_IE)
  4. throw new InterruptedException();
  5. else if (interruptMode == REINTERRUPT)
  6. selfInterrupt();
  7. }

如果 interrupt 在 signal 之前就抛出 InterruptedException,如果 interrupt 在 signal 之后,补上一个中断状态。

signal()

signal()

  1. public final void signal() {
  2. // 当前线程自己 signal 是不允许的
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. doSignal(first);
  8. }

isHeldExclusively()

  1. protected final boolean isHeldExclusively() {
  2. return getExclusiveOwnerThread() == Thread.currentThread();
  3. }

doSignal()

  1. private void doSignal(Node first) {
  2. do {
  3. if ( (firstWaiter = first.nextWaiter) == null)
  4. lastWaiter = null;
  5. first.nextWaiter = null;
  6. } while (!transferForSignal(first) && (first = firstWaiter) != null);
  7. }

transferForSignal()

  1. final boolean transferForSignal(Node node) {
  2. // If cannot change waitStatus, the node has been cancelled.
  3. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  4. return false;
  5. // 结点加入到 Sync 队列中,返回插入后 node 的前驱,即插入前 Sync 队列 的 tail
  6. Node p = enq(node);
  7. int ws = p.waitStatus;
  8. // 如果返回的结点是 CANCELLED 状态,释放 node 的线程
  9. // 如果结点状态被改变,说明其已经被执行完成
  10. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  11. LockSupport.unpark(node.thread);
  12. return true;
  13. }

signalAll()

signalAll()

  1. public final void signalAll() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignalAll(first);
  7. }

doSignalAll()

  1. private void doSignalAll(Node first) {
  2. lastWaiter = firstWaiter = null;
  3. // 和doSignal的区别就是这里有一个循环
  4. do {
  5. Node next = first.nextWaiter;
  6. first.nextWaiter = null;
  7. transferForSignal(first);
  8. first = next;
  9. } while (first != null);
  10. }

Semaphore

acquireShared(int)

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

这里 tryAcquireShared() 依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里 acquireShared() 的流程就是:

  1. tryAcquireShared() 尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared() 进入等待队列,直到获取到资源为止才返回。

doAcquireShared(int)

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED); // 加入队列尾部
  3. boolean failed = true;
  4. try {
  5. boolean interrupted = false; // 等待过程中是否被中断过的标志
  6. for (;;) {
  7. final Node p = node.predecessor();
  8. // 第一个不是真正的线程结点,只是个哨兵
  9. if (p == head) {
  10. int r = tryAcquireShared(arg); // 尝试获取资源
  11. if (r >= 0) { // 成功
  12. // 将head指向自己,还有剩余资源可以再唤醒之后的线程
  13. setHeadAndPropagate(node, r);
  14. p.next = null; // help GC
  15. if (interrupted) // 如果等待过程中被打断过,此时将中断补上。
  16. selfInterrupt();
  17. failed = false;
  18. return;
  19. }
  20. }
  21. // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
  22. if (shouldParkAfterFailedAcquire(p, node) &&
  23. parkAndCheckInterrupt())
  24. interrupted = true;
  25. }
  26. } finally {
  27. if (failed)
  28. cancelAcquire(node);
  29. }
  30. }

假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

setHeadAndPropagate

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head;
  3. setHead(node); // head指向自己
  4. // 如果还有剩余量,继续唤醒下一个邻居线程
  5. if (propagate > 0 || h == null || h.waitStatus < 0) {
  6. Node s = node.next;
  7. if (s == null || s.isShared())
  8. doReleaseShared();
  9. }
  10. }

此方法在 setHead() 的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!doReleaseShared() 我们留着下一小节的 releaseShared() 里来讲。

小结

至此,acquireShared() 也要告一段落了。让我们再梳理一下它的流程:

  • tryAcquireShared() 尝试获取资源,成功则直接返回;
  • 失败则通过 doAcquireShared() 进入等待队列 park(),直到被 unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

其实跟 acquire() 的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作。

releaseShared(int)

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) { // 尝试释放资源
  3. doReleaseShared(); // 唤醒后继结点
  4. return true;
  5. }
  6. return false;
  7. }

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的 release() 相似,但有一点稍微需要注意:独占模式下的 tryRelease() 在完全释放掉资源(state=0)后,才会返回 true 去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的 releaseShared() 则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后 tryReleaseShared(2) 返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2) 返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。而ReentrantReadWriteLock 读锁的 tryReleaseShared() 只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定 tryReleaseShared() 的返回值。

doReleaseShared()

  1. private void doReleaseShared() {
  2. for (;;) {
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  8. continue;
  9. unparkSuccessor(h); // 唤醒后继
  10. }
  11. else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  12. continue;
  13. }
  14. if (h == head) // 到达这里是肯定要经过第九行或者 Node 队列里没有结点了,此时需要退出。
  15. break;
  16. }
  17. }

unparkSuccessor

  1. private void unparkSuccessor(Node node) {
  2. int ws = node.waitStatus;
  3. if (ws < 0)
  4. compareAndSetWaitStatus(node, ws, 0);
  5. Node s = node.next;
  6. if (s == null || s.waitStatus > 0) {
  7. s = null;
  8. for (Node t = tail; t != null && t != node; t = t.prev)
  9. if (t.waitStatus <= 0)
  10. s = t;
  11. }
  12. if (s != null)
  13. LockSupport.unpark(s.thread);
  14. }

非公平实现

tryAcquire()

  1. protected int tryAcquireShared(int acquires) {
  2. return nonfairTryAcquireShared(acquires);
  3. }
  1. final int nonfairTryAcquireShared(int acquires) {
  2. for (;;) {
  3. int available = getState();
  4. int remaining = available - acquires;
  5. if (remaining < 0 ||
  6. compareAndSetState(available, remaining))
  7. return remaining;
  8. }
  9. }

上来就用 CAS 抢,抢得到就是我的。

tryReleaseShared()

  1. protected final boolean tryReleaseShared(int releases) {
  2. for (;;) {
  3. int current = getState();
  4. int next = current + releases;
  5. if (next < current) // overflow
  6. throw new Error("Maximum permit count exceeded");
  7. if (compareAndSetState(current, next))
  8. return true;
  9. }

CAS 释放资源。

公平实现

  1. protected int tryAcquireShared(int acquires) {
  2. for (;;) {
  3. if (hasQueuedPredecessors())
  4. return -1;
  5. int available = getState();
  6. int remaining = available - acquires;
  7. if (remaining < 0 ||
  8. compareAndSetState(available, remaining))
  9. return remaining;
  10. }
  11. }

唯一的区别就是如果还有正在排队的结点就返回 -1,表示失败。

AQS细读

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.

AQS提供了一个模板来帮助我们实现同步器。它使用的的主要思想是一个FIFO队列和一个int类型的状态值。子类继承之后需要重写某些方法来按自己的需求改变状态。状态的改变使用compareAndSetStategetStatesetState 三个方法。

Subclasses should be defined as non-public internal helper classes that are used to implement the synchronization properties of their enclosing class. Class AbstractQueuedSynchronizer does not implement any synchronization interface. Instead it defines methods such as acquireInterruptibly that can be invoked as appropriate by concrete locks and related synchronizers to implement their public methods.

子类应该定义非公共内部类来继承AQS。AQS自己定义了同步器方法而不是实现了相应的接口。

This class supports either or both a default exclusive mode and a shared mode. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not “understand” these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play for example in a ReadWriteLock. Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode.

AQS提供两种同步模式,独占模式和共享模式。这就类似读者写者模式的读写操作,读操作是可以共享的,写操作不能共享。一般一个AQS实现了一个模式,但是ReadWriteLock实现了两种模式。两种模式各自有需要被覆盖的方法,需要实现哪种模式覆盖哪种模式的方法即可。在获取不到资源的时候,两种模式都等在同一个FIFO队列上。

This class defines a nested AbstractQueuedSynchronizer.ConditionObject class that can be used as a Condition implementation by subclasses supporting exclusive mode for which method isHeldExclusively reports whether synchronization is exclusively held with respect to the current thread, method release invoked with the current getState value fully releases this object, and acquire, given this saved state value, eventually restores this object to its previous acquired state. No AbstractQueuedSynchronizer method otherwise creates such a condition, so if this constraint cannot be met, do not use it. The of AbstractQueuedSynchronizer.ConditionObject depends of course on the semantics of its synchronizer implementation.

AbstractQueuedSynchronizer 内定义了 Condition 接口的实现类 ConditionObject 。条件对象是使用在独占模式下的,而且需要实现 isHeldExclusively 方法,因为在 signal 的时候是不能 signal 自己的,需要用此方法做判断。

This class provides inspection, instrumentation, and monitoring methods for the internal queue, as well as similar methods for condition objects. These can be exported as desired into classes using an AbstractQueuedSynchronizer for their synchronization mechanics.

对于内部的同步队列,这个类提供了很多方法以至于可以将他们用到同步器中。

Serialization of this class stores only the underlying atomic integer maintaining state, so deserialized objects have empty thread queues. Typical subclasses requiring serializability will define a readObject method that restores this to a known initial state upon deserialization.

序列化此类的时候只能序列化 state,对于 Node 队列需要重新定义 readObject()

Usage

To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState(), setState() and/or compareAndSetState():

  • tryAcquire()
  • tryRelease()
  • tryAcquireShared()
  • tryReleaseShared()
  • isHeldExclusively()

Each of these methods by default throws UnsupportedOperationException. Implementations of these methods must be internally thread-safe, and should in general be short and not block. Defining these methods is the only supported means of using this class. All other methods are declared final because they cannot be independently varied.

实现同步器需要实现上面相应的方法。这些方法的实现应该是线程安全且非阻塞的。

You may also find the inherited methods from AbstractOwnableSynchronizer useful to keep track of the thread owning an exclusive synchronizer. You are encouraged to use them — this enables monitoring and diagnostic tools to assist users in determining which threads hold locks.
Even though this class is based on an internal FIFO queue, it does not automatically enforce FIFO acquisition policies. The core of exclusive synchronization takes the form:

Acquire:

Release:

(Shared mode is similar but may involve cascading signals.)

  1. while (!tryAcquire(arg)) {
  2. enqueue thread if it is not already queued;
  3. possibly block current thread;
  4. }
  1. if (tryRelease(arg))
  2. unblock the first queued thread;

AQS 继承了 AbstractOwnableSynchronizer(AOS),在实现同步器的时候被鼓励实现 AOS 的方法,这有助于分析工具检测代码的运行。

  1. private transient Thread exclusiveOwnerThread;
  2. protected final void setExclusiveOwnerThread(Thread thread) {
  3. exclusiveOwnerThread = thread;
  4. }
  5. protected final Thread getExclusiveOwnerThread() {
  6. return exclusiveOwnerThread;
  7. }

Because checks in acquire are invoked before enqueuing, a newly acquiring thread may barge ahead of others that are blocked and queued. However, you can, if desired, define tryAcquire and/or tryAcquireShared to disable barging by internally invoking one or more of the inspection methods, thereby providing a fair FIFO acquisition order. In particular, most fair synchronizers can define tryAcquire to return false if hasQueuedPredecessors (a method specifically designed to be used by fair synchronizers) returns true. Other variations are possible.

acquire() 方法中在进队之前会进行权限检查,所以一个新的线程可能比正在阻塞或者正在排队的线程先获得资源。如果需要防止这种事情发生,需要在 tryAcquiretryAcquireShared 中进行自定义,比如同步锁的 hasQueuedPredecessors

Throughput and scalability are generally highest for the default barging (also known as greedy, renouncement, and convoy-avoidance) strategy. While this is not guaranteed to be fair or starvation-free, earlier queued threads are allowed to recontend before later queued threads, and each recontention has an unbiased chance to succeed against incoming threads. Also, while acquires do not “spin” in the usual sense, they may perform multiple invocations of tryAcquire interspersed with other computations before blocking. This gives most of the benefits of spins when exclusive synchronization is only briefly held, without most of the liabilities when it isn’t. If so desired, you can augment this by preceding calls to acquire methods with “fast-path” checks, possibly prechecking hasContended and/or hasQueuedThreads to only do so if the synchronizer is likely not to be contended.

默认情况下的并发性能是最好的,不过不保证公平。早期的线程在重新竞争锁的时候没有偏向。竞争资源时虽然没有显示的自旋,但是 tryAcquire 会被执行多次在阻塞之前,这具有自旋的特点。如果你想增强 acquire 使其具有快速检查的功能,可以调用 hasContendedhasQueuedThreads去查看是否目前有竞争。

This class provides an efficient and scalable basis for synchronization in part by specializing its range of use to synchronizers that can rely on int state, acquire, and release parameters, and an internal FIFO wait queue. When this does not suffice, you can build synchronizers from a lower level using atomic classes, your own custom java.util.Queue classes, and LockSupport blocking support.

Usage Examples

Here is a non-reentrant mutual exclusion lock class that uses the value zero to represent the unlocked state, and one to represent the locked state. While a non-reentrant lock does not strictly require recording of the current owner thread, this class does so anyway to make usage easier to monitor. It also supports conditions and exposes one of the instrumentation methods:

Here is a latch class that is like a CountDownLatch except that it only requires a single signal to fire. Because a latch is non-exclusive, it uses the shared acquire and release methods.

  1. class Mutex implements Lock, java.io.Serializable {
  2. // Our internal helper class
  3. private static class Sync extends AbstractQueuedSynchronizer {
  4. // Reports whether in locked state
  5. protected boolean isHeldExclusively() {
  6. return getState() == 1;
  7. }
  8. // Acquires the lock if state is zero
  9. public boolean tryAcquire(int acquires) {
  10. assert acquires == 1; // Otherwise unused
  11. if (compareAndSetState(0, 1)) {
  12. setExclusiveOwnerThread(Thread.currentThread());
  13. return true;
  14. }
  15. return false;
  16. }
  17. // Releases the lock by setting state to zero
  18. protected boolean tryRelease(int releases) {
  19. assert releases == 1; // Otherwise unused
  20. if (getState() == 0) throw new IllegalMonitorStateException();
  21. setExclusiveOwnerThread(null);
  22. setState(0);
  23. return true;
  24. }
  25. // Provides a Condition
  26. Condition newCondition() { return new ConditionObject(); }
  27. // Deserializes properly
  28. private void readObject(ObjectInputStream s)
  29. throws IOException, ClassNotFoundException {
  30. s.defaultReadObject();
  31. setState(0); // reset to unlocked state
  32. }
  33. }
  34. // The sync object does all the hard work. We just forward to it.
  35. private final Sync sync = new Sync();
  36. public void lock() { sync.acquire(1); }
  37. public boolean tryLock() { return sync.tryAcquire(1); }
  38. public void unlock() { sync.release(1); }
  39. public Condition newCondition() { return sync.newCondition(); }
  40. public boolean isLocked() { return sync.isHeldExclusively(); }
  41. public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  42. public void lockInterruptibly() throws InterruptedException {
  43. sync.acquireInterruptibly(1);
  44. }
  45. public boolean tryLock(long timeout, TimeUnit unit)
  46. throws InterruptedException {
  47. return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  48. }
  49. }
  1. class BooleanLatch {
  2. private static class Sync extends AbstractQueuedSynchronizer {
  3. boolean isSignalled() { return getState() != 0; }
  4. protected int tryAcquireShared(int ignore) {
  5. return isSignalled() ? 1 : -1;
  6. }
  7. protected boolean tryReleaseShared(int ignore) {
  8. setState(1);
  9. return true;
  10. }
  11. }
  12. private final Sync sync = new Sync();
  13. public boolean isSignalled() { return sync.isSignalled(); }
  14. public void signal() { sync.releaseShared(1); }
  15. public void await() throws InterruptedException {
  16. sync.acquireSharedInterruptibly(1);
  17. }
  18. }