Java Condition
Condition接口提供了与Object阻塞(wait())与唤醒(notify()notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition

一、基本使用

基于Condition实现生产者、消费者模式。代码基本与Object#wait()Object#notify()类似,只不过这里使用Lock替换了synchronized关键字。

生产者

  1. public class Producer implements Runnable {
  2. private Lock lock;
  3. private Condition condition;
  4. private Queue<String> queue;
  5. private int maxSize;
  6. public Producer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
  7. this.lock = lock;
  8. this.condition = condition;
  9. this.queue = queue;
  10. this.maxSize = maxSize;
  11. }
  12. @Override
  13. public void run() {
  14. int i = 0;
  15. for (; ; ) {
  16. lock.lock();
  17. // 如果满了,则阻塞
  18. while (queue.size() == maxSize) {
  19. System.out.println("生产者队列满了,等待...");
  20. try {
  21. condition.await();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. try {
  27. TimeUnit.SECONDS.sleep(2);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. queue.add("一个消息:" + ++i);
  32. System.out.printf("生产者%s生产了一个消息:%s\n", Thread.currentThread().getName(), i);
  33. condition.signal();
  34. lock.unlock();
  35. }
  36. }
  37. }

消费者

  1. public class Consumer implements Runnable {
  2. private Lock lock;
  3. private Condition condition;
  4. private Queue<String> queue;
  5. private int maxSize;
  6. public Consumer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
  7. this.lock = lock;
  8. this.condition = condition;
  9. this.queue = queue;
  10. this.maxSize = maxSize;
  11. }
  12. @Override
  13. public void run() {
  14. for (; ; ) {
  15. lock.lock();
  16. while (queue.isEmpty()) {
  17. System.out.println("消费者队列为空,等待...");
  18. try {
  19. condition.await();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. try {
  25. TimeUnit.SECONDS.sleep(1);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. String obj = queue.remove();
  30. System.out.printf("消费者%s消费一个消息:%s\n", Thread.currentThread().getName(), obj);
  31. condition.signal();
  32. lock.unlock();
  33. }
  34. }
  35. }

测试类

  1. public class ConditionProducerConsumer {
  2. public static void main(String[] args) {
  3. Lock lock = new ReentrantLock();
  4. Condition condition = lock.newCondition();
  5. Queue<String> queue = new LinkedBlockingQueue<>();
  6. int maxSize = 10;
  7. Producer producer = new Producer(lock, condition, queue, maxSize);
  8. Consumer consumer = new Consumer(lock, condition, queue, maxSize);
  9. new Thread(producer).start();
  10. new Thread(consumer).start();
  11. }
  12. }

二、源码分析

上述示例中使用的LockReentrantLock,关于它的lock方法与unlock方法的原理详见ReentrantLock实现原理。上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. ...
  3. public Condition newCondition() {
  4. return sync.newCondition();
  5. }
  6. abstract static class Sync extends AbstractQueuedSynchronizer {
  7. ...
  8. final ConditionObject newCondition() {
  9. return new ConditionObject();
  10. }
  11. ...
  12. }
  13. ...
  14. }

上述的ConditionObject定义在AQS中,如下:

  1. public abstract class AbstractQueuedSynchronizer
  2. extends AbstractOwnableSynchronizer
  3. implements java.io.Serializable {
  4. ...
  5. public class ConditionObject implements Condition, java.io.Serializable {
  6. ...
  7. }
  8. ...
  9. }

首先来分析下Condition#await()方法

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. Node node = addConditionWaiter();
  5. int savedState = fullyRelease(node);
  6. int interruptMode = 0;
  7. while (!isOnSyncQueue(node)) {
  8. LockSupport.park(this);
  9. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  10. break;
  11. }
  12. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  13. interruptMode = REINTERRUPT;
  14. if (node.nextWaiter != null) // clean up if cancelled
  15. unlinkCancelledWaiters();
  16. if (interruptMode != 0)
  17. reportInterruptAfterWait(interruptMode);
  18. }
  19. private Node addConditionWaiter() {
  20. Node t = lastWaiter;
  21. // If lastWaiter is cancelled, clean out.
  22. if (t != null && t.waitStatus != Node.CONDITION) {
  23. unlinkCancelledWaiters();
  24. t = lastWaiter;
  25. }
  26. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  27. if (t == null)
  28. firstWaiter = node;
  29. else
  30. t.nextWaiter = node;
  31. lastWaiter = node;
  32. return node;
  33. }

根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:
2021-07-24-18-34-19-791476.png
假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()方法,则会将当前线程添加至Condition队列中,如下:
2021-07-24-18-34-19-971481.png
然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:
2021-07-24-18-34-20-079457.png
在调用isOnSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。
假设当前线程A是生产者线程,调用await()方法后,会释放锁,并且将当前线程加入到Condition队列中。此时,消费者能获取到锁资源,然后继续执行。假设线程B是消费者线程,当添加一个元素后会调用condition#signal()方法,定义如下:

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. Node first = firstWaiter;
  5. if (first != null)
  6. doSignal(first);
  7. }
  8. private void doSignal(Node first) {
  9. do {
  10. if ( (firstWaiter = first.nextWaiter) == null)
  11. lastWaiter = null;
  12. first.nextWaiter = null;
  13. } while (!transferForSignal(first) &&
  14. (first = firstWaiter) != null);
  15. }
  16. final boolean transferForSignal(Node node) {
  17. /*
  18. * If cannot change waitStatus, the node has been cancelled.
  19. */
  20. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  21. return false;
  22. /*
  23. * Splice onto queue and try to set waitStatus of predecessor to
  24. * indicate that thread is (probably) waiting. If cancelled or
  25. * attempt to set waitStatus fails, wake up to resync (in which
  26. * case the waitStatus can be transiently and harmlessly wrong).
  27. */
  28. Node p = enq(node);
  29. int ws = p.waitStatus;
  30. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  31. LockSupport.unpark(node.thread);
  32. return true;
  33. }

执行signal()方法,会将Condition队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:
Condition实现原理 - 图4
至此,完成了Condition队列转换为同步队列的过程。后续流程基本就是重复以上操作。
详细介绍了单个Condition队列的执行流程,其实一个Lock中可以有多个Condition队列,比如:JUC中提供的LinkedBlockingDequeArrayBlockingQueue等。