1 概述

它与 wait()方法和 notify()方法的作用是大致相同的,但是 wait()方法和 notify()方法是与 synchronized 关键字合作使用的,而 Conditition 是与重入锁相关联的。通过 lock 接口(重 入锁就实现了这一接口)的 Condition newCondition() 方法可以生成 个与当前重入锁绑定的Condition 实例。利用 Condit on 对象,我们就可以让线程在合适的时间等待,或者在某特定的时刻得到通知,继续执行。

2 示例

2.1 一个简单的demo

  1. public class ConditionTest {
  2. public static void main(String[] args) {
  3. Lock lock = new ReentrantLock();
  4. Condition condition = lock.newCondition();
  5. new Thread(() -> {
  6. lock.lock();
  7. try {
  8. System.out.println(Thread.currentThread().getName() + "执行await(会释放锁)...");
  9. condition.await();
  10. System.out.println(Thread.currentThread().getName() + "被唤醒");
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }, "线程1").start();
  15. System.out.println("main线程...");
  16. new Thread(() -> {
  17. lock.lock();
  18. try {
  19. System.out.println(Thread.currentThread().getName() + "执行sleep...");
  20. Thread.sleep(2000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println(Thread.currentThread().getName() + "执行sleep完毕,开始唤醒线程1");
  25. condition.signal();
  26. lock.unlock();
  27. }, "线程2").start();
  28. }
  29. }

输出:

  1. main线程...
  2. 线程1执行await(会释放锁)...
  3. 线程2执行sleep...
  4. 线程2执行sleep完毕,开始唤醒线程1
  5. 线程1被唤醒

2.2 生产者消费者示例

  1. public class ProductConsumeByLockTest {
  2. public static void main(String[] args) {
  3. DealDataByLock data = new DealDataByLock();
  4. new Thread(() -> {
  5. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
  6. }, "线程1").start();
  7. new Thread(() -> {
  8. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
  9. }, "线程2").start();
  10. new Thread(() -> {
  11. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
  12. }, "线程3").start();
  13. new Thread(() -> {
  14. Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
  15. }, "线程4").start();
  16. }
  17. }
  18. class DealDataByLock {
  19. private int number = 0;
  20. private Lock lock = new ReentrantLock();
  21. private Condition condition = lock.newCondition();
  22. public void product() {
  23. lock.lock();
  24. try {
  25. while (number != 0)
  26. condition.await();
  27. number++;
  28. System.out.println(Thread.currentThread().getName() + " 生产者生产了number=" + number);
  29. condition.signalAll();
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. } finally {
  33. lock.unlock();
  34. }
  35. }
  36. public void consume() {
  37. lock.lock();
  38. try {
  39. while (number == 0)
  40. condition.await();
  41. number--;
  42. System.out.println(Thread.currentThread().getName() + " 消费者消费了number=" + number);
  43. condition.signalAll();
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. lock.unlock();
  48. }
  49. }
  50. }

输出:

  1. 线程3 生产者生产了number=1
  2. 线程4 消费者消费了number=0
  3. 线程1 生产者生产了number=1
  4. 线程4 消费者消费了number=0
  5. 线程1 生产者生产了number=1
  6. 线程4 消费者消费了number=0
  7. 线程1 生产者生产了number=1
  8. 线程4 消费者消费了number=0
  9. 线程1 生产者生产了number=1
  10. 线程4 消费者消费了number=0
  11. 线程1 生产者生产了number=1
  12. 线程2 消费者消费了number=0
  13. 线程3 生产者生产了number=1
  14. 线程2 消费者消费了number=0
  15. 线程3 生产者生产了number=1
  16. 线程2 消费者消费了number=0
  17. 线程3 生产者生产了number=1
  18. 线程2 消费者消费了number=0
  19. 线程3 生产者生产了number=1
  20. 线程2 消费者消费了number=0

3 实现原理

3.1 ConditionObject

ConditionObject是AbstractQueuedSynchronizer的内部类

  1. public class ConditionObject implements Condition, java.io.Serializable {
  2. private static final long serialVersionUID = 1173984872572414699L;
  3. /** 头节点 */
  4. private transient Node firstWaiter;
  5. /** 尾节点 */
  6. private transient Node lastWaiter;
  7. /**
  8. * 构造函数
  9. */
  10. public ConditionObject() { }
  11. ......

3.1.1 Node(单向链表)

  1. static final class Node {
  2. /** Marker to indicate a node is waiting in shared mode */
  3. static final Node SHARED = new Node();
  4. /** Marker to indicate a node is waiting in exclusive mode */
  5. static final Node EXCLUSIVE = null;
  6. static final int CANCELLED = 1;//取消
  7. static final int SIGNAL = -1;//唤醒
  8. static final int CONDITION = -2;//等待条件
  9. static final int PROPAGATE = -3;//广播
  10. volatile int waitStatus;//等待状态
  11. volatile Node prev;
  12. volatile Node next;
  13. volatile Thread thread;
  14. Node nextWaiter;
  15. ......

3.2 await()

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter(); //当前线程new Node()加入条件队列
  5. int savedState = fullyRelease(node); // 释放当先线程的锁
  6. int interruptMode = 0;
  7. /**
  8. * 自旋:
  9. * 1.当前节点不在同步队列(刚new的节点肯定不在),挂起当前线程,等待被唤醒
  10. * 2.当其他线程调用同一个ConditionObject的signal方法时,会将队列里的节点放入同步队列,并unpark线程(排队唤醒)
  11. * 3.如果该节点被唤醒,再自旋检查是否在同步队列。发现已经在队列中,就可以跳出循环,获取lock
  12. */
  13. while (!isOnSyncQueue(node)) {
  14. LockSupport.park(this);
  15. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  16. break;
  17. }
  18. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  19. interruptMode = REINTERRUPT;
  20. if (node.nextWaiter != null)
  21. unlinkCancelledWaiters(); //解除条件队列中被取消的侍者节点的链接
  22. if (interruptMode != 0)
  23. reportInterruptAfterWait(interruptMode); //抛出中断异常
  24. }

3.2.1 addConditionWaiter

创建或加入单项链表

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // If lastWaiter is cancelled, clean out.
  4. if (t != null && t.waitStatus != Node.CONDITION) {
  5. unlinkCancelledWaiters();
  6. t = lastWaiter;
  7. }
  8. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  9. if (t == null)
  10. firstWaiter = node;
  11. else
  12. t.nextWaiter = node;
  13. lastWaiter = node;
  14. return node;
  15. }

3.2.2 fullyRelease

锁释放,使用当前状态值调用release;返回保存的状态。

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. int savedState = getState();
  5. if (release(savedState)) {
  6. failed = false;
  7. return savedState;
  8. } else {
  9. throw new IllegalMonitorStateException();
  10. }
  11. } finally {
  12. if (failed)
  13. node.waitStatus = Node.CANCELLED;
  14. }
  15. }

3.3 signal()

  1. public final void signal() {
  2. if (!isHeldExclusively()) //检查是否获取到锁
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignal(first);
  7. }

3.3.1 doSignal

  1. private void doSignal(Node first) {
  2. do {
  3. if ( (firstWaiter = first.nextWaiter) == null)
  4. lastWaiter = null;
  5. first.nextWaiter = null;
  6. } while (!transferForSignal(first) && // 唤醒队列第一个节点
  7. (first = firstWaiter) != null);
  8. }

3.3.2 transferForSignal

将节点从条件队列转移到同步队列。如果成功返回true。

  1. final boolean transferForSignal(Node node) {
  2. /*
  3. * If cannot change waitStatus, the node has been cancelled.
  4. */
  5. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  6. return false;
  7. Node p = enq(node); //拼接到队列上
  8. int ws = p.waitStatus;
  9. //并尝试将前任的waitStatus设置为表示线程(可能)正在等待。
  10. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  11. LockSupport.unpark(node.thread); //唤醒线程
  12. return true;
  13. }

3.3.3 enq

将节点插入队列,必要时进行初始化。

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { // Must initialize
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }