BlockingQueue
BlockingQueue 是 Java 并发包中的一个接口,用来表示线程安全队列,可以放入并获取实例。它长应用于生产者消费者模式,例如下图:
- 生产线程:持续生产新对象并将它们插入队列,直到队列达到它可以包含的上限。换句话说,这是极限。如果阻塞队列达到其上限,则会在尝试插入新对象时阻塞生产线程。在消耗线程将对象带出队列之前,它一直处于阻塞状态。
- 消费线程:不断将对象从阻塞队列中取出,并对其进行处理。如果消费线程试图将对象从空队列中取出,则消费线程将被阻塞,直到生成的线程将对象放入队列。
BlockingQueue 方法按照官方的分类可以分为四类
- Throws exception:如果操作不能立即发生,则抛出一个异常
- Special value:如果操作不能立即执行,则返回空或者 false
- Blocks:如果操作不能立即执行,则阻塞
- Times out:如果操作不能立即执行,则定时阻塞,返回一个特殊值(boolen 空或者队列的值)用来告知操作成功与否
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(E e)) | bool offer(E e)) | put(E e) | offer(E e,long timeout TimUnit unit) |
Remove | remove(Object o)) | E poll() | E take() | E poll(long timeout,TimeUnit unit) |
Examine | element()) | E peek()) | - | - |
BlockingQueue 是个接口,在并发包下有多个它的实现类
- ArrayBlockingQueue
- DelayQueue
- LinkedBlockQueue
- PriorityBlockingQueue
- SynchronousQueue
- LinkedBlockingDeque
接下来就是挨个的去看下这些实现
ArrayBlockingQueue
ArrayBlockingQueue 由数组支持的有界阻塞队列,队列基于数组实现,容量大小在初始化对象时已经定义好。此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。默认采用非公平锁,结构如下:
队列的内部使用 ReentrantLock 锁两个 Condition(条件) 来实现的 take 和 put 方法的阻塞
// ArrayBlockingQueue.java
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
下面是个阻塞的 take 的简单实例:
Executor executor = Executors.newFixedThreadPool(2);
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(7);
executor.execute(() -> {
try {
System.out.println("等待任务...");
String str = queue.take();
System.out.println("从队列里消费了" + str);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
Thread.sleep(2000);
System.out.println("发布任务");
queue.offer("任务 1");
} catch (InterruptedException e) {
e.printStackTrace();
}
});