JUC 显示锁(Lock)是一种是一种使用纯 Java 语言实现的,非常灵活的锁,可以进行无条件的,可轮询的,定时的,可中断的锁获取和释放操作。

显示锁的由来

在 Java 中,内置锁(synchronized)有虚拟机自动抢占和释放同步对象的监视器(monitor),而且每一个对象都拥有一个监视器,这使得使用 synchronized 变得异常简便。但是 synchronized 的功能比较单一,在一些场景中无法胜任。例如:

  1. 限时抢锁:设置超时时长,如果超个这个时间还是无法获取到锁,则放弃对锁的抢占,不至于无限等待
  2. 可中断抢锁:在抢锁时,外部线程给抢锁线程发一个中断信号,就能唤起等待锁的线程,并终止抢占过程
  3. 多个等待队列:为锁维持多个等待队列,以便提高锁的效率。比如在生产者-消费者模式实现中,生产者和消费者共用一把锁,该锁上维持两个等待队列,即一个生产者队列和一个消费者队列

除此之外,在高并发的场景中,Java 对象锁会膨胀成为重量级锁,而重量级锁的线程阻塞和线程唤醒需要在用户态和内核态来回切换,导致性能低下。因此,显示锁就是为了解决这些问题而生的。JDK1.5 中引入了 Lock 接口,Lock 是 Java 代码级别的锁,也称之为显示锁。

Lock 接口

Lock 接口位于 java.util.concurrent.locks 包中,是 JUC 显式锁的一个抽象。与 synchronized 关键字不同,显式锁不再作为 Java 内置特性来实现,而是作为 Java 语言可编程特性来实现。这就为多种不同功能的锁实现留下了空间,各种锁实现可能有不同的调度算法、性能特性或者锁定语义。

Lock 接口的主要方法如下:

  1. public interface Lock {
  2. /**
  3. * 获取锁,成功则向下运行,失败则自旋检测线程
  4. */
  5. void lock();
  6. /**
  7. * 可中断抢锁,当前线程在抢锁过程中可以响应中断信号
  8. */
  9. void lockInterruptibly() throws InterruptedException;
  10. /**
  11. * 尝试抢锁,线程为非阻塞模式,在调用tryLock()方法后立即返回。若抢锁成功则返回true,否则返回false
  12. */
  13. boolean tryLock();
  14. /**
  15. * 限时抢锁。到达超时时间返回false,也可以相应中断信号
  16. */
  17. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  18. /**
  19. * 释放锁
  20. */
  21. void unlock();
  22. /**
  23. * 获取与显示锁绑定的Condition对象,用于等待-通知方式的线程间通信
  24. */
  25. Condition newCondition();
  26. }

据 Lock 接口提供的方法来看,Lock 锁至少比 synchronized 多出以下优势:

  1. 可中断获取锁

使用 synchronized 关键字获取锁的时候,如果线程没有获取到被阻塞,阻塞期间该线程是不响应中断信号(interrupt)的;而调用 Lock.lockInterruptibly() 方法获取锁时,如果线程被中断,线程将抛出中断异常。

  1. 可非阻塞获取锁

使用 synchronized 关键字获取锁时,如果没有成功获取,线程只有被阻塞;而调用 Lock.tryLock() 方法获取锁时,如果没有获取成功,线程也不会被阻塞,而是直接返回 false。

  1. 可限时获取锁

调用 Lock.tryLock(long time,TimeUnit unit) 方法,显式锁可以设置限定抢占锁的超时时间。而在使用 synchronized 关键字获取锁时,如果不能抢到锁,线程只能无限制阻塞。

可重入锁 ReentrantLock

ReentrantLock 相关的类继承结构如下:

image.png

  1. ReentrantLock 是 JUC 包下提供的 Lock 基础实现类
  2. ReentrantLock 实现了 Lock 接口,所有它拥有和 synchronized 相同的并发性和内存语义,还拥有了限时抢占、可中断抢占等一些高级特性
  3. ReentrantLock 内部基于 Sync 实现了锁功能。sync 继承至 AbstractQueuedSynchronizer(抽象队列同步器,AQS),因此在争用激烈的场景下,能表现出比内置锁更佳的性能

ReentrantLock 是一个可重入的独占锁,其中:

  1. 可重入表示该锁能够支持一个线程对资源的重复加锁,也就是说,一个线程可以多次进入同一个锁所同步的临界区代码块。例如:

    1. lock.lock();
    2. lock.lock();
    3. try {
    4. // 临界区代码
    5. } finally {
    6. lock.unlock();
    7. lock.unlock();
    8. }
  2. 独占表示在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能等待,只有拥有锁的线程释放了锁后,其他的线程才能够获取锁。

基于 Condition 实现等待-通知式线程间通信

等待-通知方式的线程间通信机制,具体来说是指一个线程 A 调用了同步对象的 wait() 方法进入等待状态,而另一线程 B 调用了同步对象的 notify() 或者 notifyAll() 方法去唤醒等待线程,当线程 A 收到线程 B 的唤醒通知后,就可以重新开始执行了。

在 Lock 体系中,等待-通知方式的线程间通信机制可以使用 Condition 接口来实现。Condition 接口的主要方法如下:

  1. public interface Condition {
  2. /**
  3. * 等待,该方法在功能上等同于 Object.wait()
  4. * 是当前线程加入 await() 等待队列中,并释放当前锁
  5. * 当其他线程调用 signal() 时,等待队列中的某个线程会被唤醒,重新去抢所
  6. */
  7. void await() throws InterruptedException;
  8. /**
  9. * 等待,不会响应中断信号
  10. */
  11. void awaitUninterruptibly();
  12. /**
  13. * 超时等待
  14. */
  15. long awaitNanos(long nanosTimeout) throws InterruptedException;
  16. /**
  17. * 超时等待
  18. */
  19. boolean await(long time, TimeUnit unit) throws InterruptedException;
  20. /**
  21. * 定时等待
  22. */
  23. boolean awaitUntil(Date deadline) throws InterruptedException;
  24. /**
  25. * 通知。此方法在功能上与Object.notify功能一致,用来唤醒在await()等待队列中的某一个线程
  26. */
  27. void signal();
  28. /**
  29. * 通知。用来唤醒在await()等待队列中的所有线程,线程唤醒之后再由操作系统进行调度
  30. */
  31. void signalAll();
  32. }

Condition 对象的 signal(通知)方法和同一个对象的 await(等待)方法是一一配对使用的,也就是说,一个 Condition 对象的 signal(或 signalAll)方法不能去唤醒其他 Condition 对象上的 await 线程。

ReentrantLock 的抢锁流程

ReentrantLock 有两种模式:

  • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
  • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。

ReentrantLock 在同一个时间点只能被一个线程获取,ReentrantLock 是通过一个 FIFO 的等待队列( AQS 队列)来管理获取该锁所有线程的。ReentrantLock 是继承自 Lock 接口实现的独占式可重入锁,与 ReentrantLock 组合一个 AQS 内部实例完成同步操作。

ReentrantLock$NonfairSync 非公平锁抢占流程

ReentrantLock 为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法 lock() 的源码如下:

  1. static final class NonfairSync extends Sync {
  2. final void lock() {
  3. // 流程1
  4. if (compareAndSetState(0, 1))
  5. setExclusiveOwnerThread(Thread.currentThread());
  6. else
  7. // 流程2
  8. acquire(1);
  9. }
  10. protected final boolean tryAcquire(int acquires) {
  11. return nonfairTryAcquire(acquires);
  12. }
  13. }
  1. 流程1 - 通过 CAS 操作判断 state 的值是不是0(表示当前锁未被占用),如果能够获取到锁,将当前线程设置为独占线程 。由于这里线程一进来就抢锁,完全没有考虑到阻塞队列中是否存在等待的线程,因此这正是非公平的
  2. 流程2 - 当前处于并发状态,已经有线程占用了锁。通过 acquire(int arg) 方法再次获取,获取失败则进入等待队列

下面进入 AQS 的 acquire(int arg) 方法:

  1. public final void acquire(int arg) {
  2. // tryAcquire(int acquires) - 尝试获取锁
  3. if (!tryAcquire(arg) &&
  4. // addWaiter(Node mode) - 创建节点并入队
  5. // acquireQueued(final Node node, int arg) - 阻塞线程
  6. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  7. selfInterrupt();
  8. }

tryAcquire(int) - 尝试获取锁

tryAcquire(int arg) 是一个钩子方法,ReentrantLock$NonfairSync 重写了这个方法,实现了自己的抢锁逻辑。代码如下:

  1. // ReentrantLock$NonfairSync
  2. protected final boolean tryAcquire(int acquires) {
  3. return nonfairTryAcquire(acquires);
  4. }
  5. // ReentrantLock
  6. final boolean nonfairTryAcquire(int acquires) {
  7. final Thread current = Thread.currentThread();
  8. int c = getState();
  9. // 流程1.1 当前锁未被占用
  10. if (c == 0) {
  11. if (compareAndSetState(0, acquires)) {
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. // 流程1.2 当前线程是锁的持有者
  17. else if (current == getExclusiveOwnerThread()) {
  18. int nextc = c + acquires;
  19. if (nextc < 0) // overflow
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. // 流程1.3
  25. return false;
  26. }
  1. 流程1.1 - 首先检查锁状态,如果没有线程持有这把锁,即调用 compareAndSetState(int except, int update) 抢占锁,抢占成功则设置当前线程为锁的持有者并且返回 true,否则返回 false
  2. 流程1.2 - 如果当前线程正好是锁的持有者(重入),则将 state + 1 并且返回 true。需要注意的是, state 的值溢出(nextc < 0)会抛出异常
  3. 流程1.3 - 当前线程既不是当前锁的持有者,又不能抢占到锁,返回 false,之后将其放入到 AQS 阻塞队列

addWaiter(Node) - 创建节点并入队

该方法为当前线程创建一个 Node 节点并将节点放入到双向链表的尾节点。注意:此时只是把 Thread 放入了队列当中而已,线程并没有阻塞。

  1. private Node addWaiter(Node mode) {
  2. // 为当前线程创建节点,mode = Node.EXCLUSIVE
  3. Node node = new Node(Thread.currentThread(), mode);
  4. Node pred = tail;
  5. if (pred != null) { // 已经有线程进入阻塞状态
  6. // 将node作为链表的尾节点,上一个尾节点作为它的前节点
  7. node.prev = pred;
  8. // CAS 操作设置尾节点
  9. if (compareAndSetTail(pred, node)) {
  10. pred.next = node;
  11. return node;
  12. }
  13. }
  14. // 当前同步队列并不存在,创建节点
  15. enq(node);
  16. return node;
  17. }
  18. private Node enq(final Node node) {
  19. for (;;) {
  20. Node t = tail;
  21. if (t == null) {
  22. // 通过 CAS 操作初始化头结点 head,如果后来的线程先完成了该初始化流程,该 CAS 操作
  23. // 就会失败,再次进入 for 循环,从而通过 else 分支将线程入队
  24. if (compareAndSetHead(new Node()))
  25. tail = head;
  26. } else {
  27. node.prev = t;
  28. // 通过 CAS 操作初始化尾结点 tail,避免并发情况下后来者线程先初始化了链表
  29. if (compareAndSetTail(t, node)) {
  30. t.next = node;
  31. return t;
  32. }
  33. }
  34. }
  35. }

acquireQueued(Node, int ) - 自旋抢占

在 addWaiter(Node) 方法中,将线程封装称为 Node 对象并加入到 AQS 双向链表中,但是该线程并没有进入阻塞状态。该方法就是完成线程的阻塞的。

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. // 线程是否被中断
  5. boolean interrupted = false;
  6. // 流程1 死循环自旋检查当前节点的前驱结点是否是头结点
  7. for (;;) {
  8. // 获取上一个节点
  9. final Node p = node.predecessor();
  10. // 流程2
  11. // 只有上一个是头结点且获取到锁才会执行当前线程。为什么上一个节点是头结点,因为队列
  12. // 是从尾节点插入,头结点取出,所以每次唤醒都是拿到的头结点的线程执行,避免队列中
  13. // 其他线程无意义的做 CAS 自旋
  14. if (p == head && tryAcquire(arg)) {
  15. // 设置当前节点为头结点
  16. setHead(node);
  17. // 上一个节点是头结点,置为null是表示与链表脱离开,方便GC回收
  18. p.next = null; // help GC
  19. failed = false;
  20. return interrupted;
  21. }
  22. // 流程3 检查前一个节点的状态,判断当前获取锁失败的线程是否需要挂起,如果需要挂起,
  23. // 调用 parkAndCheckInterrupt() 方法挂起线程,直到被唤醒
  24. // parkAndCheckInterrupt() - 线程挂起
  25. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
  26. interrupted = true;
  27. }
  28. } finally {
  29. if (failed)
  30. // 流程4 等待过程中没有获取到锁,取消请求,将节点从队列中移除
  31. // cancelAcquire(Node) - 取消请求
  32. cancelAcquire(node);
  33. }
  34. }
  1. 流程1 - 当前 Node 节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋
  2. 流程2 - 有当前驱节点是头节点时才能尝试获取锁,原因是:
    1. 头节点是成功获取锁的节点,而头节点的线程释放了锁以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点
    2. 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行 for 死循环
  3. 流程3 - 检查前一个节点的状态,判断当前获取锁失败的线程是否需要挂起,如果需要挂起, 调用 parkAndCheckInterrupt() 方法挂起线程,直到被唤醒。这样就不会因为执行无效的循环导致 CPU 的资源浪费
  4. 流程4 - 如果等待过程中没有获取到锁,则取消请求,并且将节点从队列中移除

shouldParkAfterFailedAcquire(Node, Node) - 挂起预判

  1. // 检查和更新未能获取的节点的状态
  2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  3. int ws = pred.waitStatus; // 获取前驱结点的状态
  4. if (ws == Node.SIGNAL) // 如果前驱结点的状态为SIGNAL(-1),则直接返回true
  5. return true;
  6. if (ws > 0) { // 前驱结点所指向的线程已经取消,CANCELLED(1)
  7. // 不断循环,知道找到有效的前驱结点,即非 CANCELLED 状态的节点
  8. do {
  9. node.prev = pred = pred.prev;
  10. } while (pred.waitStatus > 0);
  11. // 调整前驱结点的next指针
  12. pred.next = node;
  13. } else {
  14. /*
  15. * 如果前驱结点既不是SIGNAL,也不是CANCELLED,就设置为SIGNAL
  16. */
  17. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  18. }
  19. return false;
  20. }

shouldParkAfterFailedAcquire(Node, Node) 方法是在 acquireQueued(Node, int) 方法的死循环中被调用的,由于此方法返回 false 时 acquireQueued(Node, int) 不会阻塞当前线程,只有此方法返回 true 时当前线程才阻塞,因此在一般情况下,此方法至少需要执行两次,当前线程才会被阻塞。

在第一次进入此方法时,首先会进入后一个 if 判断的 else 分支,通过 CAS 设置 pred 前驱的 waitStatus 为 SIGNAL,然后返回 false。此方法返回 false 之后,获取独占锁的 acquireQueued(Node, int) 方法会继续进行 for 循环去抢锁:

  1. 假设 node 的前驱节点是头节点,tryAcquire(int) 抢锁成功,则获取到锁
  2. 假设 node 的前驱节点不是头节点,或者 tryAcquire(int) 抢锁失败,仍然会再次调用此方法

第二次进入此方法时,由于上一次进入时已经将 pred.waitStatus 设置为 SIGNAL(-1)了,因此这次会进入第一个判断条件,直接返回 true,表示应该调用 parkAndCheckInterrupt() 阻塞当前线程,等待前一个节点执行完成之后唤醒。

注意:SIGNAL 标记表示后继节点处于阻塞状态,所以**shouldParkAfterFailedAcquire(Node, Node)** 方法会先将前驱结点的状态设置为 SIGNAL,然后再阻塞当前线程

parkAndCheckInterrupt() - 线程挂起

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);
  3. return Thread.interrupted();
  4. }

主要做两件事:

  1. 通过 LockSupport.park(this) 挂起当前线程,底层是通过 Unsafe#park(boolean, long) 来实现线程挂起功能的
  2. 检查当前线程的中断信号。注意:Thread.interrupted() 是会重置中断信号的

由此可见,虽然 acquireQueued(Node, int) 不会阻塞当前线程,但是却会记录下阻塞过程中是否接收到中断信号,如果接收到中断信号,才会回调到以下代码:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. // acquireQueued 的返回值表示是否接收到中断信号
  4. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5. // selfInterrupt() - 线程自我中断
  6. selfInterrupt();
  7. }

selfInterrupt() - 线程自我中断

  1. static void selfInterrupt() {
  2. Thread.currentThread().interrupt();
  3. }

cancelAcquire(Node) - 取消请求

  1. private void cancelAcquire(Node node) {
  2. if (node == null)
  3. return;
  4. node.thread = null;
  5. Node pred = node.prev; // 检查当前节点的前驱结点的状态
  6. while (pred.waitStatus > 0) // 根据前驱结点的状态,找到最近的有效的(非 CANCELLED)的节点
  7. node.prev = pred = pred.prev;
  8. // predNext 是要取消的明显节点
  9. Node predNext = pred.next;
  10. // 设置当前节点的状态为CANCELLED
  11. node.waitStatus = Node.CANCELLED;
  12. // 流程1 设置链表的tail为刚才找到的有效的前驱结点
  13. if (node == tail && compareAndSetTail(node, pred)) {
  14. // 设置pred的next节点为null
  15. compareAndSetNext(pred, predNext, null);
  16. } else {
  17. // If successor needs signal, try to set pred's next-link
  18. // so it will get one. Otherwise wake it up to propagate.
  19. int ws;
  20. // 流程2
  21. if (pred != head &&
  22. ((ws = pred.waitStatus) == Node.SIGNAL ||
  23. (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
  24. pred.thread != null) {
  25. Node next = node.next;
  26. // 断链,将node的next节点赋给prev作为其next节点
  27. if (next != null && next.waitStatus <= 0)
  28. compareAndSetNext(pred, predNext, next);
  29. } else {
  30. // 流程3
  31. unparkSuccessor(node);
  32. }
  33. node.next = node; // help GC
  34. }
  35. }
  36. private void unparkSuccessor(Node node) {
  37. int ws = node.waitStatus;
  38. if (ws < 0)
  39. // 重置node的状态
  40. compareAndSetWaitStatus(node, ws, 0);
  41. // s 作为node的后继节点
  42. Node s = node.next;
  43. // 以防s节点为null或者已取消,从tail一直往前遍历,找到一个最接近node的合适的(非CANCELLED状态)的节点
  44. if (s == null || s.waitStatus > 0) {
  45. s = null;
  46. for (Node t = tail; t != null && t != node; t = t.prev)
  47. if (t.waitStatus <= 0)
  48. s = t;
  49. }
  50. // 唤醒该节点,让其线程继续运行
  51. if (s != null)
  52. LockSupport.unpark(s.thread);
  53. }
  1. 流程1 - 如果要取消的节点是尾节点,则将 node->prev 设置为尾节点,并将 node->prev 的 next 节点通过 CAS 操作设置为null
  2. 流程2 - 如果 node->prev 节点不是头结点,且 node->prev 的状态为 SIGNAL 或者通过 CAS 操作可设置为 SIGNAL,且 node->prev 包含的线程存在,则将node->next设置为 node->prev->next,相当于将 node 从链表中断开。也就是说,如果上一个节点还在阻塞,直接断链就行,否则的话,唤醒 node->next 去争夺 CPU 时间片来运行
  3. 流程3 - 从 tail 往前遍历,找到离 node 最近的且为有效状态节点,唤醒其线程,继续执行。避免此时上一个线程执行完毕

总之,cancelAcquire(Node) 就是要将当前线程从链表中取消,并且将链表重新续接上。

小结

非公平锁的抢锁过程如下:

  1. 线程首先会通过 tryAcquire(int) 来尝试获取锁,不用管等待队列中是否有线程在等待;
  2. 如果获取不到锁,则调用 addWaiter(Node) 创建节点并且追加在队列的尾部,然后通过调用 acquireQueued(Node, int) 进入无限 for 循环自旋抢占,在 parkAndChechInterrupt() 方法中通过 LockSupport#park() 方法阻塞自身线程。之所以acquireQueued(Node, int) 使用的是死循环,就是因为 LockSupport#park() 方法会响应中断唤醒,再次循环就会自己阻塞自己,直到锁被释放且线程竞争到了锁资源
  3. 线程在阻塞期间收到中断信号,唤醒之后会通过 selfInterrupt() 方法再次自己中断自己。之所以要这么做,是因为在收到其他线程的中断信号之后没有及时响应(acquireQueued 是死循环),现在要进行补偿

ReentrantLock$FairSync 公平锁抢占流程

tryAcquire(int) - 尝试获取锁

同 ReentrantLock$NonfairSync 的开始一样,都会调用到 acqueri(int)方法:

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

不同的是,tryAcquire(int) 是自己的实现:

  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3. int c = getState();
  4. if (c == 0) {
  5. // hasQueuedPredecessors() - 检查等待队列
  6. if (!hasQueuedPredecessors() &&
  7. compareAndSetState(0, acquires)) {
  8. setExclusiveOwnerThread(current);
  9. return true;
  10. }
  11. }
  12. else if (current == getExclusiveOwnerThread()) {
  13. int nextc = c + acquires;
  14. if (nextc < 0)
  15. throw new Error("Maximum lock count exceeded");
  16. setState(nextc);
  17. return true;
  18. }
  19. return false;
  20. }
  21. }

hasQueuedPredecessors() - 检查等待队列

  1. public final boolean hasQueuedPredecessors() {
  2. Node t = tail;
  3. Node h = head;
  4. Node s;
  5. return h != t &&
  6. ((s = h.next) == null || s.thread != Thread.currentThread());
  7. }

该方法是实现公平锁的核心代码:

  1. h == t 表示当前 AQS 队列且空,直接返回 false
  2. h != t && s == null 表示当前 AQS 的队列只有一个元素,也就是队列的头结点,返回 true
  3. h != t && (s == null || s.thread != Thread.currentThread()) 表示队列中的第一个线程不是当前线程,说明此时队列中有线程在等待,为了达到公平,当前线程不应该抢占锁,而是进入队列等到。结果返回 true

小结

公平锁的实现在为线程创建节点并且入队、自旋抢占、自我阻塞,中断补救等方面都与非公平锁一致,唯一不同的是在尝试获取锁(tryAcquire(int))时,公平锁会先检测 AQS 同步队列中是否已经有线程在等待,如果存在,则不会通过 CAS 操作进行抢锁,而是进入等到队列中,直到被唤醒。

ReentrantLock 释放锁流程

ReentrantLock 的锁释放流程是通过调用 unlock() 方法来实现的:

  1. // ReentrantLock.java
  2. public void unlock() {
  3. sync.release(1);
  4. }
  5. // AbstractQueuedSynchronizer.java
  6. public final boolean release(int arg) {
  7. // tryRelease(int) - 钩子函数:尝试释放锁
  8. if (tryRelease(arg)) {
  9. Node h = head;
  10. if (h != null && h.waitStatus != 0)
  11. // unparkSuccessor(Node) - 唤醒后继线程
  12. unparkSuccessor(h);
  13. return true;
  14. }
  15. return false;
  16. }

tryRelease(int) - 钩子函数:尝试释放锁

tryRelease(int) 方法对于公平锁和非公平锁来说都是同一个实现,因此真正的实现代码是在 ReentrantLock 中:

  1. protected final boolean tryRelease(int releases) {
  2. // 流程1 计算锁状态,重入会导致state的值大于1
  3. int c = getState() - releases;
  4. // 流程2 判断需要释放锁的当前线程是否是持有锁的线程,不是则抛出异常
  5. if (Thread.currentThread() != getExclusiveOwnerThread())
  6. throw new IllegalMonitorStateException();
  7. boolean free = false;
  8. // 流程3 只有重入的次数为0,表示当前锁可用,才会将持有锁的线程置空,在其他线程抢占
  9. // 到锁之后再重新设置
  10. if (c == 0) {
  11. free = true;
  12. setExclusiveOwnerThread(null);
  13. }
  14. setState(c);
  15. return free;
  16. }
  1. 流程1 - 由于 ReentrantLock 是重入锁,在 tryAcquire(int) - 尝试获取锁 中对重入的逻辑处理为 state 自增,表示冲入的次数。相应地,释放锁也需要多次释放,次数与重入次数一样,才能完全释放锁
  2. 流程2 - 释放锁的操作只能在加锁线程执行,否则会触发 IllegalMonitorStateException 异常
  3. 只有 state 的值为 0(表示锁已经全部释放),才会将重置锁的线程拥有者实例

unparkSuccessor(Node) - 唤醒后继线程

在线程释放锁之后,需要唤醒下一个线程。唤醒操作由方法 unparkSuccessor(Node) 实现:

  1. private void unparkSuccessor(Node node) {
  2. /*
  3. * 获取节点状态,该节点也就是释放锁的节点,也是头结点
  4. */
  5. int ws = node.waitStatus;
  6. // CANCELLLED(1),SIGNAL(-1),CONDITION(-2),PROPAGATE(-3)
  7. // 若节点状态小于0,则将其值为0,表示初始状态
  8. if (ws < 0)
  9. compareAndSetWaitStatus(node, ws, 0);
  10. // 找到后继节点
  11. Node s = node.next;
  12. // 如果新节点已经被取消,对应状态为CANCELLED(1)
  13. if (s == null || s.waitStatus > 0) {
  14. s = null;
  15. // 从队列尾部开始往前遍历,找到最前面一个状态小于0的且最接近当前节点的节点(有效节点)
  16. for (Node t = tail; t != null && t != node; t = t.prev)
  17. if (t.waitStatus <= 0)
  18. s = t;
  19. }
  20. // 唤醒后继节点
  21. if (s != null)
  22. LockSupport.unpark(s.thread);
  23. }

unparkSuccessor() 唤醒后继节点的线程后,后继节点的线程重新执行方法 acquireQueued(Node, int) 中的自旋抢占逻辑。

注意:当 AQS 头节点释放锁之后,头节点的状态变成初始状态,此节点理论上需要从队列中移除,但是此时该无效节点并没有立即被移除,unparkSuccessor() 方法并没有立即从队列中删除该无效节点,仅仅唤醒了后继节点的线程,重启了后继节点的自旋抢锁。然后在 acquireQueued(Node, int) 方法中移除掉该节点 。

lockInterruptibly() 可中断抢锁流程

使用 lock() 方法抢锁并不能及时响应中断信号,而是记录下中断信号,在线程唤醒之后再次触发中断,相当于一个中断信号的补救措施。但是 Lock 接口也提供了一个可响应中断信号的方法,那就是 lockInterruptibly()

  1. public void lockInterruptibly() throws InterruptedException {
  2. sync.acquireInterruptibly(1);
  3. }
  4. // AbstractQueuedSynchronizer.java
  5. public final void acquireInterruptibly(int arg)
  6. throws InterruptedException {
  7. // 先检查线程的中断状态,如果处于中断状态,则直接抛出 InterruptedException 异常
  8. if (Thread.interrupted())
  9. throw new InterruptedException();
  10. if (!tryAcquire(arg))
  11. // doAcquireInterruptibly(int) - 可中断抢占锁流程
  12. doAcquireInterruptibly(arg);
  13. }

先检查线程的中断状态,如果处于中断状态,则直接抛出 InterruptedException 异常。如果线程没有接收到中断信号,则进入可中断抢占锁流程。

doAcquireInterruptibly(int) - 可中断抢占锁流程

  1. private void doAcquireInterruptibly(int arg) throws InterruptedException {
  2. final Node node = addWaiter(Node.EXCLUSIVE);
  3. boolean failed = true;
  4. try {
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. // 区别点
  16. throw new InterruptedException();
  17. }
  18. } finally {
  19. if (failed)
  20. cancelAcquire(node);
  21. }
  22. }

该方法与 acquireQueued(Node, int) 不同的是,acquireQueued(Node, int) 如果通过 parkAndCheckInterrupt() 检测到中断信号,只是会返回一个为 true 的标志位,之后有线程自己调用自己的 interrupt() 方法给自己发起中断;但是 doAcquireInterruptibly(int) 则是在检测到中断信号之后直接抛出 InterruptedException 异常。这也正是 lockInterruptibly() - 可中断抢占锁流程的核心所在。

Condition 基本原理

Condition 是 JUC 用来替代传统 Object 的 wait()/notify() 线程间通信与协作机制的新组件,相比调用Object 的 wait()/notify(),调用 Condition 的 await()/signal() 这种方式实现线程间协作更加高效。因为 Object 的 wait()/notify() 由 JVM 实现,需要进行内核态和用户态之间的切换,而 Condition 的 await()/signal() 由纯 Java 代码执行,在 Java 层进行自旋,比较高效。

ConditionObject 是实现条件队列的关键,每个 ConditionObject 都维护了一个单独的条件等待队列,分别记录了该队列的头结点与尾节点。

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. /** First node of condition queue. */
  3. private transient Node firstWaiter;
  4. /** Last node of condition queue. */
  5. private transient Node lastWaiter;
  6. ....
  7. }

在一个锁(Lock)上,可以创建多个不同的条件等待队列:

  1. private Lock lock = new ReentrantLock();
  2. private Condition first = lock.newCondition();
  3. private Condition second = lock.newCondition();

Condition 条件队列与 AQS 同步队列的关系如下:

Java并发编程-ReentrantLock - 图2
Condition 条件队列是单向的,而 AQS 同步队列是双向的,AQS 节点会有前驱指针。一个 AQS 实例可以有多个条件队列,是聚合关系;但是一个 AQS 实例只有一个同步队列,是逻辑上的组合关系。

聚合关系强调是“整体”包含“部分”,但是“部分”可以脱离“整体”而单独存在。 组合关系也是强调整体与部分的关系,但是部分不能脱离整体而存在。

await() - 等待

当线程调用 await() 方法时,说明当前线程的节点为当前 AQS 队列的头节点,正好处于占有锁的状态,await() 方法需要把该线程从 AQS 队列挪到 Condition 等待队列里。

Java并发编程-ReentrantLock - 图3

注意:在 await() 方法将当前线程挪动到 Condition 等待队列后,还会唤醒 AQS 同步队列中 head 节点的下一个节点。

  1. public final void await() throws InterruptedException {
  2. // 线程处于中断状态,直接抛出中断异常
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. // addConditionWaiter() - 创建Node节点并入队
  6. Node node = addConditionWaiter();
  7. // fullyRelease(node) - 释放锁
  8. int savedState = fullyRelease(node);
  9. int interruptMode = 0;
  10. // isOnSyncQueue(Node) - 节点是否还在同步队列中等待
  11. while (!isOnSyncQueue(node)) {
  12. // 线程自我挂起
  13. LockSupport.park(this);
  14. //
  15. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  16. break;
  17. }
  18. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  19. interruptMode = REINTERRUPT;
  20. if (node.nextWaiter != null) // clean up if cancelled
  21. unlinkCancelledWaiters();
  22. if (interruptMode != 0)
  23. reportInterruptAfterWait(interruptMode);
  24. }
  1. 创建节点并放入 Condition 队列尾部
  2. 释放 AQS 锁,并唤醒 AQS 同步队列中的头结点的后一个节点
  3. 执行 while 循环,将该节点的线程阻塞,知道该节点离开 Condition 队列,重新回到同步队列,线程才退出 while 循环
  4. 退出 while 循环后,调用 acquireQueued(Node, int) 尝试拿锁,拿不到锁进入 AQS 同步队列

addConditionWaiter() - 创建Node节点并入队

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. if (t != null && t.waitStatus != Node.CONDITION) {
  4. unlinkCancelledWaiters();
  5. t = lastWaiter;
  6. }
  7. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  8. if (t == null)
  9. firstWaiter = node;
  10. else
  11. t.nextWaiter = node;
  12. lastWaiter = node;
  13. return node;
  14. }

该方法的作用是为当前线程创建一个 Node 节点并追加到当前 ConditionObject 的条件队列上,在此期间,会将该条件链表中无效节点通过 unlinkCancelledWaiters() 方法清除掉。

注意:线程调用 await() 的时候,肯定已经获取到了锁。所以这里对链表的操作不需要 CAS。

unlinkCancelledWaiters() - 清除条件队列中已取消的节点

  1. private void unlinkCancelledWaiters() {
  2. // 头结点
  3. Node t = firstWaiter;
  4. Node trail = null;
  5. // 步骤1
  6. while (t != null) {
  7. // 步骤2
  8. Node next = t.nextWaiter;
  9. // 步骤3
  10. if (t.waitStatus != Node.CONDITION) {
  11. // 步骤4
  12. t.nextWaiter = null;
  13. // 步骤5
  14. if (trail == null) {
  15. firstWaiter = next;
  16. } else {
  17. // 步骤6
  18. trail.nextWaiter = next;
  19. }
  20. 步骤7
  21. if (next == null) {
  22. lastWaiter = trail;
  23. }
  24. } else {
  25. // 步骤8 trail记录状态为CONDITION的节点
  26. trail = t;
  27. }
  28. t = next;
  29. }
  30. }

该方法的作用是清除掉 ConditionObject 中队列的无效状态(CANCELLED)节点。该队列是一个单项队列,采取的方案是从头结点往后遍历整个链表,遍历过程中会把所有无效节点清除。整个过程中:

  • t 表示当前节点,也正是判断状态的节点
  • trail 表示的是清除无效节点之后的有效链表的尾节点,遍历到最后一个节点时会赋值给 lastWaiter
  1. 步骤1 - 方法入口获取到链表头结点,然后将头结点赋值给 t,进入 while 循环,该循环的结束条件是遍历到尾节点,因为尾节点的 nextWaiter 为 null;
  2. 步骤2 - 获取到当前节点的 nextWaiter,赋值给 next
  3. 步骤3 - 如果当前节点 t 的状态不是 CONDITION,表示该节点已经取消(CANCELLED),因此需要将这个节点从链表中移除掉
    1. 步骤4 - 从链表中移除当前节点,具体实现为:t.nextWaiter = null,表示与整个链表脱离开
    2. 步骤5 - trail == null,给 firstWaiter 赋值为 t.nextWaiter ,这里是假设 t.nextWaiter 的状态是有效的,无效的话下次会再次覆盖。在步骤-8中,会将有效状态的节点赋值给 trail,此时的节点 t 正是上一步的 next,也就是上一次给 firstWaiter 赋值的 t.nextWaiter。因此,这里的 firstWaiter 赋值总是有效的
    3. 步骤6 - 同步骤5一样,总是假设下一个节点是有效的,将下一个有效节点链接到 firstWaiter 所在的有效节点链表中
    4. 步骤7 - next == null 表示节点遍历结束,将最后一个有效状态的节点赋值给 tail。至此,整个链表的无效节点清除完毕
  4. 步骤8 - trail 总是记录当前有效状态的节点信息

fullyRelease(node) - 释放锁

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. int savedState = getState();
  5. // 一次性将重入锁全部释放掉
  6. if (release(savedState)) {
  7. failed = false;
  8. return savedState;
  9. } else {
  10. throw new IllegalMonitorStateException();
  11. }
  12. } finally {
  13. if (failed)
  14. node.waitStatus = Node.CANCELLED;
  15. }
  16. }

该方法的作用是用来释放锁,与普通锁释放操作不一样的是,这里显示通过 getState() 方法获取到重入次数,然后将结果传入 release 方法中,表示将该锁一次性全部释放。还将释放的次数作为返回值返回。

注意:

condition 的使用场景如下:

  1. lock.lock();
  2. System.out.println("wait start");
  3. condition.await();
  4. System.out.println("wait end");
  1. 该方法会先释放锁,因为 await() 方法是在 lock() 方法之后调用,lock() 方法会通过 CAS 操作拿到锁。在 await() 方法中会调用 LockSupport.park(this) 将线程阻塞,如果没有在阻塞之前释放锁,会导致其他线程获取不到锁(当前线程持有锁,并且阻塞在这里),进而会导致死锁的情况
  2. release(int) 方法中,还会唤醒队列中的下一个线程

isOnSyncQueue(Node) - 节点是否还在同步队列中等待

  1. while (!isOnSyncQueue(node)) {
  2. // 线程自我挂起
  3. LockSupport.park(this);
  4. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  5. break;
  6. }
  • isOnSyncQueue(Node) 方法用来判断该 Node 是否在 AQS 队列中
  • checkInterruptWhileWaiting(Node) 方法会检查中断信号,并作相应处理
  1. final boolean isOnSyncQueue(Node node) {
  2. if (node.waitStatus == Node.CONDITION || node.prev == null)
  3. return false;
  4. if (node.next != null) // If has successor, it must be on queue
  5. return true;
  6. return findNodeFromTail(node);
  7. }
  8. private boolean findNodeFromTail(Node node) {
  9. Node t = tail;
  10. for (;;) {
  11. if (t == node)
  12. return true;
  13. if (t == null)
  14. return false;
  15. t = t.prev;
  16. }
  17. }

该方法用来判断该 Node 是否在 AQS 队列中,初始的时候,Node 只存在于 ConditionObject 队列中。在执行 signal() 操作之后,调用 transferForSignal(Node) -> enq(Node) 方法将 Node 会放进 AQS 的同步队列中。

checkInterruptWhileWaiting(Node) - 检查线程中断状态

  1. private int checkInterruptWhileWaiting(Node node) {
  2. // THROW_IE:-1
  3. // REINTERRUPT:1
  4. return Thread.interrupted() ?
  5. (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
  6. }
  7. final boolean transferAfterCancelledWait(Node node) {
  8. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  9. enq(node);
  10. return true;
  11. }
  12. while (!isOnSyncQueue(node))
  13. Thread.yield();
  14. return false;
  15. }

checkInterruptWhileWaiting(Node) 方法在 park() 方法之后调用,是因为线程从 park() 状态醒来时有两种可能:

  1. 其他线程调用了 unpark(Thread) 方法进行唤醒
  2. 收到中断信号唤醒

所以,这里需要检测中断信号,当发现自己是被中断唤醒的,而不是被 unpark(Thread) 唤醒的,会直接退出循环,await() 方法也会返回。

小结

调用 Condition#await() 将当前线程阻塞在 while 循环中,直到调用 Condition#signal() 唤醒线程,将当前节点加入到 AQS 同步队列中,然后退出 while 循环。

awaitUninterruptibly() - 非中断等待

  1. public final void awaitUninterruptibly() {
  2. Node node = addConditionWaiter();
  3. int savedState = fullyRelease(node);
  4. boolean interrupted = false;
  5. while (!isOnSyncQueue(node)) {
  6. LockSupport.park(this);
  7. // 重点
  8. if (Thread.interrupted())
  9. interrupted = true;
  10. }
  11. if (acquireQueued(node, savedState) || interrupted)
  12. selfInterrupt();
  13. }

该方法的最核心的部分则是 while 循环中收到中断信息之后不会退出 while 循环,而是设置标志位,继续循环。

signal() - 唤醒

  1. public final void signal() {
  2. // 只有持有锁的线程才能够调用 signal() 方法
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. doSignal(first);
  8. }
  9. // 唤醒队列中的第一个线程
  10. private void doSignal(Node first) {
  11. do {
  12. if ( (firstWaiter = first.nextWaiter) == null)
  13. lastWaiter = null; // 如果第二个节点为null,表示尾节点也为空
  14. //将 node从Condition队列移除
  15. first.nextWaiter = null;
  16. } while (!transferForSignal(first) &&
  17. (first = firstWaiter) != null);
  18. }
  19. // 将节点从条件队列转移到同步队列,成功则返回true
  20. final boolean transferForSignal(Node node) {
  21. // 重置node的状态
  22. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  23. return false;
  24. // node节点加入AQS同步队列,并获取AQS队列的前驱结点
  25. Node p = enq(node);
  26. int ws = p.waitStatus;
  27. // ws > 0 即是CANCELLED状态
  28. // 设置前驱节点为Signal状态失败
  29. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  30. LockSupport.unpark(node.thread);
  31. return true;
  32. }
  1. 通过 enq() 方法自旋将条件队列中的头节点放入 AQS 同步队列尾部,并获取它在 AQS 队列中的前驱节点
  2. 如果前驱节点的状态是取消状态,或者设置前驱节点为 Signal 状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队
  3. 同步队列中的线程被唤醒后,表示重新获取了锁,然后继续执行 Condition#await() 方法的临界区代码