Java ConditionCondition接口提供了与Object阻塞(wait())与唤醒(notify()或notifyAll())相似的功能,只不过Condition接口提供了更为丰富的功能,如:限定等待时长等。Condition需要与Lock结合使用,需要通过锁对象获取Condition。
一、基本使用
基于Condition实现生产者、消费者模式。代码基本与Object#wait()和Object#notify()类似,只不过这里使用Lock替换了synchronized关键字。
生产者
public class Producer implements Runnable {private Lock lock;private Condition condition;private Queue<String> queue;private int maxSize;public Producer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {this.lock = lock;this.condition = condition;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run() {int i = 0;for (; ; ) {lock.lock();// 如果满了,则阻塞while (queue.size() == maxSize) {System.out.println("生产者队列满了,等待...");try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}queue.add("一个消息:" + ++i);System.out.printf("生产者%s生产了一个消息:%s\n", Thread.currentThread().getName(), i);condition.signal();lock.unlock();}}}
消费者
public class Consumer implements Runnable {private Lock lock;private Condition condition;private Queue<String> queue;private int maxSize;public Consumer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {this.lock = lock;this.condition = condition;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run() {for (; ; ) {lock.lock();while (queue.isEmpty()) {System.out.println("消费者队列为空,等待...");try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}String obj = queue.remove();System.out.printf("消费者%s消费一个消息:%s\n", Thread.currentThread().getName(), obj);condition.signal();lock.unlock();}}}
测试类
public class ConditionProducerConsumer {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();Queue<String> queue = new LinkedBlockingQueue<>();int maxSize = 10;Producer producer = new Producer(lock, condition, queue, maxSize);Consumer consumer = new Consumer(lock, condition, queue, maxSize);new Thread(producer).start();new Thread(consumer).start();}}
二、源码分析
上述示例中使用的Lock是ReentrantLock,关于它的lock方法与unlock方法的原理详见ReentrantLock实现原理。上述示例中的Condition对象是调用了Lock#newCondition()方法,源码如下:
public class ReentrantLock implements Lock, java.io.Serializable {...public Condition newCondition() {return sync.newCondition();}abstract static class Sync extends AbstractQueuedSynchronizer {...final ConditionObject newCondition() {return new ConditionObject();}...}...}
上述的ConditionObject定义在AQS中,如下:
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {...public class ConditionObject implements Condition, java.io.Serializable {...}...}
首先来分析下Condition#await()方法
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter();int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
根据AQS队列的特性,若有多个线程执行lock#lock()方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:
假设当前是线程A获取到锁,其他线程执行lock#lock()方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()方法,则会将当前线程添加至Condition队列中,如下:
然后在调用fullyRelease()方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:
在调用isOnSyncQueue()方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()进行阻塞。
假设当前线程A是生产者线程,调用await()方法后,会释放锁,并且将当前线程加入到Condition队列中。此时,消费者能获取到锁资源,然后继续执行。假设线程B是消费者线程,当添加一个元素后会调用condition#signal()方法,定义如下:
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
执行signal()方法,会将Condition队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:
至此,完成了Condition队列转换为同步队列的过程。后续流程基本就是重复以上操作。
详细介绍了单个Condition队列的执行流程,其实一个Lock中可以有多个Condition队列,比如:JUC中提供的LinkedBlockingDeque、ArrayBlockingQueue等。
