condition是同步器AQS的内部类,因为condition的操作需要获取相关联的锁,每个condition都包含一个等待队列。该队列是实现等待/通知功能的关键。
Condition接口
public interface Condition {void await() throws InterruptedException;//阻塞线程,并释放锁。直到被唤醒或者打断 ,被打断的话会抛出InterruptedException异常void awaitUninterruptibly();//会一直等到被唤醒,被打断,不会抛出异常//带超时时间的awaitlong awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();//唤醒单个阻塞线程void signalAll();//唤醒全部的阻塞线程}
condition.await()阻塞线程
public final void await() throws InterruptedException {if (Thread.interrupted())//如果线程有中断标识,抛出InterruptedExceptionthrow new InterruptedException();Node node = addConditionWaiter();//添加condition等待队列int savedState = fullyRelease(node);//释放当前线程的锁。int interruptMode = 0;while (!isOnSyncQueue(node)) {//不在同步队列上。while保证节点从aqs队列上移除LockSupport.park(this);//线程阻塞在这里等待被唤醒if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //如果被中断过,跳出while循环break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //自旋直到抢到锁或者挂起 THROW_IE模式意味着在退出等待时抛出InterruptedExceptioninterruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelled,清理被取消的线程unlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);//等待后,恢复中断标识}
添加到condition队列中
private Node addConditionWaiter() {Node t = lastWaiter;//尾节点// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {//尾部有节点,但是节点不是condition(-2) 等待状态unlinkCancelledWaiters(); //拆开并取消等待t = lastWaiter; //指向新的尾节点}Node node = new Node(Thread.currentThread(), Node.CONDITION);//创建一个新的condition节点if (t == null)//队列里没值firstWaiter = node;//当头节点也指向当前节点elset.nextWaiter = node;//放入队列尾部lastWaiter = node;//last—>当前节点return node;}
释放线程
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();//获取当前线程的state 。多次重入的话大于等于1if (release(savedState)) {//全部释放,唤醒aqs队列排在第一的线程failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;//释放异常,将当前线程状态改为取消状态}}
释放锁之后,此线程不在aqs 队列中,所以线程会被阻塞在LockSupport.park(this)等待signal唤醒。
condition会将该节点加回aqs队列
线程会进入acquireQueued(node, savedState)方法自旋检查,是被唤醒还是挂起(由unlock再次被唤醒)
并回复中断标识
condition.signal()唤醒阻塞
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;//condition队列的第一个节点if (first != null)doSignal(first);//唤醒方法}
找到过滤取消的节点
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)//将后面的节点设为第一节点lastWaiter = null;//队列没有值了first.nextWaiter = null;//方便gc回收} while (!transferForSignal(first) && //加入aqs队列失败(节点被取消)(first = firstWaiter) != null);}
加入aqs队列
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//cas 将condition->signalreturn false; //失败的话,说明该节点已经被取消/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);//加入aqs队列int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//如果上个节点是取消状态,或者修改上个节点的状态为signal失败LockSupport.unpark(node.thread);//唤醒该节点return true;}
