condition是同步器AQS的内部类,因为condition的操作需要获取相关联的锁,每个condition都包含一个等待队列。该队列是实现等待/通知功能的关键。

Condition接口

  1. public interface Condition {
  2. void await() throws InterruptedException;//阻塞线程,并释放锁。直到被唤醒或者打断 ,被打断的话会抛出InterruptedException异常
  3. void awaitUninterruptibly();//会一直等到被唤醒,被打断,不会抛出异常
  4. //带超时时间的await
  5. long awaitNanos(long nanosTimeout) throws InterruptedException;
  6. boolean await(long time, TimeUnit unit) throws InterruptedException;
  7. boolean awaitUntil(Date deadline) throws InterruptedException;
  8. void signal();//唤醒单个阻塞线程
  9. void signalAll();//唤醒全部的阻塞线程
  10. }

condition.await()阻塞线程

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())//如果线程有中断标识,抛出InterruptedException
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();//添加condition等待队列
  5. int savedState = fullyRelease(node);//释放当前线程的锁。
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {//不在同步队列上。while保证节点从aqs队列上移除
  8. LockSupport.park(this);//线程阻塞在这里等待被唤醒
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //如果被中断过,跳出while循环
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //自旋直到抢到锁或者挂起 THROW_IE模式意味着在退出等待时抛出InterruptedException
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled,清理被取消的线程
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);//等待后,恢复中断标识
  18. }

添加到condition队列中

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;//尾节点
  3. // If lastWaiter is cancelled, clean out.
  4. if (t != null && t.waitStatus != Node.CONDITION) {//尾部有节点,但是节点不是condition(-2) 等待状态
  5. unlinkCancelledWaiters(); //拆开并取消等待
  6. t = lastWaiter; //指向新的尾节点
  7. }
  8. Node node = new Node(Thread.currentThread(), Node.CONDITION);//创建一个新的condition节点
  9. if (t == null)//队列里没值
  10. firstWaiter = node;//当头节点也指向当前节点
  11. else
  12. t.nextWaiter = node;//放入队列尾部
  13. lastWaiter = node;//last—>当前节点
  14. return node;
  15. }

condition为单向链表

释放线程

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. int savedState = getState();//获取当前线程的state 。多次重入的话大于等于1
  5. if (release(savedState)) {//全部释放,唤醒aqs队列排在第一的线程
  6. failed = false;
  7. return savedState;
  8. } else {
  9. throw new IllegalMonitorStateException();
  10. }
  11. } finally {
  12. if (failed)
  13. node.waitStatus = Node.CANCELLED;//释放异常,将当前线程状态改为取消状态
  14. }
  15. }

释放锁之后,此线程不在aqs 队列中,所以线程会被阻塞在LockSupport.park(this)等待signal唤醒。
condition会将该节点加回aqs队列
线程会进入acquireQueued(node, savedState)方法自旋检查,是被唤醒还是挂起(由unlock再次被唤醒)
并回复中断标识

condition.signal()唤醒阻塞

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;//condition队列的第一个节点
  5. if (first != null)
  6. doSignal(first);//唤醒方法
  7. }

找到过滤取消的节点

  1. private void doSignal(Node first) {
  2. do {
  3. if ( (firstWaiter = first.nextWaiter) == null)//将后面的节点设为第一节点
  4. lastWaiter = null;//队列没有值了
  5. first.nextWaiter = null;//方便gc回收
  6. } while (!transferForSignal(first) && //加入aqs队列失败(节点被取消)
  7. (first = firstWaiter) != null);
  8. }

加入aqs队列

  1. final boolean transferForSignal(Node node) {
  2. /*
  3. * If cannot change waitStatus, the node has been cancelled.
  4. */
  5. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//cas 将condition->signal
  6. return false; //失败的话,说明该节点已经被取消
  7. /*
  8. * Splice onto queue and try to set waitStatus of predecessor to
  9. * indicate that thread is (probably) waiting. If cancelled or
  10. * attempt to set waitStatus fails, wake up to resync (in which
  11. * case the waitStatus can be transiently and harmlessly wrong).
  12. */
  13. Node p = enq(node);//加入aqs队列
  14. int ws = p.waitStatus;
  15. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//如果上个节点是取消状态,或者修改上个节点的状态为signal失败
  16. LockSupport.unpark(node.thread);//唤醒该节点
  17. return true;
  18. }