阻塞队列介绍
Queue接口
public interface Queue<E> extends Collection<E> {
//添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
boolean add(E e);
//添加一个元素,添加成功返回true, 如果队列满了,返回false
boolean offer(E e);
//返回并删除队首元素,队列为空则抛出异常
E remove();
//返回并删除队首元素,队列为空则返回null
E poll();
//返回队首元素,但不移除,队列为空则抛出异常
E element();
//获取队首元素,但不移除,队列为空则返回null
E peek();
}
BlockingQueue接口
BlockingQueue 继承了 Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:
- put:支持阻塞的插入方式,队列满时,队列会阻塞插入元素的线程,直到队列不满
take:支持阻塞的移除方法:队列为空时,获取元素的线程会等待队列变为非空 ```java public interface BlockingQueue
extends Queue { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
}
BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。<br />(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)<br />(2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false<br />(3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置 <br />**出队:**<br />(1)poll():如果有数据,出队,如果没有数据,返回null (不阻塞)<br />(2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null<br />(3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据
**BlockingQueue常用方法示例**<br />当队列满了无法添加元素,或者是队列空了无法移除元素时:
| 方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
| --- | --- | --- | --- | --- |
| 入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 出队 | remove() | poll() | take() | poll(time, unit) |
| 获取队首元素 | element() | peek() | 不支持 | 不支持 |
<a name="CGk1y"></a>
#### 阻塞队列特性
**阻塞**<br />**阻塞功能使得生产者和消费者两端能力得到平衡,**<br />**最重要的方法是:take put**<br />take方法<br />:获取并移除队列的头结点,如果无数据,阻塞,直到有数据<br />![1648737049(1).png](https://cdn.nlark.com/yuque/0/2022/png/1275320/1648737054175-090557d3-701f-4dd8-9442-9cdfbe6543e7.png#clientId=uc1abc99c-741a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=223&id=u5e281681&margin=%5Bobject%20Object%5D&name=1648737049%281%29.png&originHeight=335&originWidth=723&originalType=binary&ratio=1&rotation=0&showTitle=false&size=17462&status=done&style=none&taskId=u9c8cb8b7-834f-490e-a762-1680c4eeafb&title=&width=482)<br />put方法<br /> 插入元素,如果已经满了,阻塞,直到可以插入<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/1275320/1648737097159-f811c0e3-5f92-4d0b-9c32-e363675da2db.png#clientId=uc1abc99c-741a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=221&id=u73bc8746&margin=%5Bobject%20Object%5D&name=image.png&originHeight=332&originWidth=762&originalType=binary&ratio=1&rotation=0&showTitle=false&size=25053&status=done&style=none&taskId=u6114f564-5d1e-481e-ace7-993afa2b0a4&title=&width=508)<br />是否有界<br />无界:LinkedBlockingQueue 上线Integer。Max_VALUE<br />有界::初始化一个大小<br />应用场景<br />生产者,消费者<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/1275320/1648737229334-1b9e9d6d-4a6b-491d-b9f2-2d05a475dddf.png#clientId=uc1abc99c-741a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=255&id=u55de0d1a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=383&originWidth=752&originalType=binary&ratio=1&rotation=0&showTitle=false&size=33849&status=done&style=none&taskId=u75642ffe-1eaa-4122-85a2-255e0b7ea49&title=&width=501.3333333333333)1<br />1:线程安全<br />2:隔离作用,具体任务与执行任务类的解耦<br />提高安全性
**常见阻塞队列**<br />BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
| **队列** | 描述 |
| --- | --- |
| **ArrayBlockingQueue** | 基于数组结构实现的一个有界阻塞队列 |
| **LinkedBlockingQueue** | 基于链表结构实现的一个有界阻塞队列 |
| **PriorityBlockingQueue** | 支持按优先级排序的无界阻塞队列 |
| **DelayQueue** | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 |
| **SynchronousQueue** | 不存储元素的阻塞队列 |
| **LinkedTransferQueue** | 基于链表结构实现的一个无界阻塞队列 |
| **LinkedBlockingDeque** | 基于链表结构实现的一个双端阻塞队列 |
![31437.png](https://cdn.nlark.com/yuque/0/2022/png/1275320/1648738794347-803375a9-465e-4b64-97a8-50dffbd7e68f.png#clientId=uc1abc99c-741a-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=1442&id=u26ed7f40&margin=%5Bobject%20Object%5D&name=31437.png&originHeight=2163&originWidth=1815&originalType=binary&ratio=1&rotation=0&showTitle=false&size=446107&status=done&style=none&taskId=ubf80fdb1-86ea-4dd2-a71c-9aaf0abbbea&title=&width=1210)<br />[https://www.processon.com/view/link/618ce3941e0853689b0818e2#map](https://www.processon.com/view/link/618ce3941e0853689b0818e2#map)
<a name="pdfL3"></a>
# ArrayBlockingQueue
- 有界
- 数组
- ReentrantLock
- 入队,出队,同一把锁
场景:生产速度和消费速度基本匹配的情况下<br />生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞<br /> ![image.png](https://cdn.nlark.com/yuque/0/2022/png/1275320/1648818133458-ee95a534-6051-40dd-8fd9-ec5e6cea47a5.png#clientId=u0c4b7e7e-8333-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=187&id=ue292b4a8&margin=%5Bobject%20Object%5D&name=image.png&originHeight=280&originWidth=797&originalType=binary&ratio=1&rotation=0&showTitle=false&size=36497&status=done&style=none&taskId=ub8e2615e-8426-4336-a0d7-bdefde7fc61&title=&width=531.3333333333334)<br />**ArrayBlockingQueue使用**
```java
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1"); //向队列中添加元素
Object object = queue.take(); //从队列中取出元素
ArrayBlockingQueue的原理
数据结构
利用了Lock锁的Condition通知机制进行阻塞控制。
核心:一把锁,两个条件
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者
private final Condition notEmpty;
//生产者
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
...
lock = new ReentrantLock(fair); //公平,非公平
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
入队put方法
public void put(E e) throws InterruptedException {
//检查是否为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//阻塞队列已满,则将生产者挂起,等待消费者唤醒
//设计注意点: 用while不用if是为了防止虚假唤醒
while (count == items.length)
notFull.await(); //队列满了,使用notFull等待(生产者阻塞)
// 入队
enqueue(e);
} finally {
lock.unlock(); // 唤醒消费者线程
}
}
private void enqueue(E x) {
final Object[] items = this.items;
//入队 使用的putIndex
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0; //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部
count++;
//notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
notEmpty.signal();
}
出队take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//如果队列为空,则消费者挂起
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
lock.unlock();// 唤醒生产者线程
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //取出takeIndex位置的元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部
count--;
if (itrs != null)
itrs.elementDequeued();
//notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
notFull.signal();
return x;
}
LinkedBlockingQueue
- 链表实现
- 无界,最大Integer.MAX_VALUE
- 两把锁
如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
内部由单链表实现,只能从head取元素,从tail添加元素
采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。
LinkedBlockingQueue使用
//指定队列的大小创建有界队列
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
//无界队列
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
LinkedBlockingQueue的原理
// 容量,指定容量就是有界队列
private final int capacity;
// 元素数量
private final AtomicInteger count = new AtomicInteger();
// 链表头 本身是不存储任何元素的,初始化时item指向null
transient Node<E> head;
// 链表尾
private transient Node<E> last;
// take锁 锁分离,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty条件
// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
// put锁
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();
//典型的单链表结构
static class Node<E> {
E item; //存储元素
Node<E> next; //后继节点 单链表结构
Node(E x) { item = x; }
}
构造器
public LinkedBlockingQueue() {
// 如果没传容量,就使用最大int值初始化其容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化head和last指针为空值节点
last = head = new Node<E>(null);
}
入队put方法
public void put(E e) throws InterruptedException {
// 不允许null元素
if (e == null) throw new NullPointerException();
int c = -1;
// 新建一个节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put锁加锁
putLock.lockInterruptibly();
try {
// 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
while (count.get() == capacity) {
notFull.await();
}
// 队列不满,就入队
enqueue(node);
c = count.getAndIncrement();// 队列长度加1,返回原值
// 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)
// 这里为啥要唤醒一下呢?
// 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock(); // 真正唤醒生产者线程
}
// 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
if (c == 0)
signalNotEmpty();
}
private void enqueue(Node<E> node) {
// 直接加到last后面,last指向入队元素
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();// 加take锁
try {
notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
} finally {
takeLock.unlock(); // 真正唤醒消费者线程
}
}
出队take方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 使用takeLock加锁
takeLock.lockInterruptibly();
try {
// 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
while (count.get() == 0) {
notEmpty.await();
}
// 否则,出队
x = dequeue();
c = count.getAndDecrement();//长度-1,返回原值
if (c > 1)// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理
notEmpty.signal();
} finally {
takeLock.unlock(); // 真正唤醒消费者线程
}
// 为什么队列是满的才唤醒阻塞在notFull上的线程呢?
// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,
// 这也是锁分离带来的代价
// 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// head节点本身是不存储任何元素的
// 这里把head删除,并把head下一个节点作为新的值
// 并把其值置空,返回原来的值
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 方便GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
} finally {
putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
}
}
LinkedBlockingQueue与ArrayBlockingQueue对比
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
- 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。