一、Lock接口
锁是用来控制多个线程访问共享资源的方式。Lock的特性如下:
- 尝试非阻塞地获取锁:当前线程尝试获取锁,如果这一时刻没有被其他线程获取到,则成功获取并持有锁
- 能被中断地获取锁:与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放。
- 超时获取锁:在指定的截止时间之间获取锁,如果截止时间到了仍旧无法获取锁,则返回
Lock是一个接口,它定义了锁获取和释放的基本操作,Lock接口的实现基本都是通过聚合一个同步器的子类来完成线程访问控制的。其Api如下:
方法名称 | 描述 |
---|---|
void lock() | 获取锁,调用该方法的当前线程会获取锁,当锁获得后,从该方法返回。 |
void lockInterruptibly() throws InterruptedException | 可中断的获取锁,和lock()不同之处在于该方法会响应中断,即在所得获取中可以终端当前线程 |
boolean tryLock() | 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false |
boolean tryLock(long time,TimeUnit unit) throws InterruptedException | 超时的获取锁,当前线程在以下3种情况下回返回: 1)当前线程在超时时间内获取了锁 2)当前线程在超时时间内被中断 3)超时时间解锁,返回false |
void unlock() | 释放锁 |
Condition newCondition() | 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的wait(),而调用后,当前线程将释放锁。 |
二、队列同步器
队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,子类推荐被定义成自定义同步组件的静态内部类。
2.1 队列同步器的接口
重写同步器指定方法时,需要使用同步器提供的3个方法来访问或修改同步状态。
- getState():获取当前同步状态
- setState(int newState):设置当前同步状态
- compareAndSetState(int except,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
同步器可重写的方法如下表:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
实现自定义同步组件时,将会调用同步器提供的模板方法。
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出异常并返回。 |
boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false。 |
void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,则进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态后,将同步队列中第一个节点的线程唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection |
获取等待在同步队列上的线程集合 |
同步器提供的模板方法基本上分为3类:
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
2.2 队列同步器的实现分析
1. 同步队列
同步器内部依赖同步队列(一个FIFO的双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点的线程唤醒,使其再次尝试获取同步状态。
同步队列中的节点用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。节点的属性类型与名称及描述如表所示
属性类型与名称 | 描述 |
---|---|
int waitStatus | 等待状态,包含如下状态: 1. CANCELLED,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化 1. SIGNAL,值为-1,后继节点的线程处于等待状态,而当前结点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 1. CONDITION,值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中 1. PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件的被传播下去 1. INTIAL,值为0,初始状态 |
Node prev | 前驱节点,当节点加入同步队列时被设置(尾部添加) |
Node next | 后继节点 |
Node nextWaiter | 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占和共享)和等待队列中的后继节点使用同一个字段 |
Thread thread | 获取同步状态的线程 |
同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会加入该队列的尾部。同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node except,Node update),它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置成首节点。由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要CAS来保证。
2. 独占式同步状态获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行终端操作时,线程不会从同步队列中移出。
public final void acquire(int arg){
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg) )
selfInterrupt();
}
上述代码完成了同步状态的获取、节点的构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:
- 调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态。
- 如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部。
- 最后调用acquireQueued(Node node, int arg)方法,使得该节点以死循环的方式获取同步状态。
- 如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
具体源码实现:
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(),mode);
// 快速尝试在尾部添加
Node pred = tail;
if(pred!=null){
node.prev=pred;
if(compareAndSetTail(pred,node)){
pred.next=node;
return node;
}
}
enq(node);
return node;
}
// 将节点设置为尾节点
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t==null){
if(compareAndSetHead(new Node())){
tail=head;
}
}else{
node.prev=t;
if(compareAndSetTail(t,node)){
t.next=node;
return t;
}
}
}
}
节点进入同步队列之后,就进入了一个自旋的过程。
final boolean acquireQueued(final Node node, int arg){
boolean failed=true;
try{
boolean interrupted=false;
for(;;){
final Node p= node.predecessor();
if(p==head && tryAcquire(arg)){
setHead(node);
p.next=null;
failed=false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
interrupt=true;
}
}finall{
if(failed)
cancelAcquire(node);
}
}
在此方法中,当前线程在死循环中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态:
- 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
- 维护同步队列的FIFO原则。
前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自选过程。
当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,是的后续节点能够继续获取同步状态。通过调用同步器的release(int arg)可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点进而使得后继节点重新尝试获取同步状态。
public final boolean release(int arg){
if(tryRelease(arg)){
Node h=head;
if(h!=null && h.waitStatus!=0)
unparkSuccessor(h);
return true;
}
return false;
}
该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)使用LockSupport来唤醒处于等待状态的线程。
总结:
- 在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被添加到队列中并在队列中自旋
- 移除队列或停止自旋的条件是当前驱节点为头节点且成功获取了同步状态。
- 在释放同步状态时,同步器调用tryRelease(int arg)释放同步状态,然后唤醒头节点的后继节点。
3. 共享式同步状态获取与释放
调用同步器的acquireShared(int arg)可以共享式的获取同步状态。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared(int arg)中,同步器调用tryAcquireShared(int arg)尝试获取同步状态,当返回值大于等于0时,表示能够获取到同步状态。doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
共享式获取也需要释放同步状态,调用releaseShared(int arg)可以释放同步状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
该方法在释放同步状态之后,会唤醒后续处于等待状态的节点。他和独占式主要区别在于tryReleaseShared(int arg)方法必须确保同步状态线程安全释放,一般是通过循环和CAS保证的。
4. 独占式超时获取同步状态
通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到则返回true。
先分析一下响应中断的同步状态获取过程:在Java5之前,当一个线程获取不到锁而被阻塞在synchronized时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。 在Java5中,同步器提供了acquireInterruptibly(int arg)这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回并抛出异常。
超时获取同步状态doAcquireNanos(int arg, long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知,nanosTimeOut计算公式为:nanosTimeOut-= now -lastTime
,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果nanosTimeOut>0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime=System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now=System.nanoTime();
//计算时间,当前时间now减去睡眠之前的时间lastTime得到已经睡眠的时间delta,
//然后被原有超时时间nanosTImeout减去,得到了还应该睡眠的时间.
nanosTimeout -= now -lastTime;
lastTime=now;
if (Thread.interrupted())
throw new InterruptedException();
} finally {
if (failed)
cancelAcquire(node);
}
}
- 该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上不同。
- 如果当前线程同步状态获取失败,则判断是否超时,如果没有超时,重新计算超时间隔nanosTimeout,然后使当前线程等待nanosTimeout纳秒(当已到设置的超时时间,该线程会从LockSupport.parkNanos(Object blocker,long nacos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold时,将不会使线程进行超时等待,而是快速进入自旋过程。
三、重入锁
可重入锁,也叫做递归锁,指的是同一线程外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。在 JAVA 环境下 ReentrantLock 和 synchronized 都是可重入锁。
3.1 非公平锁的重入
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
3.2 公平锁的重入
如果一个锁是公平的,那么锁的获取顺序应该符合FIFO。对于非公平锁只要CAS设置同步状态成功,则表示获取了锁,但是公平锁则不同
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
该方法与nonfairTryAcquire(int acquires)比较唯一不同的位置是判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果方法返回true则表示有线程比当前线程更早的请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续访问。
四、读写锁
4.1 概述
为了提高性能,Java 提供了读写锁,在读的地方使用读锁,在写的地方使用写锁,灵活控制,如果没有写锁的情况下,读是无阻塞的,在一定程度上提高了程序的执行效率。读写锁分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由 JVM自己控制的,你只要上好相应的锁即可。
ReentrantReadWriteLock的特性:公平性选择——支持非公平(默认)和公平的锁获取方式。
- 重进入
- 锁降级——遵循获取写锁、获取读锁在释放写锁的次序。
4.2 读写锁的实现分析
1. 读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。如果在一个整型变量上维护多种状态,就需要按位切割使用,读写锁将变量切分成了两部分,高16位表示读,低16位表示写。读写锁通过位运算来确定读和写的状态。假设当前同步状态值为S,写状态等于S & 0x0000FFFF,读状态等于S>>>16(无符号补0右移16位)。2. 写锁的获取与释放
写锁是一个支持重入的排他锁。protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
3. 读锁的获取与释放
读锁是一个支持重入的共享锁。
在方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
读锁的每次释放居委减少读状态,减少的值时1<<16。4. 锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后在获取读锁,这种分段完成的不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。五、锁的升级与对比
锁一共有4种状态,级别从低到高依次是:无锁状态——偏向锁状态——轻量级锁状态——重量级锁状态,锁可以升级但不能降级。5.1 偏向锁
1. 偏向锁的获取
当一个线程访问同步块并获取锁时,会在对象头和栈桢中的锁记录里存储锁偏向的线程ID,以后线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单的测试一下对象头的Mark Word里是否存储着指向当前现成的偏向锁。如果测试失败,则需要在测试一下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁),如果没有设置则使用CAS竞争锁;如果设置了则尝试使用CAS将对象头的偏向锁指向当前线程。2. 偏向锁的撤销
偏向锁使用了一种等到竞争出现才释放锁的机制,偏向锁的撤销需要等待全局安全点(在这个时间点上没有正在执行的字节码)。他会首先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,如果线程不处于活动状态,则将对象头设置成无锁状态;如果线程仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的Mark Word要么重新偏向于其他线程,要么恢复到无锁或者标记对象不适合作为偏向锁,最后唤醒暂停的线程。3. 偏向锁的关闭
偏向锁在Java6和Java7默认启用,但是它在应用程序启动几秒钟后才激活,如有必要可以使用JVM参数来关闭延迟:-XX:BiasedLockingStartupDelay=0
。如果确定所有的锁通常处于竞争状态,可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false
。
5.2 轻量级锁
1. 轻量级锁加锁
线程在执行同步块之前,JVM会先在当前线程的栈帧中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。
2. 轻量级锁解锁
轻量级锁解锁时,会使用原子的CAS操作将Displaced Mark Word替换回对象头。如果成功表示没有竞争发生,如果失败,表示当前锁存在竞争,所就会膨胀成重量级锁。
3. 锁的优缺点对比
锁 | 优点 | 缺点 |
---|---|---|
偏向锁 | 加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级的差距 | 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 |
轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度 | 如果始终得不到锁竞争的线程,使用自旋会消耗CPU |
重量级锁 | 线程竞争不使用自旋,不会消耗CPU | 线程阻塞,响应时间缓慢 |
5.3 锁消除
锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。
锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。
5.4 锁粗化
通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,即在使用完公共资源后,应该立即释放锁。但是,凡事都有一个度,如果对同一个锁不停的进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能的优化 。
六、Condition接口
6.1 Condition接口与示例
Conditon定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的。
方法名称 | 描述 |
---|---|
void await() throws InterruptedException | 当前线程进入等待状态直到域被通知(signal)或中断,当前线程将进入运行状态且从await()方法返回的情况,包括:其他线程调用该Condition的signal()或signalAll()方法,而当前线程被选中唤醒 - 其他线程(调用interrupt()方法)中断当前线程 - 如果当前等待线程从await()方法返回,那么表明该线程已经获取了Condition对象所对应的锁 |
void awaitUninterruptibly() | 当前线程进入等待状态直到被通知,对中断不敏感 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或超时。返回值表示剩余的时间,如果在nanosTimeout纳秒之前被唤醒,那么返回值就是(nanosTimeout-实际耗时)如果返回值时0或者是负数,那么就是超时 |
boolean awaitUnitl(Date deadline) throws InterruptedException | 当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,返回true |
void signal() | 唤醒一个等待在Condition上的线程,该线程从等待方法返回必须活得与Condition相关联的锁 |
void signalAll() | 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁 |
6.2 Condition的实现分析
ConditionObject是同步器AbstraceQueuedSynchronized的内部类。每个Condition对象都包含着一个队列。
1. 等待队列
等待队列是一个FIFO队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造节点加入等待队列并进入等待状态。
一个Condition包含了一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()将会以当前线程构造节点,并将节点从尾部加入等待队列。Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可,上述节点引用更新过程并没有使用CAS保证,因为在调用await()方法的线程必定是获取了锁的线程。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列。而并发包中的Lock拥有一个同步队列和多个等待队列。
2. 等待
调用了Condition.await()方法会使当前线程进入等待队列并释放锁,同时线程状态变成为等待状态,当从await()返回时,当前线程一定获取了Condition相关联的锁。如果从队列的角度看await()相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态即释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态,如果不是通过其他线程调用signal()方法唤醒,而是对等待线程进行中断,则会抛出异常。
3. 通知
调用signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
public final void signal() {
// 判断当前线程是否获取了锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法的前提条件是当前线程获得了锁,接着获取等待队列的首节点,调用enq(Node node)将其线程安全的移动到同步队列尾部并使用LockSupport唤醒节点中的线程。被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的accquireQueued()方法加入到获取同步状态的竞争中。成功获取同步状态之后,被唤醒的线程将从先前调用await()方法返回。