Condition结构从上到下
面向开发人员的API | |
---|---|
await() |
让当前线程进入等待,直到被 signal() 或中断 |
awaitUninterruptibly() |
让当前线程进入等待,直到被 signal() |
awaitNanos(long) |
让当前线程进入等待,直到被 signal() 、中断、超时 |
await(long, TimeUnit) |
让当前线程进入等待,直到被 signal() 、中断、超时 |
awaitUntil(Date) |
让当前线程进入等待,直到被 signal() 、中断、超时 |
signal() |
唤醒一个等待中的线程 |
signalAll() |
唤醒所有等待中的线程 |
由开发人员具体实现的方法 | |
无 | |
底层实现 | |
基于AQS的队列、LockSupport |
Condition
结构整体比较简单,它复用了 AQS的节点 单独维护了一套队列。
Condition流程
在实现简易的生产队列和消费队列时,第一个想到的大多数都是 Object.wait()
和 Object.notify()
,而今天要介绍一个自带的一个工具 ReentrantLock#Condition
。它相比 Object
在唤醒方面多了一些可控性,阻塞方面多了一个限时功能。两者的等待唤醒机制几乎一致,宏观流程图如下所示:
根据这个流程类比图,我们可以对 Condition 的执行流程有了一个整体上的认知:
- A线程获取锁之后,执行完
await()
会释放锁 - B线程获取锁后执行
sinal()
会唤醒A线程 - B线程释放锁
- A线程获取锁,开始
doSomething...
- A线程释放锁
主要难点应该就在于 B线程唤醒A线程之后,A线程做了哪些事;其次就是 Condition#await()
和 Condition#signal()
做了什么事。
流程详解
执行 await()
当A线程执行完 await()
方法后,它需要进入 Condition
等待队列,其次是释放当前线程的锁,最后进入阻塞状态,其流程示意图如下所示:await()
的源码如下所示:
public final void await() throws InterruptedException {
// 判断当前显示是否已经被中断
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程封装成结点,加入Condition队列尾部,类似enq()方法,但是在锁内进行await(),不需要CAS
Node node = addConditionWaiter();
// 释放当前线程持有的所有锁,在这个操作之后很有可能出现竞态条件
int savedState = fullyRelease(node);
int interruptMode = 0;
// 结点在同步队列里时就返回true
// 结点在CONDITION队列里时就返回false
while (!isOnSyncQueue(node)) {
// 当唤醒线程调用unlock()时就会解除该线程的等待
LockSupport.park(this);
/* 第一种情况,该线程被中断,就会将interruptMode设置为THROW_IE
* 第二种情况,该线程被中断且在检查过程中状态改变(比如中断时,被唤醒),就会将mode设置为REINTERRUPT
* 第三种情况,该线程被正常signal(notify),此时结点状态值为0
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 加入获取锁的Sync队列中,等待获取锁;获取成功时判断一下mode类型
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前结点有后续结点,那么就清理一下链表
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 这里判断事抛出异常还是简单的中断
reportInterruptAfterWait(interruptMode);
}
整体思路应该不难,里面的个别方法需要展开来分析一下:
addConditionWaiter()
,该方法主要用来生成一个Node
进入Condition等待队列
:private Node addConditionWaiter() { Node t = lastWaiter; // 获取Condition队列的最后一个节点 // 如果发现 最后一个节点已经取消了,那么就往前查找最近的一个没取消的等待节点 // 并将其设置为lastWaiter if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; // 这里的lastWaiter已经更新了 } // 创建一个 Condition Node Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
fullRelease()
,该方法会将head
进行释放,唤醒下一个同步队列中的等待节点,具体的源码就不分析了,大家可以看前面的Exclusive
或Share
模式isOnSyncQueue()
,该方法蛮有意思的,里面的判断逻辑还挺搞:final boolean isOnSyncQueue(Node node) { // node是CONDITION状态 或者 是CONDITION队列里的第一个 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 为什么有next就是在同步队列呢?因为signal()是唤醒第一个的 // 新的Condition节点一定是在队尾,且next为空;因为之前的都在阻塞等待唤醒~ if (node.next != null) return true; // CAS失败的情况有可能存在 node.prev != null 但是却不在队列中的情况 // 如果真发生了 CAS 失败的情况,就会进入该方法 从队尾往队首遍历 return findNodeFromTail(node); }
进入阻塞
- 如果从阻塞中唤醒(调用
signal()
的线程会将 该节点放到同步队列中去并变更状态),成功跳出while()
循环 - 后续的逻辑和
exclusive
争抢锁的流程差不多~
执行signal()
整个 signal()
的逻辑也比较简单,主要的内容就三行:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
当调用 signal()
时,获取Condition队列的队首并唤醒它。主要流程在下面的 doSignal()
方法中:
private void doSignal(Node first) {
do {
// 如果 Condition队列的队首 的 nextWaiter 为空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null; // 设置lastWaiter为空
first.nextWaiter = null; // 设置nextWaiter为空
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal()
里面主要处理了队列中的空数据,并执行唤醒的逻辑—— transferForSignal()
:
final boolean transferForSignal(Node node) {
/*
* 更改node结点的状态;如果更改失败就是已经取消了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 将当前结点接入Sync队列,让这个结点加入等待;
*/
Node p = enq(node);
int ws = p.waitStatus;
// 更新p节点的状态为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 将对应node的线程唤醒
return true;
}
最后当执行 signal()
方法的线程释放锁后, await()
就能上来尝试争抢一下了~
总结
AQS的锁机制主要依靠双向的链表,而 Condition等待唤醒机制只需要普通链表即可实现。这里对整体流程又进行了个总结。