1. Condition 简介
任何一个 java 对象都天然继承于 Object 类,线程间通信会用到 Object 的一些方法,比如:
wait()
wait(long timeout)
wait(long timeout, int nanos)
notify()
notifyAll()
同样,在 Java Lock 体系下,会有同样的方法去实现等待/通知机制。
从整体上来看 Object的 **wait**
和 **notify/notify**
是与对象监视器配合完成线程间的等待/通知机制,而 Condition 与 **Lock**
配合完成等待通知机制,前者是 java 底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。
1.1 Condition 的优势
- Condition 能够支持不响应中断,而通过使用 Object 方式不支持;
- Condition 能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
- Condition 能够支持超时时间的设置,而 Object 不支持
参照 Object 的 wait
和 notify/notifyAll
方法,Condition 也提供了同样的方法:
1.2 针对 Object 的 **wait**
方法
void await() throws InterruptedException
:当前线程进入等待状态。如果其他线程调用 condition 的signal
或者signalAll
方法并且当前线程获取 Lock 从await
方法返回,如果在等待状态中被中断会抛出被中断异常;long awaitNanos(long nanosTimeout)
:当前线程进入等待状态直到被通知,中断或者超时;boolean await(long time, TimeUnit unit)throws InterruptedException
:同第二种,支持自定义时间单位boolean awaitUntil(Date deadline) throws InterruptedException
:当前线程进入等待状态直到被通知,中断或者到了某个时间
1.3 针对Object的 notify/notifyAll
方法
void signal()
:唤醒一个等待在 condition 上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到 Lock 则可以从等待方法中返回。void signalAll()
:与第一个方法的区别在于能够唤醒所有等待在 condition 上的线程
2. Condition 实现原理
2.1 等待队列
Condition 创建是通过:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
而这个方法实际上是会 new 出一个 ConditionObject 对象,该类是 AQS 的一个内部类,我们知道在锁机制的实现上,AQS 内部维护了一个同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列,同样的,condition 内部也是使用同样的方式,内部维护了一个 等待队列,所有调用 condition.await
方法的线程会加入到等待队列中,并且线程状态转换为等待状态。
ContidionObject 中有连个成员变量
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
可以看出来 ConditionObject 通过持有等待队列的头尾指针来管理等待队列。
主要注意的是 Node 类复用了在 AQS 中的 Node 类,其节点状态和相关属性可以去看AQS的实现原理的文章,如果您仔细看完这篇文章对 condition 的理解易如反掌,对 lock 体系的实现也会有一个质的提升。Node类有这样一个属性:
//后继节点
Node nextWaiter;
进一步说明,等待队列是一个单向队列,而在之前说 AQS 时知道同步队列是一个双向队列。接下来我们用一个demo,通过 debug 进去看是不是符合我们的猜想:
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
});
thread.start();
}
}
这段代码没有任何实际意义,甚至很臭,只是想说明下我们刚才所想的。新建了10个线程,没有线程先获取锁,然后调用condition.await
方法释放锁将当前线程加入到等待队列中,通过 debug 控制当走到第10个线程的时候查看firstWaiter
即等待队列中的头结点,debug 模式下情景图如下:
从这个图我们可以很清楚的看到这样几点:
- 调用
condition.await
方法后线程依次尾插入到等待队列中,如图队列中的线程引用依次为Thread-0,Thread-1,Thread-2….Thread-8; - 等待队列是一个单向队列。通过我们的猜想然后进行实验验证,我们可以得出等待队列的示意图如下图所示:
同时还有一点需要注意的是:我们可以多次调用 lock.newCondition()
方法创建多个condition对象,也就是一个 lock 可以持有多个等待队列。而在之前利用 Object 的方式实际上是指在对象 Object 对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的 Lock 拥有一个同步队列和多个等待队列。示意图如下:
如图所示,ConditionObject 是 AQS 的内部类,因此每个 ConditionObject 能够访问到 AQS 提供的方法,相当于每个 Condition 都拥有所属同步器的引用。
2.2 await 实现原理
当调用**condition.await()**
方法后会使得当前获取 lock 的线程进入到等待队列,如果该线程能够从**await()**
方法返回的话一定是该线程获取了与 condition 相关联的 lock。await()
方法源码为:
public final void await() throws InterruptedException {
if (Thread.interrupted())throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
当前线程调用**condition.await()**
后,会使得当前线程释放 lock 并加入等待队列,直至被**signal/signalAll**
会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock后才会从await方法返回,或者在等待时被中断会做中断处理。关于这个过程的实现会有以下三个问题:
- 怎样将当前线程添加到等待队列中去的?
在第一步调用addConditionWaiter
将当前线程放入队列,源码如下:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//将当前线程包装成Node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
// 尾插入
t.nextWaiter = node;
// 更新lastWaiter
lastWaiter = node;
return node;
}
将当前节点包装成 Node,如果等待队列 firstWaiter 为 null(等待队列为空),则将 firstWaiter 指向当前 Node,否则,更新 lastWaiter(尾节点)即可。
也就是通过尾插入的方式将当前线程封装的Node插入到等待队列中即可,同时可以看出等待队列是一个不带头结点的链式队列,之前我们学习AQS时知道同步队列是一个带头结点的链式队列,这是两者的一个区别。
- 释放锁的步骤?
第二个步骤:将当前节点插入到等待对列之后,会使当前线程释放 lock,由fullyRelease
方法实现,fullyRelease
源码为:
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
if (release(savedState)) {
// 成功释放同步状态
failed = false;
return savedState;
} else {
// 不成功释放同步状态抛出异常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
调用AQS的模板方法**release**
方法释放AQS的同步状态并且唤醒在同步队列中头结点的后继节点引用的线程。如果释放成功则正常返回,若失败的话就抛出异常。
- 怎样从 await 方法退出?
现在回过头再来看await
方法有这样一段逻辑:
while (!isOnSyncQueue(node)) {
// 3. 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
很显然,当线程第一次调用 condition.await()
方法时,会进入到这个 while()
循环中,然后通过LockSupport.park(this)
方法使得当前线程进入等待状态,那么要想退出这个await
方法第一个前提条件自然而然的是要先退出这个while循环,出口就只剩下两个地方:
- 逻辑走到break退出while循环;
- 再看代码出现第1种情况的条件是当前等待的线程被中断后代码会走到break退出
- while循环中的逻辑判断为false。
- 当前节点被移动到了同步队列中(即另外线程调用的 condition 的 signal 或者 signalAll 方法),while中逻辑判断为false后结束while循环。
总结下,就是当前线程被中断或者调用**condition.signal/condition.signalAll**
方法当前节点移动到了同步队列后 ,这是当前线程退出await
方法的前提条件。
当退出 while 循环后就会调用acquireQueued(node, savedState)
,这个方法在介绍AQS的底层实现时说过了,若感兴趣的话可以去看这篇文章,该方法的作用是在自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock)。这样也说明了退出 await 方法必须是已经获得了 condition 引用(关联)的 lock。
到目前为止,开头的三个问题我们通过阅读源码的方式已经完全找到了答案,也对await
方法的理解加深。await方法示意图如下图:
如图,调用condition.await
方法的线程必须是已经获得了lock,也就是当前线程是同步队列中的头结点。调用该方法后会使得当前线程所封装的Node尾插入到等待队列中。
超时机制的支持
condition 还额外支持了超时机制,使用者可调用方法awaitNanos,awaitUtil
。这两个方法的实现原理,基本上与AQS中的 tryAcquire 方法如出一辙。
不响应中断的支持
要想不响应中断可以调用condition.awaitUninterruptibly()
方法,该方法的源码为:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
这段方法与上面的 await 方法基本一致,只不过减少了对中断的处理,并省略了 reportInterruptAfterWait 方法抛被中断的异常。
2.3 signal/signalAll实现原理
调用 condition 的 **signal**
或者**signalAll**
方法可以将等待队列中等待时间最长的节点移动到同步队列中,使得该节点能够有机会获得 lock。按照等待队列是先进先出(FIFO)的,所以等待队列的头节点必然会是等待时间最长的节点,也就是每次调用 condition 的 signal
方法是将头节点移动到同步队列中。signal方法源码为:
public final void signal() {
//1. 先检测当前线程是否已经获取lock
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 获取等待队列中第一个节点,之后的操作都是针对这个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
signal 方法首先会检测当前线程是否已经获取 lock,如果没有获取 lock 会直接抛出异常,如果获取的话,再拿到等待队列的头指针 引用的节点,之后的操作的 doSignal 方法也是基于该节点。下面我们来看看doSignal
方法做了些什么事情,doSignal
方法源码为:
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//1. 将头结点从等待队列中移除
first.nextWaiter = null;
//2. while中transferForSignal方法对头结点做真正的处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
具体逻辑请看注释,真正对头节点做处理的逻辑在 transferForSignal 放,该方法源码为:
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 1. 更新状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 2.将该节点移入到同步队列中去
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
这段代码主要做了两件事情
- 将头结点的状态更改为
CONDITION
; - 调用
enq
方法,将该节点尾插入到同步队列中,关于enq
方法请看AQS的底层实现这篇文章。
现在我们可以得出结论:调用**condition**
的**signal**
的前提条件是当前线程已经获取了**lock**
,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从**await**
方法中的**LockSupport.park(this)**
方法中返回,从而才有机会使得调用**await**
方法的线程成功退出。signal执行示意图如下图:
signalAll
sigllAll 与 sigal 方法的区别体现在doSignalAll方法上,前面我们已经知道 doSignal 方法只会对等待队列的头节点进行操作,,而doSignalAll
的源码为:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
该方法只不过时间等待队列中的每一个节点都移入到同步队列中,即「通知」当前调用condition.await()
方法的每一个线程。
3. await与signal/signalAll的结合思考
文章开篇提到等待/通知机制,通过使用 condition 提供的await
和signal/signalAll
方法就可以实现这种机制,而这种机制能够解决最经典的问题就是「生产者与消费者问题」。await
和signal
和signalAll
方法就像一个开关控制着线程A(等待方)和线程B(通知方)。它们之间的关系可以用下面一个图来表现得更加贴切:
- 线程 awaitThread 先通过
**lock.lock()**
方法获取锁成功后调用了**condition.await**
方法进入等待队列 - 而另一个线程 signalThread 通过
**lock.lock()**
方法获取锁成功后调用了**condition.signal**
或者**signalAll**
方法,使得线程 awaitThread 能够有机会从等待队列移入到同步队列中 - 当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。
- 如果 awaitThread 获取 lock 失败会直接进入到同步队列。
4. 一个例子
public class AwaitSignal {
private static ReentrantLock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
private static volatile boolean flag = false;
public static void main(String[] args) {
Thread waiter = new Thread(new waiter());
waiter.start();
Thread signaler = new Thread(new signaler());
signaler.start();
}
static class waiter implements Runnable {
@Override
public void run() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + "当前条件不满足等待");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "接收到通知条件满足");
} finally {
lock.unlock();
}
}
}
static class signaler implements Runnable {
@Override
public void run() {
lock.lock();
try {
flag = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
}
}
输出结果为:
Thread-0当前条件不满足等待
Thread-0接收到通知,条件满足
开启了两个线程 waiter 和 signaler,waiter 线程开始执行的时候由于条件不满足,执行 condition.await
方法使该线程进入等待状态同时释放锁,signaler 线程获取到锁之后更改条件,并通知所有的等待线程后释放锁。这时,waiter 线程获取到锁,并由于 signaler 线程更改了条件此时相对于 waiter 来说条件满足,继续执行。