JUC 显示锁(Lock)是一种是一种使用纯 Java 语言实现的,非常灵活的锁,可以进行无条件的,可轮询的,定时的,可中断的锁获取和释放操作。
显示锁的由来
在 Java 中,内置锁(synchronized)有虚拟机自动抢占和释放同步对象的监视器(monitor),而且每一个对象都拥有一个监视器,这使得使用 synchronized 变得异常简便。但是 synchronized 的功能比较单一,在一些场景中无法胜任。例如:
- 限时抢锁:设置超时时长,如果超个这个时间还是无法获取到锁,则放弃对锁的抢占,不至于无限等待
- 可中断抢锁:在抢锁时,外部线程给抢锁线程发一个中断信号,就能唤起等待锁的线程,并终止抢占过程
- 多个等待队列:为锁维持多个等待队列,以便提高锁的效率。比如在生产者-消费者模式实现中,生产者和消费者共用一把锁,该锁上维持两个等待队列,即一个生产者队列和一个消费者队列
除此之外,在高并发的场景中,Java 对象锁会膨胀成为重量级锁,而重量级锁的线程阻塞和线程唤醒需要在用户态和内核态来回切换,导致性能低下。因此,显示锁就是为了解决这些问题而生的。JDK1.5 中引入了 Lock 接口,Lock 是 Java 代码级别的锁,也称之为显示锁。
Lock 接口
Lock 接口位于 java.util.concurrent.locks
包中,是 JUC 显式锁的一个抽象。与 synchronized 关键字不同,显式锁不再作为 Java 内置特性来实现,而是作为 Java 语言可编程特性来实现。这就为多种不同功能的锁实现留下了空间,各种锁实现可能有不同的调度算法、性能特性或者锁定语义。
Lock 接口的主要方法如下:
public interface Lock {
/**
* 获取锁,成功则向下运行,失败则自旋检测线程
*/
void lock();
/**
* 可中断抢锁,当前线程在抢锁过程中可以响应中断信号
*/
void lockInterruptibly() throws InterruptedException;
/**
* 尝试抢锁,线程为非阻塞模式,在调用tryLock()方法后立即返回。若抢锁成功则返回true,否则返回false
*/
boolean tryLock();
/**
* 限时抢锁。到达超时时间返回false,也可以相应中断信号
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 释放锁
*/
void unlock();
/**
* 获取与显示锁绑定的Condition对象,用于等待-通知方式的线程间通信
*/
Condition newCondition();
}
据 Lock 接口提供的方法来看,Lock 锁至少比 synchronized 多出以下优势:
- 可中断获取锁
使用 synchronized 关键字获取锁的时候,如果线程没有获取到被阻塞,阻塞期间该线程是不响应中断信号(interrupt)的;而调用 Lock.lockInterruptibly()
方法获取锁时,如果线程被中断,线程将抛出中断异常。
- 可非阻塞获取锁
使用 synchronized 关键字获取锁时,如果没有成功获取,线程只有被阻塞;而调用 Lock.tryLock()
方法获取锁时,如果没有获取成功,线程也不会被阻塞,而是直接返回 false。
- 可限时获取锁
调用 Lock.tryLock(long time,TimeUnit unit)
方法,显式锁可以设置限定抢占锁的超时时间。而在使用 synchronized 关键字获取锁时,如果不能抢到锁,线程只能无限制阻塞。
可重入锁 ReentrantLock
ReentrantLock 相关的类继承结构如下:
- ReentrantLock 是 JUC 包下提供的 Lock 基础实现类
- ReentrantLock 实现了 Lock 接口,所有它拥有和 synchronized 相同的并发性和内存语义,还拥有了限时抢占、可中断抢占等一些高级特性
- ReentrantLock 内部基于 Sync 实现了锁功能。sync 继承至 AbstractQueuedSynchronizer(抽象队列同步器,AQS),因此在争用激烈的场景下,能表现出比内置锁更佳的性能
ReentrantLock 是一个可重入的独占锁,其中:
可重入表示该锁能够支持一个线程对资源的重复加锁,也就是说,一个线程可以多次进入同一个锁所同步的临界区代码块。例如:
lock.lock();
lock.lock();
try {
// 临界区代码
} finally {
lock.unlock();
lock.unlock();
}
独占表示在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能等待,只有拥有锁的线程释放了锁后,其他的线程才能够获取锁。
基于 Condition 实现等待-通知式线程间通信
等待-通知方式的线程间通信机制,具体来说是指一个线程 A 调用了同步对象的 wait()
方法进入等待状态,而另一线程 B 调用了同步对象的 notify()
或者 notifyAll()
方法去唤醒等待线程,当线程 A 收到线程 B 的唤醒通知后,就可以重新开始执行了。
在 Lock 体系中,等待-通知方式的线程间通信机制可以使用 Condition 接口来实现。Condition 接口的主要方法如下:
public interface Condition {
/**
* 等待,该方法在功能上等同于 Object.wait()
* 是当前线程加入 await() 等待队列中,并释放当前锁
* 当其他线程调用 signal() 时,等待队列中的某个线程会被唤醒,重新去抢所
*/
void await() throws InterruptedException;
/**
* 等待,不会响应中断信号
*/
void awaitUninterruptibly();
/**
* 超时等待
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* 超时等待
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* 定时等待
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* 通知。此方法在功能上与Object.notify功能一致,用来唤醒在await()等待队列中的某一个线程
*/
void signal();
/**
* 通知。用来唤醒在await()等待队列中的所有线程,线程唤醒之后再由操作系统进行调度
*/
void signalAll();
}
Condition 对象的 signal(通知)方法和同一个对象的 await(等待)方法是一一配对使用的,也就是说,一个 Condition 对象的 signal(或 signalAll)方法不能去唤醒其他 Condition 对象上的 await 线程。
ReentrantLock 的抢锁流程
ReentrantLock 有两种模式:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的。
ReentrantLock 在同一个时间点只能被一个线程获取,ReentrantLock 是通过一个 FIFO 的等待队列( AQS 队列)来管理获取该锁所有线程的。ReentrantLock 是继承自 Lock 接口实现的独占式可重入锁,与 ReentrantLock 组合一个 AQS 内部实例完成同步操作。
ReentrantLock$NonfairSync 非公平锁抢占流程
ReentrantLock 为非公平锁实现了一个内部的同步器——NonfairSync,其显式锁获取方法 lock()
的源码如下:
static final class NonfairSync extends Sync {
final void lock() {
// 流程1
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 流程2
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
- 流程1 - 通过 CAS 操作判断 state 的值是不是0(表示当前锁未被占用),如果能够获取到锁,将当前线程设置为独占线程 。由于这里线程一进来就抢锁,完全没有考虑到阻塞队列中是否存在等待的线程,因此这正是非公平的
- 流程2 - 当前处于并发状态,已经有线程占用了锁。通过
acquire(int arg)
方法再次获取,获取失败则进入等待队列
下面进入 AQS 的 acquire(int arg) 方法:
public final void acquire(int arg) {
// tryAcquire(int acquires) - 尝试获取锁
if (!tryAcquire(arg) &&
// addWaiter(Node mode) - 创建节点并入队
// acquireQueued(final Node node, int arg) - 阻塞线程
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(int) - 尝试获取锁
tryAcquire(int arg) 是一个钩子方法,ReentrantLock$NonfairSync 重写了这个方法,实现了自己的抢锁逻辑。代码如下:
// ReentrantLock$NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 流程1.1 当前锁未被占用
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 流程1.2 当前线程是锁的持有者
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 流程1.3
return false;
}
- 流程1.1 - 首先检查锁状态,如果没有线程持有这把锁,即调用 compareAndSetState(int except, int update) 抢占锁,抢占成功则设置当前线程为锁的持有者并且返回 true,否则返回 false
- 流程1.2 - 如果当前线程正好是锁的持有者(重入),则将 state + 1 并且返回 true。需要注意的是, state 的值溢出(nextc < 0)会抛出异常
- 流程1.3 - 当前线程既不是当前锁的持有者,又不能抢占到锁,返回 false,之后将其放入到 AQS 阻塞队列
addWaiter(Node) - 创建节点并入队
该方法为当前线程创建一个 Node 节点并将节点放入到双向链表的尾节点。注意:此时只是把 Thread 放入了队列当中而已,线程并没有阻塞。
private Node addWaiter(Node mode) {
// 为当前线程创建节点,mode = Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 已经有线程进入阻塞状态
// 将node作为链表的尾节点,上一个尾节点作为它的前节点
node.prev = pred;
// CAS 操作设置尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 当前同步队列并不存在,创建节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 通过 CAS 操作初始化头结点 head,如果后来的线程先完成了该初始化流程,该 CAS 操作
// 就会失败,再次进入 for 循环,从而通过 else 分支将线程入队
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 通过 CAS 操作初始化尾结点 tail,避免并发情况下后来者线程先初始化了链表
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(Node, int ) - 自旋抢占
在 addWaiter(Node) 方法中,将线程封装称为 Node 对象并加入到 AQS 双向链表中,但是该线程并没有进入阻塞状态。该方法就是完成线程的阻塞的。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 线程是否被中断
boolean interrupted = false;
// 流程1 死循环自旋检查当前节点的前驱结点是否是头结点
for (;;) {
// 获取上一个节点
final Node p = node.predecessor();
// 流程2
// 只有上一个是头结点且获取到锁才会执行当前线程。为什么上一个节点是头结点,因为队列
// 是从尾节点插入,头结点取出,所以每次唤醒都是拿到的头结点的线程执行,避免队列中
// 其他线程无意义的做 CAS 自旋
if (p == head && tryAcquire(arg)) {
// 设置当前节点为头结点
setHead(node);
// 上一个节点是头结点,置为null是表示与链表脱离开,方便GC回收
p.next = null; // help GC
failed = false;
return interrupted;
}
// 流程3 检查前一个节点的状态,判断当前获取锁失败的线程是否需要挂起,如果需要挂起,
// 调用 parkAndCheckInterrupt() 方法挂起线程,直到被唤醒
// parkAndCheckInterrupt() - 线程挂起
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 流程4 等待过程中没有获取到锁,取消请求,将节点从队列中移除
// cancelAcquire(Node) - 取消请求
cancelAcquire(node);
}
}
- 流程1 - 当前 Node 节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋
- 流程2 - 有当前驱节点是头节点时才能尝试获取锁,原因是:
- 头节点是成功获取锁的节点,而头节点的线程释放了锁以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点
- 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行 for 死循环
- 流程3 - 检查前一个节点的状态,判断当前获取锁失败的线程是否需要挂起,如果需要挂起, 调用
parkAndCheckInterrupt()
方法挂起线程,直到被唤醒。这样就不会因为执行无效的循环导致 CPU 的资源浪费 - 流程4 - 如果等待过程中没有获取到锁,则取消请求,并且将节点从队列中移除
shouldParkAfterFailedAcquire(Node, Node) - 挂起预判
// 检查和更新未能获取的节点的状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取前驱结点的状态
if (ws == Node.SIGNAL) // 如果前驱结点的状态为SIGNAL(-1),则直接返回true
return true;
if (ws > 0) { // 前驱结点所指向的线程已经取消,CANCELLED(1)
// 不断循环,知道找到有效的前驱结点,即非 CANCELLED 状态的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 调整前驱结点的next指针
pred.next = node;
} else {
/*
* 如果前驱结点既不是SIGNAL,也不是CANCELLED,就设置为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire(Node, Node)
方法是在 acquireQueued(Node, int)
方法的死循环中被调用的,由于此方法返回 false 时 acquireQueued(Node, int)
不会阻塞当前线程,只有此方法返回 true 时当前线程才阻塞,因此在一般情况下,此方法至少需要执行两次,当前线程才会被阻塞。
在第一次进入此方法时,首先会进入后一个 if 判断的 else 分支,通过 CAS 设置 pred 前驱的 waitStatus 为 SIGNAL,然后返回 false。此方法返回 false 之后,获取独占锁的 acquireQueued(Node, int)
方法会继续进行 for 循环去抢锁:
- 假设 node 的前驱节点是头节点,
tryAcquire(int)
抢锁成功,则获取到锁 - 假设 node 的前驱节点不是头节点,或者
tryAcquire(int)
抢锁失败,仍然会再次调用此方法
第二次进入此方法时,由于上一次进入时已经将 pred.waitStatus 设置为 SIGNAL(-1)了,因此这次会进入第一个判断条件,直接返回 true,表示应该调用 parkAndCheckInterrupt()
阻塞当前线程,等待前一个节点执行完成之后唤醒。
注意:SIGNAL 标记表示后继节点处于阻塞状态,所以**shouldParkAfterFailedAcquire(Node, Node)**
方法会先将前驱结点的状态设置为 SIGNAL,然后再阻塞当前线程
parkAndCheckInterrupt() - 线程挂起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
主要做两件事:
- 通过 LockSupport.park(this) 挂起当前线程,底层是通过
Unsafe#park(boolean, long)
来实现线程挂起功能的 - 检查当前线程的中断信号。注意:
Thread.interrupted()
是会重置中断信号的
由此可见,虽然 acquireQueued(Node, int)
不会阻塞当前线程,但是却会记录下阻塞过程中是否接收到中断信号,如果接收到中断信号,才会回调到以下代码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// acquireQueued 的返回值表示是否接收到中断信号
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt() - 线程自我中断
selfInterrupt();
}
selfInterrupt() - 线程自我中断
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
cancelAcquire(Node) - 取消请求
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev; // 检查当前节点的前驱结点的状态
while (pred.waitStatus > 0) // 根据前驱结点的状态,找到最近的有效的(非 CANCELLED)的节点
node.prev = pred = pred.prev;
// predNext 是要取消的明显节点
Node predNext = pred.next;
// 设置当前节点的状态为CANCELLED
node.waitStatus = Node.CANCELLED;
// 流程1 设置链表的tail为刚才找到的有效的前驱结点
if (node == tail && compareAndSetTail(node, pred)) {
// 设置pred的next节点为null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
// 流程2
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
// 断链,将node的next节点赋给prev作为其next节点
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 流程3
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 重置node的状态
compareAndSetWaitStatus(node, ws, 0);
// s 作为node的后继节点
Node s = node.next;
// 以防s节点为null或者已取消,从tail一直往前遍历,找到一个最接近node的合适的(非CANCELLED状态)的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒该节点,让其线程继续运行
if (s != null)
LockSupport.unpark(s.thread);
}
- 流程1 - 如果要取消的节点是尾节点,则将 node->prev 设置为尾节点,并将 node->prev 的 next 节点通过 CAS 操作设置为null
- 流程2 - 如果 node->prev 节点不是头结点,且 node->prev 的状态为 SIGNAL 或者通过 CAS 操作可设置为 SIGNAL,且 node->prev 包含的线程存在,则将node->next设置为 node->prev->next,相当于将 node 从链表中断开。也就是说,如果上一个节点还在阻塞,直接断链就行,否则的话,唤醒 node->next 去争夺 CPU 时间片来运行
- 流程3 - 从 tail 往前遍历,找到离 node 最近的且为有效状态节点,唤醒其线程,继续执行。避免此时上一个线程执行完毕
总之,cancelAcquire(Node)
就是要将当前线程从链表中取消,并且将链表重新续接上。
小结
非公平锁的抢锁过程如下:
- 线程首先会通过 tryAcquire(int) 来尝试获取锁,不用管等待队列中是否有线程在等待;
- 如果获取不到锁,则调用 addWaiter(Node) 创建节点并且追加在队列的尾部,然后通过调用 acquireQueued(Node, int) 进入无限 for 循环自旋抢占,在 parkAndChechInterrupt() 方法中通过 LockSupport#park() 方法阻塞自身线程。之所以acquireQueued(Node, int) 使用的是死循环,就是因为 LockSupport#park() 方法会响应中断唤醒,再次循环就会自己阻塞自己,直到锁被释放且线程竞争到了锁资源
- 线程在阻塞期间收到中断信号,唤醒之后会通过 selfInterrupt() 方法再次自己中断自己。之所以要这么做,是因为在收到其他线程的中断信号之后没有及时响应(acquireQueued 是死循环),现在要进行补偿
ReentrantLock$FairSync 公平锁抢占流程
tryAcquire(int) - 尝试获取锁
同 ReentrantLock$NonfairSync 的开始一样,都会调用到 acqueri(int)方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
不同的是,tryAcquire(int) 是自己的实现:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// hasQueuedPredecessors() - 检查等待队列
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
hasQueuedPredecessors() - 检查等待队列
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
该方法是实现公平锁的核心代码:
- h == t 表示当前 AQS 队列且空,直接返回 false
- h != t && s == null 表示当前 AQS 的队列只有一个元素,也就是队列的头结点,返回 true
- h != t && (s == null || s.thread != Thread.currentThread()) 表示队列中的第一个线程不是当前线程,说明此时队列中有线程在等待,为了达到公平,当前线程不应该抢占锁,而是进入队列等到。结果返回 true
小结
公平锁的实现在为线程创建节点并且入队、自旋抢占、自我阻塞,中断补救等方面都与非公平锁一致,唯一不同的是在尝试获取锁(tryAcquire(int)
)时,公平锁会先检测 AQS 同步队列中是否已经有线程在等待,如果存在,则不会通过 CAS 操作进行抢锁,而是进入等到队列中,直到被唤醒。
ReentrantLock 释放锁流程
ReentrantLock 的锁释放流程是通过调用 unlock()
方法来实现的:
// ReentrantLock.java
public void unlock() {
sync.release(1);
}
// AbstractQueuedSynchronizer.java
public final boolean release(int arg) {
// tryRelease(int) - 钩子函数:尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// unparkSuccessor(Node) - 唤醒后继线程
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int) - 钩子函数:尝试释放锁
tryRelease(int) 方法对于公平锁和非公平锁来说都是同一个实现,因此真正的实现代码是在 ReentrantLock 中:
protected final boolean tryRelease(int releases) {
// 流程1 计算锁状态,重入会导致state的值大于1
int c = getState() - releases;
// 流程2 判断需要释放锁的当前线程是否是持有锁的线程,不是则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 流程3 只有重入的次数为0,表示当前锁可用,才会将持有锁的线程置空,在其他线程抢占
// 到锁之后再重新设置
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- 流程1 - 由于 ReentrantLock 是重入锁,在 tryAcquire(int) - 尝试获取锁 中对重入的逻辑处理为 state 自增,表示冲入的次数。相应地,释放锁也需要多次释放,次数与重入次数一样,才能完全释放锁
- 流程2 - 释放锁的操作只能在加锁线程执行,否则会触发 IllegalMonitorStateException 异常
- 只有 state 的值为 0(表示锁已经全部释放),才会将重置锁的线程拥有者实例
unparkSuccessor(Node) - 唤醒后继线程
在线程释放锁之后,需要唤醒下一个线程。唤醒操作由方法 unparkSuccessor(Node) 实现:
private void unparkSuccessor(Node node) {
/*
* 获取节点状态,该节点也就是释放锁的节点,也是头结点
*/
int ws = node.waitStatus;
// CANCELLLED(1),SIGNAL(-1),CONDITION(-2),PROPAGATE(-3)
// 若节点状态小于0,则将其值为0,表示初始状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到后继节点
Node s = node.next;
// 如果新节点已经被取消,对应状态为CANCELLED(1)
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列尾部开始往前遍历,找到最前面一个状态小于0的且最接近当前节点的节点(有效节点)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor()
唤醒后继节点的线程后,后继节点的线程重新执行方法 acquireQueued(Node, int)
中的自旋抢占逻辑。
注意:当 AQS 头节点释放锁之后,头节点的状态变成初始状态,此节点理论上需要从队列中移除,但是此时该无效节点并没有立即被移除,unparkSuccessor()
方法并没有立即从队列中删除该无效节点,仅仅唤醒了后继节点的线程,重启了后继节点的自旋抢锁。然后在 acquireQueued(Node, int)
方法中移除掉该节点 。
lockInterruptibly() 可中断抢锁流程
使用 lock()
方法抢锁并不能及时响应中断信号,而是记录下中断信号,在线程唤醒之后再次触发中断,相当于一个中断信号的补救措施。但是 Lock 接口也提供了一个可响应中断信号的方法,那就是 lockInterruptibly()
:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// AbstractQueuedSynchronizer.java
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 先检查线程的中断状态,如果处于中断状态,则直接抛出 InterruptedException 异常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// doAcquireInterruptibly(int) - 可中断抢占锁流程
doAcquireInterruptibly(arg);
}
先检查线程的中断状态,如果处于中断状态,则直接抛出 InterruptedException 异常。如果线程没有接收到中断信号,则进入可中断抢占锁流程。
doAcquireInterruptibly(int) - 可中断抢占锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 区别点
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法与 acquireQueued(Node, int)
不同的是,acquireQueued(Node, int)
如果通过 parkAndCheckInterrupt()
检测到中断信号,只是会返回一个为 true 的标志位,之后有线程自己调用自己的 interrupt()
方法给自己发起中断;但是 doAcquireInterruptibly(int)
则是在检测到中断信号之后直接抛出 InterruptedException 异常。这也正是 lockInterruptibly() - 可中断抢占锁流程的核心所在。
Condition 基本原理
Condition 是 JUC 用来替代传统 Object 的 wait()/notify()
线程间通信与协作机制的新组件,相比调用Object 的 wait()/notify()
,调用 Condition 的 await()/signal()
这种方式实现线程间协作更加高效。因为 Object 的 wait()/notify()
由 JVM 实现,需要进行内核态和用户态之间的切换,而 Condition 的 await()/signal()
由纯 Java 代码执行,在 Java 层进行自旋,比较高效。
ConditionObject 是实现条件队列的关键,每个 ConditionObject 都维护了一个单独的条件等待队列,分别记录了该队列的头结点与尾节点。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
....
}
在一个锁(Lock)上,可以创建多个不同的条件等待队列:
private Lock lock = new ReentrantLock();
private Condition first = lock.newCondition();
private Condition second = lock.newCondition();
Condition 条件队列与 AQS 同步队列的关系如下:
Condition 条件队列是单向的,而 AQS 同步队列是双向的,AQS 节点会有前驱指针。一个 AQS 实例可以有多个条件队列,是聚合关系;但是一个 AQS 实例只有一个同步队列,是逻辑上的组合关系。
聚合关系强调是“整体”包含“部分”,但是“部分”可以脱离“整体”而单独存在。 组合关系也是强调整体与部分的关系,但是部分不能脱离整体而存在。
await() - 等待
当线程调用 await()
方法时,说明当前线程的节点为当前 AQS 队列的头节点,正好处于占有锁的状态,await()
方法需要把该线程从 AQS 队列挪到 Condition 等待队列里。
注意:在 await()
方法将当前线程挪动到 Condition 等待队列后,还会唤醒 AQS 同步队列中 head 节点的下一个节点。
public final void await() throws InterruptedException {
// 线程处于中断状态,直接抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// addConditionWaiter() - 创建Node节点并入队
Node node = addConditionWaiter();
// fullyRelease(node) - 释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// isOnSyncQueue(Node) - 节点是否还在同步队列中等待
while (!isOnSyncQueue(node)) {
// 线程自我挂起
LockSupport.park(this);
//
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 创建节点并放入 Condition 队列尾部
- 释放 AQS 锁,并唤醒 AQS 同步队列中的头结点的后一个节点
- 执行 while 循环,将该节点的线程阻塞,知道该节点离开 Condition 队列,重新回到同步队列,线程才退出 while 循环
- 退出 while 循环后,调用 acquireQueued(Node, int) 尝试拿锁,拿不到锁进入 AQS 同步队列
addConditionWaiter() - 创建Node节点并入队
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
该方法的作用是为当前线程创建一个 Node 节点并追加到当前 ConditionObject 的条件队列上,在此期间,会将该条件链表中无效节点通过 unlinkCancelledWaiters()
方法清除掉。
注意:线程调用 await() 的时候,肯定已经获取到了锁。所以这里对链表的操作不需要 CAS。
unlinkCancelledWaiters() - 清除条件队列中已取消的节点
private void unlinkCancelledWaiters() {
// 头结点
Node t = firstWaiter;
Node trail = null;
// 步骤1
while (t != null) {
// 步骤2
Node next = t.nextWaiter;
// 步骤3
if (t.waitStatus != Node.CONDITION) {
// 步骤4
t.nextWaiter = null;
// 步骤5
if (trail == null) {
firstWaiter = next;
} else {
// 步骤6
trail.nextWaiter = next;
}
步骤7
if (next == null) {
lastWaiter = trail;
}
} else {
// 步骤8 trail记录状态为CONDITION的节点
trail = t;
}
t = next;
}
}
该方法的作用是清除掉 ConditionObject 中队列的无效状态(CANCELLED)节点。该队列是一个单项队列,采取的方案是从头结点往后遍历整个链表,遍历过程中会把所有无效节点清除。整个过程中:
- t 表示当前节点,也正是判断状态的节点
- trail 表示的是清除无效节点之后的有效链表的尾节点,遍历到最后一个节点时会赋值给 lastWaiter
- 步骤1 - 方法入口获取到链表头结点,然后将头结点赋值给 t,进入 while 循环,该循环的结束条件是遍历到尾节点,因为尾节点的 nextWaiter 为 null;
- 步骤2 - 获取到当前节点的 nextWaiter,赋值给 next
- 步骤3 - 如果当前节点 t 的状态不是 CONDITION,表示该节点已经取消(CANCELLED),因此需要将这个节点从链表中移除掉
- 步骤4 - 从链表中移除当前节点,具体实现为:t.nextWaiter = null,表示与整个链表脱离开
- 步骤5 - trail == null,给 firstWaiter 赋值为 t.nextWaiter ,这里是假设 t.nextWaiter 的状态是有效的,无效的话下次会再次覆盖。在步骤-8中,会将有效状态的节点赋值给 trail,此时的节点 t 正是上一步的 next,也就是上一次给 firstWaiter 赋值的 t.nextWaiter。因此,这里的 firstWaiter 赋值总是有效的
- 步骤6 - 同步骤5一样,总是假设下一个节点是有效的,将下一个有效节点链接到 firstWaiter 所在的有效节点链表中
- 步骤7 - next == null 表示节点遍历结束,将最后一个有效状态的节点赋值给 tail。至此,整个链表的无效节点清除完毕
- 步骤8 - trail 总是记录当前有效状态的节点信息
fullyRelease(node) - 释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 一次性将重入锁全部释放掉
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
该方法的作用是用来释放锁,与普通锁释放操作不一样的是,这里显示通过 getState()
方法获取到重入次数,然后将结果传入 release 方法中,表示将该锁一次性全部释放。还将释放的次数作为返回值返回。
注意:
condition 的使用场景如下:
lock.lock();
System.out.println("wait start");
condition.await();
System.out.println("wait end");
- 该方法会先释放锁,因为
await()
方法是在lock()
方法之后调用,lock() 方法会通过 CAS 操作拿到锁。在await()
方法中会调用LockSupport.park(this)
将线程阻塞,如果没有在阻塞之前释放锁,会导致其他线程获取不到锁(当前线程持有锁,并且阻塞在这里),进而会导致死锁的情况 - release(int) 方法中,还会唤醒队列中的下一个线程
isOnSyncQueue(Node) - 节点是否还在同步队列中等待
while (!isOnSyncQueue(node)) {
// 线程自我挂起
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue(Node)
方法用来判断该 Node 是否在 AQS 队列中checkInterruptWhileWaiting(Node)
方法会检查中断信号,并作相应处理
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
该方法用来判断该 Node 是否在 AQS 队列中,初始的时候,Node 只存在于 ConditionObject 队列中。在执行 signal()
操作之后,调用 transferForSignal(Node) -> enq(Node)
方法将 Node 会放进 AQS 的同步队列中。
checkInterruptWhileWaiting(Node) - 检查线程中断状态
private int checkInterruptWhileWaiting(Node node) {
// THROW_IE:-1
// REINTERRUPT:1
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
checkInterruptWhileWaiting(Node)
方法在 park() 方法之后调用,是因为线程从 park() 状态醒来时有两种可能:
- 其他线程调用了
unpark(Thread)
方法进行唤醒 - 收到中断信号唤醒
所以,这里需要检测中断信号,当发现自己是被中断唤醒的,而不是被 unpark(Thread)
唤醒的,会直接退出循环,await()
方法也会返回。
小结
调用 Condition#await()
将当前线程阻塞在 while 循环中,直到调用 Condition#signal()
唤醒线程,将当前节点加入到 AQS 同步队列中,然后退出 while 循环。
awaitUninterruptibly() - 非中断等待
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 重点
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
该方法的最核心的部分则是 while 循环中收到中断信息之后不会退出 while 循环,而是设置标志位,继续循环。
signal() - 唤醒
public final void signal() {
// 只有持有锁的线程才能够调用 signal() 方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 唤醒队列中的第一个线程
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null; // 如果第二个节点为null,表示尾节点也为空
//将 node从Condition队列移除
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将节点从条件队列转移到同步队列,成功则返回true
final boolean transferForSignal(Node node) {
// 重置node的状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// node节点加入AQS同步队列,并获取AQS队列的前驱结点
Node p = enq(node);
int ws = p.waitStatus;
// ws > 0 即是CANCELLED状态
// 设置前驱节点为Signal状态失败
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 通过
enq()
方法自旋将条件队列中的头节点放入 AQS 同步队列尾部,并获取它在 AQS 队列中的前驱节点 - 如果前驱节点的状态是取消状态,或者设置前驱节点为 Signal 状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队
- 同步队列中的线程被唤醒后,表示重新获取了锁,然后继续执行
Condition#await()
方法的临界区代码