Condition结构从上到下
| 面向开发人员的API | |
|---|---|
await() |
让当前线程进入等待,直到被 signal() 或中断 |
awaitUninterruptibly() |
让当前线程进入等待,直到被 signal() |
awaitNanos(long) |
让当前线程进入等待,直到被 signal() 、中断、超时 |
await(long, TimeUnit) |
让当前线程进入等待,直到被 signal() 、中断、超时 |
awaitUntil(Date) |
让当前线程进入等待,直到被 signal() 、中断、超时 |
signal() |
唤醒一个等待中的线程 |
signalAll() |
唤醒所有等待中的线程 |
| 由开发人员具体实现的方法 | |
| 无 | |
| 底层实现 | |
| 基于AQS的队列、LockSupport |
Condition 结构整体比较简单,它复用了 AQS的节点 单独维护了一套队列。
Condition流程
在实现简易的生产队列和消费队列时,第一个想到的大多数都是 Object.wait() 和 Object.notify() ,而今天要介绍一个自带的一个工具 ReentrantLock#Condition 。它相比 Object 在唤醒方面多了一些可控性,阻塞方面多了一个限时功能。两者的等待唤醒机制几乎一致,宏观流程图如下所示:
根据这个流程类比图,我们可以对 Condition 的执行流程有了一个整体上的认知:
- A线程获取锁之后,执行完
await()会释放锁 - B线程获取锁后执行
sinal()会唤醒A线程 - B线程释放锁
- A线程获取锁,开始
doSomething... - A线程释放锁
主要难点应该就在于 B线程唤醒A线程之后,A线程做了哪些事;其次就是 Condition#await() 和 Condition#signal() 做了什么事。
流程详解
执行 await()
当A线程执行完 await() 方法后,它需要进入 Condition 等待队列,其次是释放当前线程的锁,最后进入阻塞状态,其流程示意图如下所示:
await() 的源码如下所示:
public final void await() throws InterruptedException {// 判断当前显示是否已经被中断if (Thread.interrupted())throw new InterruptedException();// 将当前线程封装成结点,加入Condition队列尾部,类似enq()方法,但是在锁内进行await(),不需要CASNode node = addConditionWaiter();// 释放当前线程持有的所有锁,在这个操作之后很有可能出现竞态条件int savedState = fullyRelease(node);int interruptMode = 0;// 结点在同步队列里时就返回true// 结点在CONDITION队列里时就返回falsewhile (!isOnSyncQueue(node)) {// 当唤醒线程调用unlock()时就会解除该线程的等待LockSupport.park(this);/* 第一种情况,该线程被中断,就会将interruptMode设置为THROW_IE* 第二种情况,该线程被中断且在检查过程中状态改变(比如中断时,被唤醒),就会将mode设置为REINTERRUPT* 第三种情况,该线程被正常signal(notify),此时结点状态值为0*/if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 加入获取锁的Sync队列中,等待获取锁;获取成功时判断一下mode类型if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 如果当前结点有后续结点,那么就清理一下链表if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)// 这里判断事抛出异常还是简单的中断reportInterruptAfterWait(interruptMode);}
整体思路应该不难,里面的个别方法需要展开来分析一下:
addConditionWaiter(),该方法主要用来生成一个Node进入Condition等待队列:private Node addConditionWaiter() { Node t = lastWaiter; // 获取Condition队列的最后一个节点 // 如果发现 最后一个节点已经取消了,那么就往前查找最近的一个没取消的等待节点 // 并将其设置为lastWaiter if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; // 这里的lastWaiter已经更新了 } // 创建一个 Condition Node Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }fullRelease(),该方法会将head进行释放,唤醒下一个同步队列中的等待节点,具体的源码就不分析了,大家可以看前面的Exclusive或Share模式isOnSyncQueue(),该方法蛮有意思的,里面的判断逻辑还挺搞:final boolean isOnSyncQueue(Node node) { // node是CONDITION状态 或者 是CONDITION队列里的第一个 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 为什么有next就是在同步队列呢?因为signal()是唤醒第一个的 // 新的Condition节点一定是在队尾,且next为空;因为之前的都在阻塞等待唤醒~ if (node.next != null) return true; // CAS失败的情况有可能存在 node.prev != null 但是却不在队列中的情况 // 如果真发生了 CAS 失败的情况,就会进入该方法 从队尾往队首遍历 return findNodeFromTail(node); }进入阻塞
- 如果从阻塞中唤醒(调用
signal()的线程会将 该节点放到同步队列中去并变更状态),成功跳出while()循环 - 后续的逻辑和
exclusive争抢锁的流程差不多~
执行signal()
整个 signal() 的逻辑也比较简单,主要的内容就三行:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
当调用 signal() 时,获取Condition队列的队首并唤醒它。主要流程在下面的 doSignal() 方法中:
private void doSignal(Node first) {
do {
// 如果 Condition队列的队首 的 nextWaiter 为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null; // 设置lastWaiter为空
first.nextWaiter = null; // 设置nextWaiter为空
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal() 里面主要处理了队列中的空数据,并执行唤醒的逻辑—— transferForSignal() :
final boolean transferForSignal(Node node) {
/*
* 更改node结点的状态;如果更改失败就是已经取消了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 将当前结点接入Sync队列,让这个结点加入等待;
*/
Node p = enq(node);
int ws = p.waitStatus;
// 更新p节点的状态为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 将对应node的线程唤醒
return true;
}
最后当执行 signal() 方法的线程释放锁后, await() 就能上来尝试争抢一下了~
总结
AQS的锁机制主要依靠双向的链表,而 Condition等待唤醒机制只需要普通链表即可实现。这里对整体流程又进行了个总结。
