Java ConditionCondition
接口提供了与Object
阻塞(wait()
)与唤醒(notify()
或notifyAll()
)相似的功能,只不过Condition
接口提供了更为丰富的功能,如:限定等待时长等。Condition
需要与Lock
结合使用,需要通过锁对象获取Condition
。
一、基本使用
基于Condition
实现生产者、消费者模式。代码基本与Object#wait()
和Object#notify()
类似,只不过这里使用Lock
替换了synchronized
关键字。
生产者
public class Producer implements Runnable {
private Lock lock;
private Condition condition;
private Queue<String> queue;
private int maxSize;
public Producer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
this.lock = lock;
this.condition = condition;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
int i = 0;
for (; ; ) {
lock.lock();
// 如果满了,则阻塞
while (queue.size() == maxSize) {
System.out.println("生产者队列满了,等待...");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.add("一个消息:" + ++i);
System.out.printf("生产者%s生产了一个消息:%s\n", Thread.currentThread().getName(), i);
condition.signal();
lock.unlock();
}
}
}
消费者
public class Consumer implements Runnable {
private Lock lock;
private Condition condition;
private Queue<String> queue;
private int maxSize;
public Consumer(Lock lock, Condition condition, Queue<String> queue, int maxSize) {
this.lock = lock;
this.condition = condition;
this.queue = queue;
this.maxSize = maxSize;
}
@Override
public void run() {
for (; ; ) {
lock.lock();
while (queue.isEmpty()) {
System.out.println("消费者队列为空,等待...");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
String obj = queue.remove();
System.out.printf("消费者%s消费一个消息:%s\n", Thread.currentThread().getName(), obj);
condition.signal();
lock.unlock();
}
}
}
测试类
public class ConditionProducerConsumer {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Queue<String> queue = new LinkedBlockingQueue<>();
int maxSize = 10;
Producer producer = new Producer(lock, condition, queue, maxSize);
Consumer consumer = new Consumer(lock, condition, queue, maxSize);
new Thread(producer).start();
new Thread(consumer).start();
}
}
二、源码分析
上述示例中使用的Lock
是ReentrantLock
,关于它的lock
方法与unlock
方法的原理详见ReentrantLock
实现原理。上述示例中的Condition
对象是调用了Lock#newCondition()
方法,源码如下:
public class ReentrantLock implements Lock, java.io.Serializable {
...
public Condition newCondition() {
return sync.newCondition();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
final ConditionObject newCondition() {
return new ConditionObject();
}
...
}
...
}
上述的ConditionObject
定义在AQS中,如下:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public class ConditionObject implements Condition, java.io.Serializable {
...
}
...
}
首先来分析下Condition#await()
方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
根据AQS队列的特性,若有多个线程执行lock#lock()
方法,会将处于阻塞状态的线程维护到一个双向链表中,如下:
假设当前是线程A获取到锁,其他线程执行lock#lock()
方法时,将会构建成一个上述链表。
若获取锁的线程(线程A)执行Condition#await()
方法,则会将当前线程添加至Condition
队列中,如下:
然后在调用fullyRelease()
方法时会释放当前线程的锁,然后唤醒处于阻塞队列中的下一个线程:
在调用isOnSyncQueue()
方法时会检查当前节点是否在同步队列中,若不存在,则会调用LockSupport.park()
进行阻塞。
假设当前线程A是生产者线程,调用await()
方法后,会释放锁,并且将当前线程加入到Condition
队列中。此时,消费者能获取到锁资源,然后继续执行。假设线程B是消费者线程,当添加一个元素后会调用condition#signal()
方法,定义如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
执行signal()
方法,会将Condition
队列中的第一个节点移除,将其变为同步队列中的尾结点,如下:
至此,完成了Condition
队列转换为同步队列的过程。后续流程基本就是重复以上操作。
详细介绍了单个Condition
队列的执行流程,其实一个Lock
中可以有多个Condition
队列,比如:JUC中提供的LinkedBlockingDeque
、ArrayBlockingQueue
等。