1 概述
它与 wait()方法和 notify()方法的作用是大致相同的,但是 wait()方法和 notify()方法是与 synchronized 关键字合作使用的,而 Conditition 是与重入锁相关联的。通过 lock 接口(重 入锁就实现了这一接口)的 Condition newCondition() 方法可以生成 个与当前重入锁绑定的Condition 实例。利用 Condit on 对象,我们就可以让线程在合适的时间等待,或者在某特定的时刻得到通知,继续执行。
2 示例
2.1 一个简单的demo
public class ConditionTest {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();new Thread(() -> {lock.lock();try {System.out.println(Thread.currentThread().getName() + "执行await(会释放锁)...");condition.await();System.out.println(Thread.currentThread().getName() + "被唤醒");} catch (InterruptedException e) {e.printStackTrace();}}, "线程1").start();System.out.println("main线程...");new Thread(() -> {lock.lock();try {System.out.println(Thread.currentThread().getName() + "执行sleep...");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "执行sleep完毕,开始唤醒线程1");condition.signal();lock.unlock();}, "线程2").start();}}
输出:
main线程...线程1执行await(会释放锁)...线程2执行sleep...线程2执行sleep完毕,开始唤醒线程1线程1被唤醒
2.2 生产者消费者示例
public class ProductConsumeByLockTest {public static void main(String[] args) {DealDataByLock data = new DealDataByLock();new Thread(() -> {Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());}, "线程1").start();new Thread(() -> {Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());}, "线程2").start();new Thread(() -> {Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.product());}, "线程3").start();new Thread(() -> {Arrays.asList(1, 2, 3, 4, 5).stream().forEach(i -> data.consume());}, "线程4").start();}}class DealDataByLock {private int number = 0;private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();public void product() {lock.lock();try {while (number != 0)condition.await();number++;System.out.println(Thread.currentThread().getName() + " 生产者生产了number=" + number);condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}public void consume() {lock.lock();try {while (number == 0)condition.await();number--;System.out.println(Thread.currentThread().getName() + " 消费者消费了number=" + number);condition.signalAll();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}
输出:
线程3 生产者生产了number=1线程4 消费者消费了number=0线程1 生产者生产了number=1线程4 消费者消费了number=0线程1 生产者生产了number=1线程4 消费者消费了number=0线程1 生产者生产了number=1线程4 消费者消费了number=0线程1 生产者生产了number=1线程4 消费者消费了number=0线程1 生产者生产了number=1线程2 消费者消费了number=0线程3 生产者生产了number=1线程2 消费者消费了number=0线程3 生产者生产了number=1线程2 消费者消费了number=0线程3 生产者生产了number=1线程2 消费者消费了number=0线程3 生产者生产了number=1线程2 消费者消费了number=0
3 实现原理
3.1 ConditionObject
ConditionObject是AbstractQueuedSynchronizer的内部类
public class ConditionObject implements Condition, java.io.Serializable {private static final long serialVersionUID = 1173984872572414699L;/** 头节点 */private transient Node firstWaiter;/** 尾节点 */private transient Node lastWaiter;/*** 构造函数*/public ConditionObject() { }......
3.1.1 Node(单向链表)
static final class Node {/** Marker to indicate a node is waiting in shared mode */static final Node SHARED = new Node();/** Marker to indicate a node is waiting in exclusive mode */static final Node EXCLUSIVE = null;static final int CANCELLED = 1;//取消static final int SIGNAL = -1;//唤醒static final int CONDITION = -2;//等待条件static final int PROPAGATE = -3;//广播volatile int waitStatus;//等待状态volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;......
3.2 await()
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();Node node = addConditionWaiter(); //当前线程new Node()加入条件队列int savedState = fullyRelease(node); // 释放当先线程的锁int interruptMode = 0;/*** 自旋:* 1.当前节点不在同步队列(刚new的节点肯定不在),挂起当前线程,等待被唤醒* 2.当其他线程调用同一个ConditionObject的signal方法时,会将队列里的节点放入同步队列,并unpark线程(排队唤醒)* 3.如果该节点被唤醒,再自旋检查是否在同步队列。发现已经在队列中,就可以跳出循环,获取lock*/while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null)unlinkCancelledWaiters(); //解除条件队列中被取消的侍者节点的链接if (interruptMode != 0)reportInterruptAfterWait(interruptMode); //抛出中断异常}
3.2.1 addConditionWaiter
创建或加入单项链表
private Node addConditionWaiter() {Node t = lastWaiter;// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}
3.2.2 fullyRelease
锁释放,使用当前状态值调用release;返回保存的状态。
final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}
3.3 signal()
public final void signal() {if (!isHeldExclusively()) //检查是否获取到锁throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignal(first);}
3.3.1 doSignal
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) && // 唤醒队列第一个节点(first = firstWaiter) != null);}
3.3.2 transferForSignal
将节点从条件队列转移到同步队列。如果成功返回true。
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;Node p = enq(node); //拼接到队列上int ws = p.waitStatus;//并尝试将前任的waitStatus设置为表示线程(可能)正在等待。if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread); //唤醒线程return true;}
3.3.3 enq
将节点插入队列,必要时进行初始化。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
