Condition的设计目的
:::tips
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
Condition
将 Object监视器(monitor)中 的方法(wait
、notify
和 notifyAll
)分解成不同的java对象,可以将这些对象与任意 Lock
接口的实现组合起来,为每一个对象提供多个等待集合(wait-set)。其中,Lock
替代了 synchronized
方法和语句的使用,Condition
替代了Object监视器方法的使用。
:::
- Lock中Condition出现的目的是实现类似synchornized中的wait()和notify()机制,以此来实现多个线程之间的协调与通信。
- 使用Condition,整个过程都是由开发者来控制的,相比于synchornized的传统方式,更加灵活,功能也更加强大。
Condition的使用
- Condition必须绑定一个Lock实例来使用
- 调用Condition.await()方法将会在wait之前自动释放锁,在wait返回之前会重新尝试获取锁。
-
await
class BoundedBuffer {
final Lock lock = new ReentrantLock();
//多个等待集合
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
awaitUninterruptibly
使用condition.awaitUninterruptibly()后,调用thread.interrupt(0则不会报错
awaitNanos
该方法返回了所剩毫微秒数的一个估计值,以等待所提供的 nanosTimeout
值的时间,如果超时,则返回一个小于等于 0 的值。可以用此值来确定在等待返回但某一等待条件仍不具备的情况下,是否要再次等待,以及再次等待的时间。
synchronized boolean aMethod(long timeout, TimeUnit unit) {
long nanosTimeout = unit.toNanos(timeout);
while (!conditionBeingWaitedFor) {
if (nanosTimeout > 0)
nanosTimeout = theCondition.awaitNanos(nanosTimeout);
else
return false;
}
// ...
}
signal
唤醒一个等待线程。
AQS 链表中的第一个元素
signalAll
唤醒所有等待线程。
package com.shengsiyuan.concurrency4;
/*
传统上,我们可以通过synchronized关键字 + wait + notify/notifyAll 来实现多个线程之间的协调与通信,整个过程都是由JVM来帮助
我们实现的;开发者无需(也是无法)了解底层的实现细节
从JDK 5开始,并发包提供了Lock, Condition(await与signal/signalAll)来实现多个线程之间的协调与通信,
Thread.sleep与await(或是Object的wait方法)的本质区别:sleep方法本质上不会释放锁,而await会释放锁,并且在signal后,还需要
重新获得锁才能继续执行(该行为与Object的wait方法完全一致)
*/
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
public class MyTest2 {
public static void main(String[] args) {
BoundedContainer boundedContainer = new BoundedContainer();
IntStream.range(0, 10).forEach(i -> new Thread(() -> {
try {
boundedContainer.put("hello");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}).start());
IntStream.range(0, 10).forEach(i -> new Thread(() -> {
try {
boundedContainer.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}).start());
}
}
class BoundedContainer {
private String[] elements = new String[10];
private Lock lock = new ReentrantLock();
private Condition notEmptyCondition = lock.newCondition();
private Condition notFullCondition = lock.newCondition();
private int elementCount; // elements数组中已有的元素数量
private int putIndex;
private int takeIndex;
public void put(String element) throws InterruptedException {
this.lock.lock();
try {
while (this.elementCount == this.elements.length) {
notFullCondition.await();
}
elements[putIndex] = element;
if (++putIndex == this.elements.length) {
putIndex = 0;
}
++elementCount;
System.out.println("put method: " + Arrays.toString(elements));
notEmptyCondition.signal();
} finally {
this.lock.unlock();
}
}
public String take() throws InterruptedException {
this.lock.lock();
try {
while (0 == this.elementCount) {
notEmptyCondition.await();
}
String element = elements[takeIndex];
elements[takeIndex] = null;
if (++takeIndex == this.elements.length) {
takeIndex = 0;
}
--elementCount;
System.out.println("take method: " + Arrays.toString(elements));
notFullCondition.signal();
return element;
} finally {
this.lock.unlock();
}
}
}