wait()
notify()
notifyAll()
唤醒所有处于阻塞状态下的线程
生产消费模型
public class Application implements Runnable{private Queue<String> bags;private int maxSize;public Application(Queue<String> bags, int maxSize) {this.bags = bags;this.maxSize = maxSize;}public void run() {int i = 0 ;while (true) {i++;synchronized (bags) {if (bags.size() ==maxSize) {System.out.println("队列已满");try {bags.wait();} catch (InterruptedException e) {e.printStackTrace();}}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("生产者");bags.add("bags"+i);bags.notify();}}}}
public class Consumer implements Runnable{private Queue<String> bags;private int maxSize;public Consumer(Queue<String> bags, int maxSize) {this.bags = bags;this.maxSize = maxSize;}public void run() {while (true) {synchronized (bags) {if (bags.isEmpty()) {try {bags.wait();} catch (InterruptedException e) {e.printStackTrace();}}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}String bag = bags.remove();System.out.println("消费数据"+bag);bags.notify();}}}public static void main(String[] args) throws InterruptedException {Queue<String> queue = new LinkedList<String>();int maxSize = 2;Application application = new Application(queue,maxSize);Consumer consumer = new Consumer(queue,maxSize);new Thread(application).start();Thread.sleep(1);new Thread(consumer).start();}}
线程A在同步快中调用 wait方法先释放锁 然后加入等待队列中 由于线程A释放了锁 原本在队列中的线程B会被唤醒 竞争到锁 当线程B调用到notify方法的时候会把等待队列的线程A唤醒 然后重新竞争锁资源 直到线程B退出同步快之后线程A才有资格抢占锁
Thread.join()
等待前面的线程执行结束在执行
public final synchronized void join(long millis)throws InterruptedException {long base = System.currentTimeMillis();long now = 0;if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}if (millis == 0) {//如果当前线程时存活状态while (isAlive()) {//调用waitwait(0);}} else {while (isAlive()) {long delay = millis - now;if (delay <= 0) {break;}wait(delay);now = System.currentTimeMillis() - base;}}}
Condition
public class Application implements Runnable {private Lock lock;private Condition condition;private AtomicInteger count;public Application(Lock lock, Condition condition, AtomicInteger integer) {this.lock = lock;this.condition = condition;this.count = integer;}public void run() {for (; ; ) {try {lock.lock();while (count.intValue() >= 10) { // 池子小于最大值(这里设置10,自定义)// 池子满了阻塞System.out.println("池子满了阻塞,等待消费。。。。。。");condition.await();// Thread.sleep(500);}count.incrementAndGet();System.out.println("池子生产了 count=" + count);condition.signal(); // 唤醒消费者线程Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}
public class Consumer implements Runnable {private Lock lock;private Condition condition;private AtomicInteger count;public Consumer(Lock lock, Condition condition, AtomicInteger integer) {this.lock = lock;this.condition = condition;this.count = integer;}public void run() {for (; ; ) {try {lock.lock();while (count.intValue() <= 0) { // 池子不为空// 池子为空 阻塞System.out.println("池子空了,等待生产count=" + count);condition.await();// Thread.sleep(500);}System.out.println("开始消费 count=" + count);count.decrementAndGet();condition.signal();// 唤醒生产者可以生产// Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}public static void main(String[] args) throws InterruptedException {Lock lock = new ReentrantLock();Condition condition = lock.newCondition();AtomicInteger integer = new AtomicInteger(0);Application conditionConsumer = new Application(lock,condition,integer);Consumer conditionProducer = new Consumer(lock,condition,integer);// 启动一个消费者new Thread(conditionConsumer).start();// 启动一个生产者new Thread(conditionProducer).start();}}
原理

当调用wait 方法的时候从拿到一个最新创建的Node并加入 Condition 队列
唤醒AQS队列中的一个线程
判断node是否在aqs 队列上
如果不在的话将当前线程阻塞
当调用signal方法的时候会唤醒指定的线程 并添加当前节点到AQS队列
wait()
会释放一个处于 aqs等待队列的线程 然后将自己构建成一个节点 从尾部 插入到 condition队列
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//创建一个新节点 节点状态为 condtitionNode node = addConditionWaiter();//释放当前的锁 得到锁的状态 并唤醒处于 aqs队列的一个线程long savedState = fullyRelease(node);int interruptMode = 0;//判断一个节点是否在aqs队列上while (!isOnSyncQueue(node)) {//如果在 park自己阻塞等待LockSupport.park(this);//判断自己是否被中断if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//尝试获取锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;//如果node 的下一个节点不是null 清理condition队列上的节点if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)//线程中断抛异常reportInterruptAfterWait(interruptMode);}
private Node addConditionWaiter() {//拿到尾节点Node t = lastWaiter;// If lastWaiter is cancelled, clean out.//如果尾节点不等于空 并且尾节点的状态不是 CONDITIONif (t != null && t.waitStatus != Node.CONDITION) {//从链表中删除unlinkCancelledWaiters();t = lastWaiter;}//构建一个节点 节点状态为 CONDITIONNode node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)//如果尾节点== null 把构建的节点设置成尾节点firstWaiter = node;else//否则把前一个节点的next指针指向构建的节点t.nextWaiter = node;//把刚构建的节点设置成尾节点lastWaiter = node;return node;}
构建node
private void unlinkCancelledWaiters() {Node t = firstWaiter;Node trail = null;// 如果首节点不为空while (t != null) {// 获取到下个节点Node next = t.nextWaiter;// 如果该节点的状态不等于conditon,则该节点需要在链表中删除if (t.waitStatus != Node.CONDITION) {// 该节点的下个节点设置为空,意味着垃圾回收后就回收该节点t.nextWaiter = null;// trail 为空,则把下一个节点负责给首节点if (trail == null)firstWaiter = next;else// 把下一个节点赋值给next,这样链表就要继续连接起来trail.nextWaiter = next;if (next == null)lastWaiter = trail;}// 等于condtion,把该节点赋值给尾节点elsetrail = t;// 下个一个节点赋值给t,进行下一次循环t = next;}}
final long fullyRelease(Node node) {boolean failed = true;try {//拿当前锁的状态值long savedState = getState();//释放锁if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}
signal()
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();//得到头Node first = firstWaiter;//有头if (first != null)doSignal(first);}
private void doSignal(Node first) {do {//如果头是空 头的下一个节点是空 则直接清空if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*///如果CAS失败 则说明当前节点状态为 CONDITION 此时需要继续查找等待队列中的下一个节点if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*///加入aqs队列Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//唤醒节点上的线程LockSupport.unpark(node.thread);return true;}
