JUC 显示锁(Lock)是一种是一种使用纯 Java 语言实现的,非常灵活的锁,可以进行无条件的,可轮询的,定时的,可中断的锁获取和释放操作。
显示锁的由来
在 Java 中,内置锁(synchronized)有虚拟机自动抢占和释放同步对象的监视器(monitor),而且每一个对象都拥有一个监视器,这使得使用 synchronized 变得异常简便。但是 synchronized 的功能比较单一,在一些场景中无法胜任。例如:
- 限时抢锁:设置超时时长,如果超个这个时间还是无法获取到锁,则放弃对锁的抢占,不至于无限等待
- 可中断抢锁:在抢锁时,外部线程给抢锁线程发一个中断信号,就能唤起等待锁的线程,并终止抢占过程
- 多个等待队列:为锁维持多个等待队列,以便提高锁的效率。比如在生产者-消费者模式实现中,生产者和消费者共用一把锁,该锁上维持两个等待队列,即一个生产者队列和一个消费者队列
除此之外,在高并发的场景中,Java 对象锁会膨胀成为重量级锁,而重量级锁的线程阻塞和线程唤醒需要在用户态和内核态来回切换,导致性能低下。因此,显示锁就是为了解决这些问题而生的。JDK1.5 中引入了 Lock 接口,Lock 是 Java 代码级别的锁,也称之为显示锁。
Lock 接口
Lock 接口位于 java.util.concurrent.locks 包中,是 JUC 显式锁的一个抽象。与 synchronized 关键字不同,显式锁不再作为 Java 内置特性来实现,而是作为 Java 语言可编程特性来实现。这就为多种不同功能的锁实现留下了空间,各种锁实现可能有不同的调度算法、性能特性或者锁定语义。
Lock 接口的主要方法如下:
public interface Lock {/*** 获取锁,成功则向下运行,失败则自旋检测线程*/void lock();/*** 可中断抢锁,当前线程在抢锁过程中可以响应中断信号*/void lockInterruptibly() throws InterruptedException;/*** 尝试抢锁,线程为非阻塞模式,在调用tryLock()方法后立即返回。若抢锁成功则返回true,否则返回false*/boolean tryLock();/*** 限时抢锁。到达超时时间返回false,也可以相应中断信号*/boolean tryLock(long time, TimeUnit unit) throws InterruptedException;/*** 释放锁*/void unlock();/*** 获取与显示锁绑定的Condition对象,用于等待-通知方式的线程间通信*/Condition newCondition();}
据 Lock 接口提供的方法来看,Lock 锁至少比 synchronized 多出以下优势:
- 可中断获取锁
使用 synchronized 关键字获取锁的时候,如果线程没有获取到被阻塞,阻塞期间该线程是不响应中断信号(interrupt)的;而调用 Lock.lockInterruptibly() 方法获取锁时,如果线程被中断,线程将抛出中断异常。
- 可非阻塞获取锁
使用 synchronized 关键字获取锁时,如果没有成功获取,线程只有被阻塞;而调用 Lock.tryLock() 方法获取锁时,如果没有获取成功,线程也不会被阻塞,而是直接返回 false。
- 可限时获取锁
调用 Lock.tryLock(long time,TimeUnit unit) 方法,显式锁可以设置限定抢占锁的超时时间。而在使用 synchronized 关键字获取锁时,如果不能抢到锁,线程只能无限制阻塞。
可重入锁 ReentrantLock
ReentrantLock 相关的类继承结构如下:

- ReentrantLock 是 JUC 包下提供的 Lock 基础实现类
- ReentrantLock 实现了 Lock 接口,所有它拥有和 synchronized 相同的并发性和内存语义,还拥有了限时抢占、可中断抢占等一些高级特性
- ReentrantLock 内部基于 Sync 实现了锁功能。sync 继承至 AbstractQueuedSynchronizer(抽象队列同步器,AQS),因此在争用激烈的场景下,能表现出比内置锁更佳的性能
ReentrantLock 是一个可重入的独占锁,其中:
可重入表示该锁能够支持一个线程对资源的重复加锁,也就是说,一个线程可以多次进入同一个锁所同步的临界区代码块。例如:
lock.lock();lock.lock();try {// 临界区代码} finally {lock.unlock();lock.unlock();}
独占表示在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能等待,只有拥有锁的线程释放了锁后,其他的线程才能够获取锁。
基于 Condition 实现等待-通知式线程间通信
等待-通知方式的线程间通信机制,具体来说是指一个线程 A 调用了同步对象的 wait() 方法进入等待状态,而另一线程 B 调用了同步对象的 notify() 或者 notifyAll() 方法去唤醒等待线程,当线程 A 收到线程 B 的唤醒通知后,就可以重新开始执行了。
在 Lock 体系中,等待-通知方式的线程间通信机制可以使用 Condition 接口来实现。Condition 接口的主要方法如下:
public interface Condition {/*** 等待,该方法在功能上等同于 Object.wait()* 是当前线程加入 await() 等待队列中,并释放当前锁* 当其他线程调用 signal() 时,等待队列中的某个线程会被唤醒,重新去抢所*/void await() throws InterruptedException;/*** 等待,不会响应中断信号*/void awaitUninterruptibly();/*** 超时等待*/long awaitNanos(long nanosTimeout) throws InterruptedException;/*** 超时等待*/boolean await(long time, TimeUnit unit) throws InterruptedException;/*** 定时等待*/boolean awaitUntil(Date deadline) throws InterruptedException;/*** 通知。此方法在功能上与Object.notify功能一致,用来唤醒在await()等待队列中的某一个线程*/void signal();/*** 通知。用来唤醒在await()等待队列中的所有线程,线程唤醒之后再由操作系统进行调度*/void signalAll();}
Condition 对象的 signal(通知)方法和同一个对象的 await(等待)方法是一一配对使用的,也就是说,一个 Condition 对象的 signal(或 signalAll)方法不能去唤醒其他 Condition 对象上的 await 线程。
ReentrantLock 的抢锁流程
ReentrantLock 有两种模式:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。
ReentrantLock 在同一个时间点只能被一个线程获取,ReentrantLock 是通过一个 FIFO 的等待队列( AQS 队列)来管理获取该锁所有线程的。ReentrantLock 是继承自 Lock 接口实现的独占式可重入锁,与 ReentrantLock 组合一个 AQS 内部实例完成同步操作。
ReentrantLock$NonfairSync 非公平锁抢占流程
ReentrantLock 为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法 lock() 的源码如下:
static final class NonfairSync extends Sync {final void lock() {// 流程1if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else// 流程2acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
- 流程1 - 通过 CAS 操作判断 state 的值是不是0(表示当前锁未被占用),如果能够获取到锁,将当前线程设置为独占线程 。由于这里线程一进来就抢锁,完全没有考虑到阻塞队列中是否存在等待的线程,因此这正是非公平的
- 流程2 - 当前处于并发状态,已经有线程占用了锁。通过
acquire(int arg)方法再次获取,获取失败则进入等待队列
下面进入 AQS 的 acquire(int arg) 方法:
public final void acquire(int arg) {// tryAcquire(int acquires) - 尝试获取锁if (!tryAcquire(arg) &&// addWaiter(Node mode) - 创建节点并入队// acquireQueued(final Node node, int arg) - 阻塞线程acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
tryAcquire(int) - 尝试获取锁
tryAcquire(int arg) 是一个钩子方法,ReentrantLock$NonfairSync 重写了这个方法,实现了自己的抢锁逻辑。代码如下:
// ReentrantLock$NonfairSyncprotected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}// ReentrantLockfinal boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 流程1.1 当前锁未被占用if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 流程1.2 当前线程是锁的持有者else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}// 流程1.3return false;}
- 流程1.1 - 首先检查锁状态,如果没有线程持有这把锁,即调用 compareAndSetState(int except, int update) 抢占锁,抢占成功则设置当前线程为锁的持有者并且返回 true,否则返回 false
- 流程1.2 - 如果当前线程正好是锁的持有者(重入),则将 state + 1 并且返回 true。需要注意的是, state 的值溢出(nextc < 0)会抛出异常
- 流程1.3 - 当前线程既不是当前锁的持有者,又不能抢占到锁,返回 false,之后将其放入到 AQS 阻塞队列
addWaiter(Node) - 创建节点并入队
该方法为当前线程创建一个 Node 节点并将节点放入到双向链表的尾节点。注意:此时只是把 Thread 放入了队列当中而已,线程并没有阻塞。
private Node addWaiter(Node mode) {// 为当前线程创建节点,mode = Node.EXCLUSIVENode node = new Node(Thread.currentThread(), mode);Node pred = tail;if (pred != null) { // 已经有线程进入阻塞状态// 将node作为链表的尾节点,上一个尾节点作为它的前节点node.prev = pred;// CAS 操作设置尾节点if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 当前同步队列并不存在,创建节点enq(node);return node;}private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {// 通过 CAS 操作初始化头结点 head,如果后来的线程先完成了该初始化流程,该 CAS 操作// 就会失败,再次进入 for 循环,从而通过 else 分支将线程入队if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;// 通过 CAS 操作初始化尾结点 tail,避免并发情况下后来者线程先初始化了链表if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
acquireQueued(Node, int ) - 自旋抢占
在 addWaiter(Node) 方法中,将线程封装称为 Node 对象并加入到 AQS 双向链表中,但是该线程并没有进入阻塞状态。该方法就是完成线程的阻塞的。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 线程是否被中断boolean interrupted = false;// 流程1 死循环自旋检查当前节点的前驱结点是否是头结点for (;;) {// 获取上一个节点final Node p = node.predecessor();// 流程2// 只有上一个是头结点且获取到锁才会执行当前线程。为什么上一个节点是头结点,因为队列// 是从尾节点插入,头结点取出,所以每次唤醒都是拿到的头结点的线程执行,避免队列中// 其他线程无意义的做 CAS 自旋if (p == head && tryAcquire(arg)) {// 设置当前节点为头结点setHead(node);// 上一个节点是头结点,置为null是表示与链表脱离开,方便GC回收p.next = null; // help GCfailed = false;return interrupted;}// 流程3 检查前一个节点的状态,判断当前获取锁失败的线程是否需要挂起,如果需要挂起,// 调用 parkAndCheckInterrupt() 方法挂起线程,直到被唤醒// parkAndCheckInterrupt() - 线程挂起if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)// 流程4 等待过程中没有获取到锁,取消请求,将节点从队列中移除// cancelAcquire(Node) - 取消请求cancelAcquire(node);}}
- 流程1 - 当前 Node 节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋
- 流程2 - 有当前驱节点是头节点时才能尝试获取锁,原因是:
- 头节点是成功获取锁的节点,而头节点的线程释放了锁以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点
- 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行 for 死循环
- 流程3 - 检查前一个节点的状态,判断当前获取锁失败的线程是否需要挂起,如果需要挂起, 调用
parkAndCheckInterrupt()方法挂起线程,直到被唤醒。这样就不会因为执行无效的循环导致 CPU 的资源浪费 - 流程4 - 如果等待过程中没有获取到锁,则取消请求,并且将节点从队列中移除
shouldParkAfterFailedAcquire(Node, Node) - 挂起预判
// 检查和更新未能获取的节点的状态private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; // 获取前驱结点的状态if (ws == Node.SIGNAL) // 如果前驱结点的状态为SIGNAL(-1),则直接返回truereturn true;if (ws > 0) { // 前驱结点所指向的线程已经取消,CANCELLED(1)// 不断循环,知道找到有效的前驱结点,即非 CANCELLED 状态的节点do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);// 调整前驱结点的next指针pred.next = node;} else {/** 如果前驱结点既不是SIGNAL,也不是CANCELLED,就设置为SIGNAL*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
shouldParkAfterFailedAcquire(Node, Node) 方法是在 acquireQueued(Node, int) 方法的死循环中被调用的,由于此方法返回 false 时 acquireQueued(Node, int) 不会阻塞当前线程,只有此方法返回 true 时当前线程才阻塞,因此在一般情况下,此方法至少需要执行两次,当前线程才会被阻塞。
在第一次进入此方法时,首先会进入后一个 if 判断的 else 分支,通过 CAS 设置 pred 前驱的 waitStatus 为 SIGNAL,然后返回 false。此方法返回 false 之后,获取独占锁的 acquireQueued(Node, int) 方法会继续进行 for 循环去抢锁:
- 假设 node 的前驱节点是头节点,
tryAcquire(int)抢锁成功,则获取到锁 - 假设 node 的前驱节点不是头节点,或者
tryAcquire(int)抢锁失败,仍然会再次调用此方法
第二次进入此方法时,由于上一次进入时已经将 pred.waitStatus 设置为 SIGNAL(-1)了,因此这次会进入第一个判断条件,直接返回 true,表示应该调用 parkAndCheckInterrupt() 阻塞当前线程,等待前一个节点执行完成之后唤醒。
注意:SIGNAL 标记表示后继节点处于阻塞状态,所以**shouldParkAfterFailedAcquire(Node, Node)** 方法会先将前驱结点的状态设置为 SIGNAL,然后再阻塞当前线程
parkAndCheckInterrupt() - 线程挂起
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
主要做两件事:
- 通过 LockSupport.park(this) 挂起当前线程,底层是通过
Unsafe#park(boolean, long)来实现线程挂起功能的 - 检查当前线程的中断信号。注意:
Thread.interrupted()是会重置中断信号的
由此可见,虽然 acquireQueued(Node, int) 不会阻塞当前线程,但是却会记录下阻塞过程中是否接收到中断信号,如果接收到中断信号,才会回调到以下代码:
public final void acquire(int arg) {if (!tryAcquire(arg) &&// acquireQueued 的返回值表示是否接收到中断信号acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// selfInterrupt() - 线程自我中断selfInterrupt();}
selfInterrupt() - 线程自我中断
static void selfInterrupt() {Thread.currentThread().interrupt();}
cancelAcquire(Node) - 取消请求
private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;Node pred = node.prev; // 检查当前节点的前驱结点的状态while (pred.waitStatus > 0) // 根据前驱结点的状态,找到最近的有效的(非 CANCELLED)的节点node.prev = pred = pred.prev;// predNext 是要取消的明显节点Node predNext = pred.next;// 设置当前节点的状态为CANCELLEDnode.waitStatus = Node.CANCELLED;// 流程1 设置链表的tail为刚才找到的有效的前驱结点if (node == tail && compareAndSetTail(node, pred)) {// 设置pred的next节点为nullcompareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;// 流程2if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;// 断链,将node的next节点赋给prev作为其next节点if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {// 流程3unparkSuccessor(node);}node.next = node; // help GC}}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)// 重置node的状态compareAndSetWaitStatus(node, ws, 0);// s 作为node的后继节点Node s = node.next;// 以防s节点为null或者已取消,从tail一直往前遍历,找到一个最接近node的合适的(非CANCELLED状态)的节点if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒该节点,让其线程继续运行if (s != null)LockSupport.unpark(s.thread);}
- 流程1 - 如果要取消的节点是尾节点,则将 node->prev 设置为尾节点,并将 node->prev 的 next 节点通过 CAS 操作设置为null
- 流程2 - 如果 node->prev 节点不是头结点,且 node->prev 的状态为 SIGNAL 或者通过 CAS 操作可设置为 SIGNAL,且 node->prev 包含的线程存在,则将node->next设置为 node->prev->next,相当于将 node 从链表中断开。也就是说,如果上一个节点还在阻塞,直接断链就行,否则的话,唤醒 node->next 去争夺 CPU 时间片来运行
- 流程3 - 从 tail 往前遍历,找到离 node 最近的且为有效状态节点,唤醒其线程,继续执行。避免此时上一个线程执行完毕
总之,cancelAcquire(Node) 就是要将当前线程从链表中取消,并且将链表重新续接上。
小结
非公平锁的抢锁过程如下:
- 线程首先会通过 tryAcquire(int) 来尝试获取锁,不用管等待队列中是否有线程在等待;
- 如果获取不到锁,则调用 addWaiter(Node) 创建节点并且追加在队列的尾部,然后通过调用 acquireQueued(Node, int) 进入无限 for 循环自旋抢占,在 parkAndChechInterrupt() 方法中通过 LockSupport#park() 方法阻塞自身线程。之所以acquireQueued(Node, int) 使用的是死循环,就是因为 LockSupport#park() 方法会响应中断唤醒,再次循环就会自己阻塞自己,直到锁被释放且线程竞争到了锁资源
- 线程在阻塞期间收到中断信号,唤醒之后会通过 selfInterrupt() 方法再次自己中断自己。之所以要这么做,是因为在收到其他线程的中断信号之后没有及时响应(acquireQueued 是死循环),现在要进行补偿
ReentrantLock$FairSync 公平锁抢占流程
tryAcquire(int) - 尝试获取锁
同 ReentrantLock$NonfairSync 的开始一样,都会调用到 acqueri(int)方法:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
不同的是,tryAcquire(int) 是自己的实现:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// hasQueuedPredecessors() - 检查等待队列if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
hasQueuedPredecessors() - 检查等待队列
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}
该方法是实现公平锁的核心代码:
- h == t 表示当前 AQS 队列且空,直接返回 false
- h != t && s == null 表示当前 AQS 的队列只有一个元素,也就是队列的头结点,返回 true
- h != t && (s == null || s.thread != Thread.currentThread()) 表示队列中的第一个线程不是当前线程,说明此时队列中有线程在等待,为了达到公平,当前线程不应该抢占锁,而是进入队列等到。结果返回 true
小结
公平锁的实现在为线程创建节点并且入队、自旋抢占、自我阻塞,中断补救等方面都与非公平锁一致,唯一不同的是在尝试获取锁(tryAcquire(int))时,公平锁会先检测 AQS 同步队列中是否已经有线程在等待,如果存在,则不会通过 CAS 操作进行抢锁,而是进入等到队列中,直到被唤醒。
ReentrantLock 释放锁流程
ReentrantLock 的锁释放流程是通过调用 unlock() 方法来实现的:
// ReentrantLock.javapublic void unlock() {sync.release(1);}// AbstractQueuedSynchronizer.javapublic final boolean release(int arg) {// tryRelease(int) - 钩子函数:尝试释放锁if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// unparkSuccessor(Node) - 唤醒后继线程unparkSuccessor(h);return true;}return false;}
tryRelease(int) - 钩子函数:尝试释放锁
tryRelease(int) 方法对于公平锁和非公平锁来说都是同一个实现,因此真正的实现代码是在 ReentrantLock 中:
protected final boolean tryRelease(int releases) {// 流程1 计算锁状态,重入会导致state的值大于1int c = getState() - releases;// 流程2 判断需要释放锁的当前线程是否是持有锁的线程,不是则抛出异常if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 流程3 只有重入的次数为0,表示当前锁可用,才会将持有锁的线程置空,在其他线程抢占// 到锁之后再重新设置if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
- 流程1 - 由于 ReentrantLock 是重入锁,在 tryAcquire(int) - 尝试获取锁 中对重入的逻辑处理为 state 自增,表示冲入的次数。相应地,释放锁也需要多次释放,次数与重入次数一样,才能完全释放锁
- 流程2 - 释放锁的操作只能在加锁线程执行,否则会触发 IllegalMonitorStateException 异常
- 只有 state 的值为 0(表示锁已经全部释放),才会将重置锁的线程拥有者实例
unparkSuccessor(Node) - 唤醒后继线程
在线程释放锁之后,需要唤醒下一个线程。唤醒操作由方法 unparkSuccessor(Node) 实现:
private void unparkSuccessor(Node node) {/** 获取节点状态,该节点也就是释放锁的节点,也是头结点*/int ws = node.waitStatus;// CANCELLLED(1),SIGNAL(-1),CONDITION(-2),PROPAGATE(-3)// 若节点状态小于0,则将其值为0,表示初始状态if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 找到后继节点Node s = node.next;// 如果新节点已经被取消,对应状态为CANCELLED(1)if (s == null || s.waitStatus > 0) {s = null;// 从队列尾部开始往前遍历,找到最前面一个状态小于0的且最接近当前节点的节点(有效节点)for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 唤醒后继节点if (s != null)LockSupport.unpark(s.thread);}
unparkSuccessor() 唤醒后继节点的线程后,后继节点的线程重新执行方法 acquireQueued(Node, int) 中的自旋抢占逻辑。
注意:当 AQS 头节点释放锁之后,头节点的状态变成初始状态,此节点理论上需要从队列中移除,但是此时该无效节点并没有立即被移除,unparkSuccessor() 方法并没有立即从队列中删除该无效节点,仅仅唤醒了后继节点的线程,重启了后继节点的自旋抢锁。然后在 acquireQueued(Node, int) 方法中移除掉该节点 。
lockInterruptibly() 可中断抢锁流程
使用 lock() 方法抢锁并不能及时响应中断信号,而是记录下中断信号,在线程唤醒之后再次触发中断,相当于一个中断信号的补救措施。但是 Lock 接口也提供了一个可响应中断信号的方法,那就是 lockInterruptibly():
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}// AbstractQueuedSynchronizer.javapublic final void acquireInterruptibly(int arg)throws InterruptedException {// 先检查线程的中断状态,如果处于中断状态,则直接抛出 InterruptedException 异常if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))// doAcquireInterruptibly(int) - 可中断抢占锁流程doAcquireInterruptibly(arg);}
先检查线程的中断状态,如果处于中断状态,则直接抛出 InterruptedException 异常。如果线程没有接收到中断信号,则进入可中断抢占锁流程。
doAcquireInterruptibly(int) - 可中断抢占锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 区别点throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
该方法与 acquireQueued(Node, int) 不同的是,acquireQueued(Node, int) 如果通过 parkAndCheckInterrupt() 检测到中断信号,只是会返回一个为 true 的标志位,之后有线程自己调用自己的 interrupt() 方法给自己发起中断;但是 doAcquireInterruptibly(int) 则是在检测到中断信号之后直接抛出 InterruptedException 异常。这也正是 lockInterruptibly() - 可中断抢占锁流程的核心所在。
Condition 基本原理
Condition 是 JUC 用来替代传统 Object 的 wait()/notify() 线程间通信与协作机制的新组件,相比调用Object 的 wait()/notify(),调用 Condition 的 await()/signal() 这种方式实现线程间协作更加高效。因为 Object 的 wait()/notify() 由 JVM 实现,需要进行内核态和用户态之间的切换,而 Condition 的 await()/signal() 由纯 Java 代码执行,在 Java 层进行自旋,比较高效。
ConditionObject 是实现条件队列的关键,每个 ConditionObject 都维护了一个单独的条件等待队列,分别记录了该队列的头结点与尾节点。
public class ConditionObject implements Condition, java.io.Serializable {/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;....}
在一个锁(Lock)上,可以创建多个不同的条件等待队列:
private Lock lock = new ReentrantLock();private Condition first = lock.newCondition();private Condition second = lock.newCondition();
Condition 条件队列与 AQS 同步队列的关系如下:

Condition 条件队列是单向的,而 AQS 同步队列是双向的,AQS 节点会有前驱指针。一个 AQS 实例可以有多个条件队列,是聚合关系;但是一个 AQS 实例只有一个同步队列,是逻辑上的组合关系。
聚合关系强调是“整体”包含“部分”,但是“部分”可以脱离“整体”而单独存在。 组合关系也是强调整体与部分的关系,但是部分不能脱离整体而存在。
await() - 等待
当线程调用 await() 方法时,说明当前线程的节点为当前 AQS 队列的头节点,正好处于占有锁的状态,await() 方法需要把该线程从 AQS 队列挪到 Condition 等待队列里。

注意:在 await() 方法将当前线程挪动到 Condition 等待队列后,还会唤醒 AQS 同步队列中 head 节点的下一个节点。
public final void await() throws InterruptedException {// 线程处于中断状态,直接抛出中断异常if (Thread.interrupted())throw new InterruptedException();// addConditionWaiter() - 创建Node节点并入队Node node = addConditionWaiter();// fullyRelease(node) - 释放锁int savedState = fullyRelease(node);int interruptMode = 0;// isOnSyncQueue(Node) - 节点是否还在同步队列中等待while (!isOnSyncQueue(node)) {// 线程自我挂起LockSupport.park(this);//if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
- 创建节点并放入 Condition 队列尾部
- 释放 AQS 锁,并唤醒 AQS 同步队列中的头结点的后一个节点
- 执行 while 循环,将该节点的线程阻塞,知道该节点离开 Condition 队列,重新回到同步队列,线程才退出 while 循环
- 退出 while 循环后,调用 acquireQueued(Node, int) 尝试拿锁,拿不到锁进入 AQS 同步队列
addConditionWaiter() - 创建Node节点并入队
private Node addConditionWaiter() {Node t = lastWaiter;if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
该方法的作用是为当前线程创建一个 Node 节点并追加到当前 ConditionObject 的条件队列上,在此期间,会将该条件链表中无效节点通过 unlinkCancelledWaiters() 方法清除掉。
注意:线程调用 await() 的时候,肯定已经获取到了锁。所以这里对链表的操作不需要 CAS。
unlinkCancelledWaiters() - 清除条件队列中已取消的节点
private void unlinkCancelledWaiters() {// 头结点Node t = firstWaiter;Node trail = null;// 步骤1while (t != null) {// 步骤2Node next = t.nextWaiter;// 步骤3if (t.waitStatus != Node.CONDITION) {// 步骤4t.nextWaiter = null;// 步骤5if (trail == null) {firstWaiter = next;} else {// 步骤6trail.nextWaiter = next;}步骤7if (next == null) {lastWaiter = trail;}} else {// 步骤8 trail记录状态为CONDITION的节点trail = t;}t = next;}}
该方法的作用是清除掉 ConditionObject 中队列的无效状态(CANCELLED)节点。该队列是一个单项队列,采取的方案是从头结点往后遍历整个链表,遍历过程中会把所有无效节点清除。整个过程中:
- t 表示当前节点,也正是判断状态的节点
- trail 表示的是清除无效节点之后的有效链表的尾节点,遍历到最后一个节点时会赋值给 lastWaiter
- 步骤1 - 方法入口获取到链表头结点,然后将头结点赋值给 t,进入 while 循环,该循环的结束条件是遍历到尾节点,因为尾节点的 nextWaiter 为 null;
- 步骤2 - 获取到当前节点的 nextWaiter,赋值给 next
- 步骤3 - 如果当前节点 t 的状态不是 CONDITION,表示该节点已经取消(CANCELLED),因此需要将这个节点从链表中移除掉
- 步骤4 - 从链表中移除当前节点,具体实现为:t.nextWaiter = null,表示与整个链表脱离开
- 步骤5 - trail == null,给 firstWaiter 赋值为 t.nextWaiter ,这里是假设 t.nextWaiter 的状态是有效的,无效的话下次会再次覆盖。在步骤-8中,会将有效状态的节点赋值给 trail,此时的节点 t 正是上一步的 next,也就是上一次给 firstWaiter 赋值的 t.nextWaiter。因此,这里的 firstWaiter 赋值总是有效的
- 步骤6 - 同步骤5一样,总是假设下一个节点是有效的,将下一个有效节点链接到 firstWaiter 所在的有效节点链表中
- 步骤7 - next == null 表示节点遍历结束,将最后一个有效状态的节点赋值给 tail。至此,整个链表的无效节点清除完毕
- 步骤8 - trail 总是记录当前有效状态的节点信息
fullyRelease(node) - 释放锁
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 一次性将重入锁全部释放掉if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}
该方法的作用是用来释放锁,与普通锁释放操作不一样的是,这里显示通过 getState() 方法获取到重入次数,然后将结果传入 release 方法中,表示将该锁一次性全部释放。还将释放的次数作为返回值返回。
注意:
condition 的使用场景如下:
lock.lock();System.out.println("wait start");condition.await();System.out.println("wait end");
- 该方法会先释放锁,因为
await()方法是在lock()方法之后调用,lock() 方法会通过 CAS 操作拿到锁。在await()方法中会调用LockSupport.park(this)将线程阻塞,如果没有在阻塞之前释放锁,会导致其他线程获取不到锁(当前线程持有锁,并且阻塞在这里),进而会导致死锁的情况 - release(int) 方法中,还会唤醒队列中的下一个线程
isOnSyncQueue(Node) - 节点是否还在同步队列中等待
while (!isOnSyncQueue(node)) {// 线程自我挂起LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}
isOnSyncQueue(Node)方法用来判断该 Node 是否在 AQS 队列中checkInterruptWhileWaiting(Node)方法会检查中断信号,并作相应处理
final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;if (node.next != null) // If has successor, it must be on queuereturn true;return findNodeFromTail(node);}private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}
该方法用来判断该 Node 是否在 AQS 队列中,初始的时候,Node 只存在于 ConditionObject 队列中。在执行 signal() 操作之后,调用 transferForSignal(Node) -> enq(Node) 方法将 Node 会放进 AQS 的同步队列中。
checkInterruptWhileWaiting(Node) - 检查线程中断状态
private int checkInterruptWhileWaiting(Node node) {// THROW_IE:-1// REINTERRUPT:1return Thread.interrupted() ?(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;}final boolean transferAfterCancelledWait(Node node) {if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {enq(node);return true;}while (!isOnSyncQueue(node))Thread.yield();return false;}
checkInterruptWhileWaiting(Node) 方法在 park() 方法之后调用,是因为线程从 park() 状态醒来时有两种可能:
- 其他线程调用了
unpark(Thread)方法进行唤醒 - 收到中断信号唤醒
所以,这里需要检测中断信号,当发现自己是被中断唤醒的,而不是被 unpark(Thread) 唤醒的,会直接退出循环,await() 方法也会返回。
小结
调用 Condition#await() 将当前线程阻塞在 while 循环中,直到调用 Condition#signal() 唤醒线程,将当前节点加入到 AQS 同步队列中,然后退出 while 循环。
awaitUninterruptibly() - 非中断等待
public final void awaitUninterruptibly() {Node node = addConditionWaiter();int savedState = fullyRelease(node);boolean interrupted = false;while (!isOnSyncQueue(node)) {LockSupport.park(this);// 重点if (Thread.interrupted())interrupted = true;}if (acquireQueued(node, savedState) || interrupted)selfInterrupt();}
该方法的最核心的部分则是 while 循环中收到中断信息之后不会退出 while 循环,而是设置标志位,继续循环。
signal() - 唤醒
public final void signal() {// 只有持有锁的线程才能够调用 signal() 方法if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}// 唤醒队列中的第一个线程private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null; // 如果第二个节点为null,表示尾节点也为空//将 node从Condition队列移除first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}// 将节点从条件队列转移到同步队列,成功则返回truefinal boolean transferForSignal(Node node) {// 重置node的状态if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;// node节点加入AQS同步队列,并获取AQS队列的前驱结点Node p = enq(node);int ws = p.waitStatus;// ws > 0 即是CANCELLED状态// 设置前驱节点为Signal状态失败if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
- 通过
enq()方法自旋将条件队列中的头节点放入 AQS 同步队列尾部,并获取它在 AQS 队列中的前驱节点 - 如果前驱节点的状态是取消状态,或者设置前驱节点为 Signal 状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队
- 同步队列中的线程被唤醒后,表示重新获取了锁,然后继续执行
Condition#await()方法的临界区代码
