wait()

使当前线程阻塞 并释放持有的锁

notify()

唤醒一个处于阻塞状态下的线程

notifyAll()

唤醒所有处于阻塞状态下的线程

生产消费模型

  1. public class Application implements Runnable{
  2. private Queue<String> bags;
  3. private int maxSize;
  4. public Application(Queue<String> bags, int maxSize) {
  5. this.bags = bags;
  6. this.maxSize = maxSize;
  7. }
  8. public void run() {
  9. int i = 0 ;
  10. while (true) {
  11. i++;
  12. synchronized (bags) {
  13. if (bags.size() ==maxSize) {
  14. System.out.println("队列已满");
  15. try {
  16. bags.wait();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. try {
  22. Thread.sleep(1000);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. System.out.println("生产者");
  27. bags.add("bags"+i);
  28. bags.notify();
  29. }
  30. }
  31. }
  32. }
  1. public class Consumer implements Runnable{
  2. private Queue<String> bags;
  3. private int maxSize;
  4. public Consumer(Queue<String> bags, int maxSize) {
  5. this.bags = bags;
  6. this.maxSize = maxSize;
  7. }
  8. public void run() {
  9. while (true) {
  10. synchronized (bags) {
  11. if (bags.isEmpty()) {
  12. try {
  13. bags.wait();
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. try {
  19. Thread.sleep(1000);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. String bag = bags.remove();
  24. System.out.println("消费数据"+bag);
  25. bags.notify();
  26. }
  27. }
  28. }
  29. public static void main(String[] args) throws InterruptedException {
  30. Queue<String> queue = new LinkedList<String>();
  31. int maxSize = 2;
  32. Application application = new Application(queue,maxSize);
  33. Consumer consumer = new Consumer(queue,maxSize);
  34. new Thread(application).start();
  35. Thread.sleep(1);
  36. new Thread(consumer).start();
  37. }
  38. }

线程A在同步快中调用 wait方法先释放锁 然后加入等待队列中 由于线程A释放了锁 原本在队列中的线程B会被唤醒 竞争到锁 当线程B调用到notify方法的时候会把等待队列的线程A唤醒 然后重新竞争锁资源 直到线程B退出同步快之后线程A才有资格抢占锁

Thread.join()

等待前面的线程执行结束在执行

  1. public final synchronized void join(long millis)
  2. throws InterruptedException {
  3. long base = System.currentTimeMillis();
  4. long now = 0;
  5. if (millis < 0) {
  6. throw new IllegalArgumentException("timeout value is negative");
  7. }
  8. if (millis == 0) {
  9. //如果当前线程时存活状态
  10. while (isAlive()) {
  11. //调用wait
  12. wait(0);
  13. }
  14. } else {
  15. while (isAlive()) {
  16. long delay = millis - now;
  17. if (delay <= 0) {
  18. break;
  19. }
  20. wait(delay);
  21. now = System.currentTimeMillis() - base;
  22. }
  23. }
  24. }

Condition

  1. public class Application implements Runnable {
  2. private Lock lock;
  3. private Condition condition;
  4. private AtomicInteger count;
  5. public Application(Lock lock, Condition condition, AtomicInteger integer) {
  6. this.lock = lock;
  7. this.condition = condition;
  8. this.count = integer;
  9. }
  10. public void run() {
  11. for (; ; ) {
  12. try {
  13. lock.lock();
  14. while (count.intValue() >= 10) { // 池子小于最大值(这里设置10,自定义)
  15. // 池子满了阻塞
  16. System.out.println("池子满了阻塞,等待消费。。。。。。");
  17. condition.await();
  18. // Thread.sleep(500);
  19. }
  20. count.incrementAndGet();
  21. System.out.println("池子生产了 count=" + count);
  22. condition.signal(); // 唤醒消费者线程
  23. Thread.sleep(500);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
  30. }
  31. }
  1. public class Consumer implements Runnable {
  2. private Lock lock;
  3. private Condition condition;
  4. private AtomicInteger count;
  5. public Consumer(Lock lock, Condition condition, AtomicInteger integer) {
  6. this.lock = lock;
  7. this.condition = condition;
  8. this.count = integer;
  9. }
  10. public void run() {
  11. for (; ; ) {
  12. try {
  13. lock.lock();
  14. while (count.intValue() <= 0) { // 池子不为空
  15. // 池子为空 阻塞
  16. System.out.println("池子空了,等待生产count=" + count);
  17. condition.await();
  18. // Thread.sleep(500);
  19. }
  20. System.out.println("开始消费 count=" + count);
  21. count.decrementAndGet();
  22. condition.signal();// 唤醒生产者可以生产
  23. // Thread.sleep(500);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. } finally {
  27. lock.unlock();
  28. }
  29. }
  30. }
  31. public static void main(String[] args) throws InterruptedException {
  32. Lock lock = new ReentrantLock();
  33. Condition condition = lock.newCondition();
  34. AtomicInteger integer = new AtomicInteger(0);
  35. Application conditionConsumer = new Application(lock,condition,integer);
  36. Consumer conditionProducer = new Consumer(lock,condition,integer);
  37. // 启动一个消费者
  38. new Thread(conditionConsumer).start();
  39. // 启动一个生产者
  40. new Thread(conditionProducer).start();
  41. }
  42. }

原理

Dingtalk_20220420104659.jpg
当调用wait 方法的时候从拿到一个最新创建的Node并加入 Condition 队列
唤醒AQS队列中的一个线程
判断node是否在aqs 队列上
如果不在的话将当前线程阻塞
当调用signal方法的时候会唤醒指定的线程 并添加当前节点到AQS队列

wait()

会释放一个处于 aqs等待队列的线程 然后将自己构建成一个节点 从尾部 插入到 condition队列

  1. public final void await() throws InterruptedException {
  2. if (Thread.interrupted())
  3. throw new InterruptedException();
  4. //创建一个新节点 节点状态为 condtition
  5. Node node = addConditionWaiter();
  6. //释放当前的锁 得到锁的状态 并唤醒处于 aqs队列的一个线程
  7. long savedState = fullyRelease(node);
  8. int interruptMode = 0;
  9. //判断一个节点是否在aqs队列上
  10. while (!isOnSyncQueue(node)) {
  11. //如果在 park自己阻塞等待
  12. LockSupport.park(this);
  13. //判断自己是否被中断
  14. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  15. break;
  16. }
  17. //尝试获取锁
  18. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  19. interruptMode = REINTERRUPT;
  20. //如果node 的下一个节点不是null 清理condition队列上的节点
  21. if (node.nextWaiter != null) // clean up if cancelled
  22. unlinkCancelledWaiters();
  23. if (interruptMode != 0)
  24. //线程中断抛异常
  25. reportInterruptAfterWait(interruptMode);
  26. }
  1. private Node addConditionWaiter() {
  2. //拿到尾节点
  3. Node t = lastWaiter;
  4. // If lastWaiter is cancelled, clean out.
  5. //如果尾节点不等于空 并且尾节点的状态不是 CONDITION
  6. if (t != null && t.waitStatus != Node.CONDITION) {
  7. //从链表中删除
  8. unlinkCancelledWaiters();
  9. t = lastWaiter;
  10. }
  11. //构建一个节点 节点状态为 CONDITION
  12. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  13. if (t == null)
  14. //如果尾节点== null 把构建的节点设置成尾节点
  15. firstWaiter = node;
  16. else
  17. //否则把前一个节点的next指针指向构建的节点
  18. t.nextWaiter = node;
  19. //把刚构建的节点设置成尾节点
  20. lastWaiter = node;
  21. return node;
  22. }

构建node

  1. private void unlinkCancelledWaiters() {
  2. Node t = firstWaiter;
  3. Node trail = null;
  4. // 如果首节点不为空
  5. while (t != null) {
  6. // 获取到下个节点
  7. Node next = t.nextWaiter;
  8. // 如果该节点的状态不等于conditon,则该节点需要在链表中删除
  9. if (t.waitStatus != Node.CONDITION) {
  10. // 该节点的下个节点设置为空,意味着垃圾回收后就回收该节点
  11. t.nextWaiter = null;
  12. // trail 为空,则把下一个节点负责给首节点
  13. if (trail == null)
  14. firstWaiter = next;
  15. else
  16. // 把下一个节点赋值给next,这样链表就要继续连接起来
  17. trail.nextWaiter = next;
  18. if (next == null)
  19. lastWaiter = trail;
  20. }
  21. // 等于condtion,把该节点赋值给尾节点
  22. else
  23. trail = t;
  24. // 下个一个节点赋值给t,进行下一次循环
  25. t = next;
  26. }
  27. }
  1. final long fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. //拿当前锁的状态值
  5. long savedState = getState();
  6. //释放锁
  7. if (release(savedState)) {
  8. failed = false;
  9. return savedState;
  10. } else {
  11. throw new IllegalMonitorStateException();
  12. }
  13. } finally {
  14. if (failed)
  15. node.waitStatus = Node.CANCELLED;
  16. }
  17. }

signal()

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. //得到头
  5. Node first = firstWaiter;
  6. //有头
  7. if (first != null)
  8. doSignal(first);
  9. }
  1. private void doSignal(Node first) {
  2. do {
  3. //如果头是空 头的下一个节点是空 则直接清空
  4. if ( (firstWaiter = first.nextWaiter) == null)
  5. lastWaiter = null;
  6. first.nextWaiter = null;
  7. } while (!transferForSignal(first) &&
  8. (first = firstWaiter) != null);
  9. }
  1. final boolean transferForSignal(Node node) {
  2. /*
  3. * If cannot change waitStatus, the node has been cancelled.
  4. */
  5. //如果CAS失败 则说明当前节点状态为 CONDITION 此时需要继续查找等待队列中的下一个节点
  6. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  7. return false;
  8. /*
  9. * Splice onto queue and try to set waitStatus of predecessor to
  10. * indicate that thread is (probably) waiting. If cancelled or
  11. * attempt to set waitStatus fails, wake up to resync (in which
  12. * case the waitStatus can be transiently and harmlessly wrong).
  13. */
  14. //加入aqs队列
  15. Node p = enq(node);
  16. int ws = p.waitStatus;
  17. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  18. //唤醒节点上的线程
  19. LockSupport.unpark(node.thread);
  20. return true;
  21. }