wait - notify
Java 为每个对象提供了 wait() 方法使线程阻塞等待,notify() 方法来唤醒由于 wait() 方法阻塞等待的线程。使用这两个方法的前提是当前线程持有该对象的监视器锁(隐式锁),而给对象添加隐式锁的方式就是通过 synchronized 关键字声明。
wait(),当前线程释放该对象的监视器锁,等待被唤醒或者被中断退出等待wait(long timeoutMillis),同wait()方法,同时如果超过指定时间没被唤醒,可以退出等待notify(),唤醒一个正在阻塞等待的线程notifyAll(),唤醒所有的正在阻塞等待的线程
wait() 方法会释放对对象的监视器锁,notify() 方法只是唤醒阻塞等待的线程,和其它线程去竞争获取对象的监视器锁,具体能不能获取到监视器锁,要看 CPU 的具体调度。
代码示例:两个线程交替打印数字隐式锁锁住同一个对象,一个线程打印完后,唤醒另一个线程,然后阻塞等待另一个线程唤醒。
public class Main {static class Counter {private int count = 0;public synchronized void printX() {while (count < 100 && count % 2 == 0) {count++;System.out.println(Thread.currentThread().getName() + count);notifyAll();}try {wait();} catch (InterruptedException e) {e.printStackTrace();return;}}public synchronized void printY() {while (count < 100 && count % 2 == 1) {count++;System.out.println(Thread.currentThread().getName() + count);notifyAll();}try {wait();} catch (InterruptedException e) {e.printStackTrace();return;}}}public static void main(String[] args) {final Counter counter = new Counter();new Thread(() -> {while (counter.count < 100) {counter.printX();}Thread.currentThread().interrupt();}, "线程A: ").start();new Thread(() -> {while (counter.count < 100) {counter.printY();}Thread.currentThread().interrupt();}, "线程B: ").start();try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}}
await - signal
如果使用显式锁替代了隐式锁使用,则应该使用 Condition 类提供的 await() 和 signal() 替代 wait() 和 notify()。
显示锁提供了 newCondition() 方法来声明一个条件,其实就是一个普通对象,它提供了如下方法:
await(),自动释放锁,进入阻塞等待状态,等待被唤醒或者被中断退出等待awaitUninterruptibly(),释放锁,进入阻塞等待状态,无法通过中断唤醒signal(),唤醒一个阻塞等待的线程signalAll(),唤醒所有的正在阻塞等待的线程
和隐式锁的使用一样,调用以上方法需要保证当前线程持有该 Condition 条件对象的锁,否则会报 IllegalMonitorStateException 异常。
import java.util.ArrayList;import java.util.List;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class Main {static class Factory {private volatile AtomicBoolean RUNNING_STATUS = new AtomicBoolean(true);private final int MIN_SIZE = 0;private final int MAX_SIZE = 10;private List<Integer> blockedQueue = new ArrayList<>(MAX_SIZE);ReentrantLock lock = new ReentrantLock();Condition full = lock.newCondition();Condition empty = lock.newCondition();public void produce() {lock.lock();while (blockedQueue.size() >= MAX_SIZE) {System.out.println("阻塞队列满," + Thread.currentThread().getName() + " 等待生产");try {full.await();} catch (InterruptedException e) {e.printStackTrace();return;}}try {final int num = ThreadLocalRandom.current().nextInt();blockedQueue.add(num);System.out.println(Thread.currentThread().getName() + " 生产:" + num);Thread.sleep(100);empty.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void consume() {lock.lock();while (blockedQueue.size() <= MIN_SIZE) {System.out.println("阻塞队列空," + Thread.currentThread().getName() + " 等待消费");try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}try {System.out.println(Thread.currentThread().getName() + " 消费:" + blockedQueue.remove(0));Thread.sleep(100);full.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public boolean running() {return RUNNING_STATUS.get();}public void stop() {RUNNING_STATUS.set(false);}}public static void main(String[] args) {Factory factory = new Factory();new Thread(() -> {while (factory.running()) {factory.produce();}Thread.currentThread().interrupt();}, "线程A").start();new Thread(() -> {while (factory.running()) {factory.produce();}Thread.currentThread().interrupt();}, "线程B").start();new Thread(() -> {while (factory.running()) {factory.consume();}Thread.currentThread().interrupt();}, "线程C").start();new Thread(() -> {while (factory.running()) {factory.consume();}Thread.currentThread().interrupt();}, "线程D").start();try {// 两秒后停止生产和消费Thread.sleep(2000);factory.stop();} catch (InterruptedException e) {e.printStackTrace();}}}
在线程协调的条件块中,使用 while 而不是 if 通过wait() 或者 await() 使线程等待。因为当线程重新获取到锁后,会从 wait() 或 await() 处重新开始执行,此时竞态条件可能已经发生变化,所以需要再次进入循环判断。
阻塞队列
JDK 5 新增了队列这种集合,并实现了很多常用的阻塞队列,阻塞队列通过在内部使用锁或其它并发控制来实现原子操作,它是线程安全的,我们不需要手动去实现加锁解锁阻塞等待等操作了,可以像使用普通集合类那样方便。阻塞队列 BlockingQueue 接口提供了如下方法:
- 插入元素
put(E e),直接插入元素,如果没有可用空间则阻塞等待add(E e),容量足够则直接插入并返回 true,超出容量限制则抛 IllegalStateException 异常offer(E e),容量足够则直接插入并返回 true,超出容量限制则返回 falseoffer(E e, long timeout, TimeUnit unit),容量足够则直接插入并返回 true,否则等待指定时间再尝试插入
- 删除元素
take(),检索并删除第一个元素,如果没有元素则阻塞等待poll(long timeout, TimeUnit unit),检索并删除第一个元素,如果没有元素则阻塞等待指定时间再尝试,超时没有则返回 nullremove(Object o),删除指定元素的单个实例,有则返回 true;contains(Object o)判断是否含有某个元素remainingCapacity(),获取队列剩余可用容量,没有限制则返回 Integer.MAX_VALUE
阻塞队列不能插入 null 值,因为 null 用作标识 poll 操作失败。阻塞队列通常用于特定场景,尽量不要当作普通队列使用。
ArrayBlockingQueue
基于数组的、有界(必须指定 capacity)、先进先出(FIFO)的阻塞队列。
内部使用可重入锁 ReentrantLock,可选择公平策略(默认非公平)。
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ThreadLocalRandom;public class Main {static class Producer implements Runnable {private final BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {try {Thread.sleep(50);queue.put(produce());} catch (InterruptedException e) {e.printStackTrace();return;}}}private Integer produce() {final int num = ThreadLocalRandom.current().nextInt();System.out.println(Thread.currentThread().getName() + " 生产: " + num);return num;}}static class Consumer implements Runnable {private final BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {try {Thread.sleep(80);consume(queue.take());} catch (InterruptedException e) {e.printStackTrace();return;}}}private void consume(Integer take) {System.out.println(Thread.currentThread().getName() + " 消费: " + take);}}public static void main(String[] args) {final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);final Thread a = new Thread(new Producer(queue), "A");final Thread b = new Thread(new Producer(queue), "B");final Thread c = new Thread(new Consumer(queue), "C");final Thread d = new Thread(new Consumer(queue), "D");a.start();b.start();c.start();d.start();try {Thread.sleep(1000);a.interrupt();Thread.sleep(1000);b.interrupt();Thread.sleep(1000);c.interrupt();Thread.sleep(1000);d.interrupt();Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}
LinkedBlockingQueue
基于链表的、理论有界(Integer.MAX_VALUE)、先进先出(FIFO)的阻塞队列。
相比于 ArrayBlockingQueue 有更高的吞吐量,但性能较差。
LinkedBlockingDequeJDK 6
基于链表的、有界(默认 Integer.MAX_VALUE,可选 capacity)的阻塞双端队列。
PriorityBlockingQueue
基于优先级队列 PriorityQueue、无界的阻塞队列。
因为是无界的,插入元素不会阻塞,获取元素可以阻塞。可以通过 Comparator 参数指定如何比较优先级并提供实现 Comparable 接口的元素,不指定默认以元素插入顺序。
DelayQueue
包含具有延迟指定时间才可用的元素、无界的阻塞队列。
元素需要实现 Delayed 接口声明延迟时间。由于无界,插入元素永不阻塞;如果队列为空获取元素可以阻塞等待。
内部使用可重入锁 ReentrantLock 保证安全,使用优先级队列 PriorityQueue 实现延迟。该延迟阻塞队列可用于以下几种情况:
- 缓存过期处理
-
SynchronousQueue
同步阻塞队列,它不存储元素,用于线程和线程间的一对一通信。插入和获取元素必须成对出现才能同时操作,否则都一直等待对应的线程到来。
公平模式下是先进先出的队列
- 非公平模式下是后进先出栈
```java
AtomicInteger atomicNUm = new AtomicInteger(0);
final SynchronousQueue
queue = new SynchronousQueue<>();
new Thread(() -> { while (atomicNUm.get() < 100) { try { queue.put(atomicNUm.incrementAndGet()); System.out.println(“A run: “ + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
new Thread(() -> { while (atomicNUm.get() < 100) { try { System.out.println(“B run: “ + queue.take()); queue.put(atomicNUm.incrementAndGet()); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); ```
LinkedTransferQueueJDK 7
基于链表的、理论有界(Integer.MAX_VALUE)、先进先出(FIFO)的阻塞队列。(双同步队列)
