wait - notify

Java 为每个对象提供了 wait() 方法使线程阻塞等待,notify() 方法来唤醒由于 wait() 方法阻塞等待的线程。使用这两个方法的前提是当前线程持有该对象的监视器锁(隐式锁),而给对象添加隐式锁的方式就是通过 synchronized 关键字声明。

  • wait(),当前线程释放该对象的监视器锁,等待被唤醒或者被中断退出等待
  • wait(long timeoutMillis),同 wait() 方法,同时如果超过指定时间没被唤醒,可以退出等待
  • notify(),唤醒一个正在阻塞等待的线程
  • notifyAll(),唤醒所有的正在阻塞等待的线程

wait() 方法会释放对对象的监视器锁,notify() 方法只是唤醒阻塞等待的线程,和其它线程去竞争获取对象的监视器锁,具体能不能获取到监视器锁,要看 CPU 的具体调度。 代码示例:两个线程交替打印数字隐式锁锁住同一个对象,一个线程打印完后,唤醒另一个线程,然后阻塞等待另一个线程唤醒。

  1. public class Main {
  2. static class Counter {
  3. private int count = 0;
  4. public synchronized void printX() {
  5. while (count < 100 && count % 2 == 0) {
  6. count++;
  7. System.out.println(Thread.currentThread().getName() + count);
  8. notifyAll();
  9. }
  10. try {
  11. wait();
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. return;
  15. }
  16. }
  17. public synchronized void printY() {
  18. while (count < 100 && count % 2 == 1) {
  19. count++;
  20. System.out.println(Thread.currentThread().getName() + count);
  21. notifyAll();
  22. }
  23. try {
  24. wait();
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. return;
  28. }
  29. }
  30. }
  31. public static void main(String[] args) {
  32. final Counter counter = new Counter();
  33. new Thread(() -> {
  34. while (counter.count < 100) {
  35. counter.printX();
  36. }
  37. Thread.currentThread().interrupt();
  38. }, "线程A: ").start();
  39. new Thread(() -> {
  40. while (counter.count < 100) {
  41. counter.printY();
  42. }
  43. Thread.currentThread().interrupt();
  44. }, "线程B: ").start();
  45. try {
  46. Thread.sleep(10000);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. }

await - signal

如果使用显式锁替代了隐式锁使用,则应该使用 Condition 类提供的 await()signal() 替代 wait()notify()
显示锁提供了 newCondition() 方法来声明一个条件,其实就是一个普通对象,它提供了如下方法:

  • await(),自动释放锁,进入阻塞等待状态,等待被唤醒或者被中断退出等待
  • awaitUninterruptibly(),释放锁,进入阻塞等待状态,无法通过中断唤醒
  • signal(),唤醒一个阻塞等待的线程
  • signalAll(),唤醒所有的正在阻塞等待的线程

和隐式锁的使用一样,调用以上方法需要保证当前线程持有该 Condition 条件对象的锁,否则会报 IllegalMonitorStateException 异常。

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.ThreadLocalRandom;
  4. import java.util.concurrent.atomic.AtomicBoolean;
  5. import java.util.concurrent.locks.Condition;
  6. import java.util.concurrent.locks.ReentrantLock;
  7. public class Main {
  8. static class Factory {
  9. private volatile AtomicBoolean RUNNING_STATUS = new AtomicBoolean(true);
  10. private final int MIN_SIZE = 0;
  11. private final int MAX_SIZE = 10;
  12. private List<Integer> blockedQueue = new ArrayList<>(MAX_SIZE);
  13. ReentrantLock lock = new ReentrantLock();
  14. Condition full = lock.newCondition();
  15. Condition empty = lock.newCondition();
  16. public void produce() {
  17. lock.lock();
  18. while (blockedQueue.size() >= MAX_SIZE) {
  19. System.out.println("阻塞队列满," + Thread.currentThread().getName() + " 等待生产");
  20. try {
  21. full.await();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. return;
  25. }
  26. }
  27. try {
  28. final int num = ThreadLocalRandom.current().nextInt();
  29. blockedQueue.add(num);
  30. System.out.println(Thread.currentThread().getName() + " 生产:" + num);
  31. Thread.sleep(100);
  32. empty.signalAll();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. } finally {
  36. lock.unlock();
  37. }
  38. }
  39. public void consume() {
  40. lock.lock();
  41. while (blockedQueue.size() <= MIN_SIZE) {
  42. System.out.println("阻塞队列空," + Thread.currentThread().getName() + " 等待消费");
  43. try {
  44. empty.await();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. try {
  50. System.out.println(Thread.currentThread().getName() + " 消费:" + blockedQueue.remove(0));
  51. Thread.sleep(100);
  52. full.signalAll();
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. } finally {
  56. lock.unlock();
  57. }
  58. }
  59. public boolean running() {
  60. return RUNNING_STATUS.get();
  61. }
  62. public void stop() {
  63. RUNNING_STATUS.set(false);
  64. }
  65. }
  66. public static void main(String[] args) {
  67. Factory factory = new Factory();
  68. new Thread(() -> {
  69. while (factory.running()) {
  70. factory.produce();
  71. }
  72. Thread.currentThread().interrupt();
  73. }, "线程A").start();
  74. new Thread(() -> {
  75. while (factory.running()) {
  76. factory.produce();
  77. }
  78. Thread.currentThread().interrupt();
  79. }, "线程B").start();
  80. new Thread(() -> {
  81. while (factory.running()) {
  82. factory.consume();
  83. }
  84. Thread.currentThread().interrupt();
  85. }, "线程C").start();
  86. new Thread(() -> {
  87. while (factory.running()) {
  88. factory.consume();
  89. }
  90. Thread.currentThread().interrupt();
  91. }, "线程D").start();
  92. try {
  93. // 两秒后停止生产和消费
  94. Thread.sleep(2000);
  95. factory.stop();
  96. } catch (InterruptedException e) {
  97. e.printStackTrace();
  98. }
  99. }
  100. }

在线程协调的条件块中,使用 while 而不是 if 通过wait() 或者 await() 使线程等待。因为当线程重新获取到锁后,会从 wait()await() 处重新开始执行,此时竞态条件可能已经发生变化,所以需要再次进入循环判断。

阻塞队列

JDK 5 新增了队列这种集合,并实现了很多常用的阻塞队列,阻塞队列通过在内部使用锁或其它并发控制来实现原子操作,它是线程安全的,我们不需要手动去实现加锁解锁阻塞等待等操作了,可以像使用普通集合类那样方便。阻塞队列 BlockingQueue 接口提供了如下方法:

  • 插入元素
    • put(E e),直接插入元素,如果没有可用空间则阻塞等待
    • add(E e),容量足够则直接插入并返回 true,超出容量限制则抛 IllegalStateException 异常
    • offer(E e),容量足够则直接插入并返回 true,超出容量限制则返回 false
    • offer(E e, long timeout, TimeUnit unit),容量足够则直接插入并返回 true,否则等待指定时间再尝试插入
  • 删除元素
    • take(),检索并删除第一个元素,如果没有元素则阻塞等待
    • poll(long timeout, TimeUnit unit),检索并删除第一个元素,如果没有元素则阻塞等待指定时间再尝试,超时没有则返回 null
    • remove(Object o),删除指定元素的单个实例,有则返回 true;contains(Object o) 判断是否含有某个元素
    • remainingCapacity(),获取队列剩余可用容量,没有限制则返回 Integer.MAX_VALUE

阻塞队列不能插入 null 值,因为 null 用作标识 poll 操作失败。阻塞队列通常用于特定场景,尽量不要当作普通队列使用。

ArrayBlockingQueue

基于数组的、有界(必须指定 capacity)、先进先出(FIFO)的阻塞队列。
内部使用可重入锁 ReentrantLock,可选择公平策略(默认非公平)。

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ThreadLocalRandom;
  4. public class Main {
  5. static class Producer implements Runnable {
  6. private final BlockingQueue<Integer> queue;
  7. public Producer(BlockingQueue<Integer> queue) {
  8. this.queue = queue;
  9. }
  10. @Override
  11. public void run() {
  12. while (true) {
  13. try {
  14. Thread.sleep(50);
  15. queue.put(produce());
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. return;
  19. }
  20. }
  21. }
  22. private Integer produce() {
  23. final int num = ThreadLocalRandom.current().nextInt();
  24. System.out.println(Thread.currentThread().getName() + " 生产: " + num);
  25. return num;
  26. }
  27. }
  28. static class Consumer implements Runnable {
  29. private final BlockingQueue<Integer> queue;
  30. public Consumer(BlockingQueue<Integer> queue) {
  31. this.queue = queue;
  32. }
  33. @Override
  34. public void run() {
  35. while (true) {
  36. try {
  37. Thread.sleep(80);
  38. consume(queue.take());
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. return;
  42. }
  43. }
  44. }
  45. private void consume(Integer take) {
  46. System.out.println(Thread.currentThread().getName() + " 消费: " + take);
  47. }
  48. }
  49. public static void main(String[] args) {
  50. final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
  51. final Thread a = new Thread(new Producer(queue), "A");
  52. final Thread b = new Thread(new Producer(queue), "B");
  53. final Thread c = new Thread(new Consumer(queue), "C");
  54. final Thread d = new Thread(new Consumer(queue), "D");
  55. a.start();
  56. b.start();
  57. c.start();
  58. d.start();
  59. try {
  60. Thread.sleep(1000);
  61. a.interrupt();
  62. Thread.sleep(1000);
  63. b.interrupt();
  64. Thread.sleep(1000);
  65. c.interrupt();
  66. Thread.sleep(1000);
  67. d.interrupt();
  68. Thread.sleep(1000);
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. }

LinkedBlockingQueue

基于链表的、理论有界(Integer.MAX_VALUE)、先进先出(FIFO)的阻塞队列。
相比于 ArrayBlockingQueue 有更高的吞吐量,但性能较差。

LinkedBlockingDequeJDK 6

基于链表的、有界(默认 Integer.MAX_VALUE,可选 capacity)的阻塞双端队列。

PriorityBlockingQueue

基于优先级队列 PriorityQueue、无界的阻塞队列。
因为是无界的,插入元素不会阻塞,获取元素可以阻塞。可以通过 Comparator 参数指定如何比较优先级并提供实现 Comparable 接口的元素,不指定默认以元素插入顺序。

DelayQueue

包含具有延迟指定时间才可用的元素、无界的阻塞队列。
元素需要实现 Delayed 接口声明延迟时间。由于无界,插入元素永不阻塞;如果队列为空获取元素可以阻塞等待。
内部使用可重入锁 ReentrantLock 保证安全,使用优先级队列 PriorityQueue 实现延迟。该延迟阻塞队列可用于以下几种情况:

  • 缓存过期处理
  • 事件需要延迟处理

    SynchronousQueue

    同步阻塞队列,它不存储元素,用于线程和线程间的一对一通信。插入和获取元素必须成对出现才能同时操作,否则都一直等待对应的线程到来。

  • 公平模式下是先进先出的队列

  • 非公平模式下是后进先出栈 ```java AtomicInteger atomicNUm = new AtomicInteger(0); final SynchronousQueue queue = new SynchronousQueue<>();

new Thread(() -> { while (atomicNUm.get() < 100) { try { queue.put(atomicNUm.incrementAndGet()); System.out.println(“A run: “ + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();

new Thread(() -> { while (atomicNUm.get() < 100) { try { System.out.println(“B run: “ + queue.take()); queue.put(atomicNUm.incrementAndGet()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); ```

LinkedTransferQueueJDK 7

基于链表的、理论有界(Integer.MAX_VALUE)、先进先出(FIFO)的阻塞队列。(双同步队列)