1 概述
它与 wait()方法和 notify()方法的作用是大致相同的,但是 wait()方法和 notify()方法是与 synchronized 关键字合作使用的,而 Conditition 是与重入锁相关联的。通过 lock 接口(重 入锁就实现了这一接口)的 Condition newCondition() 方法可以生成 个与当前重入锁绑定的Condition 实例。利用 Condit on 对象,我们就可以让线程在合适的时间等待,或者在某特定的时刻得到通知,继续执行。
2 示例
2.1 一个简单的demo
public class ConditionTest {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "执行await(会释放锁)...");
condition.await();
System.out.println(Thread.currentThread().getName() + "被唤醒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程1").start();
System.out.println("main线程...");
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "执行sleep...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行sleep完毕,开始唤醒线程1");
condition.signal();
lock.unlock();
}, "线程2").start();
}
}
输出:
main线程...
线程1执行await(会释放锁)...
线程2执行sleep...
线程2执行sleep完毕,开始唤醒线程1
线程1被唤醒
2.2 生产者消费者示例
public class ProductConsumeByLockTest {
public static void main(String[] args) {
DealDataByLock data = new DealDataByLock();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
}, "线程1").start();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
}, "线程2").start();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());
}, "线程3").start();
new Thread(() -> {
Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());
}, "线程4").start();
}
}
class DealDataByLock {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void product() {
lock.lock();
try {
while (number != 0)
condition.await();
number++;
System.out.println(Thread.currentThread().getName() + " 生产者生产了number=" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while (number == 0)
condition.await();
number--;
System.out.println(Thread.currentThread().getName() + " 消费者消费了number=" + number);
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
输出:
线程3 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程4 消费者消费了number=0
线程1 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
线程3 生产者生产了number=1
线程2 消费者消费了number=0
3 实现原理
3.1 ConditionObject
ConditionObject是AbstractQueuedSynchronizer的内部类
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 头节点 */
private transient Node firstWaiter;
/** 尾节点 */
private transient Node lastWaiter;
/**
* 构造函数
*/
public ConditionObject() { }
......
3.1.1 Node(单向链表)
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;//取消
static final int SIGNAL = -1;//唤醒
static final int CONDITION = -2;//等待条件
static final int PROPAGATE = -3;//广播
volatile int waitStatus;//等待状态
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
......
3.2 await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //当前线程new Node()加入条件队列
int savedState = fullyRelease(node); // 释放当先线程的锁
int interruptMode = 0;
/**
* 自旋:
* 1.当前节点不在同步队列(刚new的节点肯定不在),挂起当前线程,等待被唤醒
* 2.当其他线程调用同一个ConditionObject的signal方法时,会将队列里的节点放入同步队列,并unpark线程(排队唤醒)
* 3.如果该节点被唤醒,再自旋检查是否在同步队列。发现已经在队列中,就可以跳出循环,获取lock
*/
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters(); //解除条件队列中被取消的侍者节点的链接
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); //抛出中断异常
}
3.2.1 addConditionWaiter
创建或加入单项链表
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
3.2.2 fullyRelease
锁释放,使用当前状态值调用release;返回保存的状态。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
3.3 signal()
public final void signal() {
if (!isHeldExclusively()) //检查是否获取到锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
3.3.1 doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 唤醒队列第一个节点
(first = firstWaiter) != null);
}
3.3.2 transferForSignal
将节点从条件队列转移到同步队列。如果成功返回true。
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node); //拼接到队列上
int ws = p.waitStatus;
//并尝试将前任的waitStatus设置为表示线程(可能)正在等待。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //唤醒线程
return true;
}
3.3.3 enq
将节点插入队列,必要时进行初始化。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}