概述
BlockingQueue是Queue的子类,叫做阻塞队列,是一个经常用于并发编程中的并发容器。阻塞队列广泛地在最常用的“生产者——消费者模型”中用作数据容器。这样,可以对各个模块的业务功能进行解耦,生产者将“生产”出来的数据放置在数据容器中,而消费者仅仅只需要在“数据容器”中进行获取数据即可,两种线程只需专注于自己的业务。
BlockingQueue提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

方法
public interface BlockingQueue<E> extends Queue<E> {//添加一个元素。如果队列满,则抛出IllegalStateException异常boolean add(E e);//添加一个元素并返回true。如果队列满,返回falseboolean offer(E e);//添加一个元素,如果队列满,则阻塞void put(E e) throws InterruptedException;boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;//移出并返回头元素,如果队列空,则阻塞E take() throws InterruptedException;//移除并返回队列的头元素,如果队列空,则返回nullE poll(long timeout, TimeUnit unit)throws InterruptedException;//返回队列剩余容量int remainingCapacity();//溢出并返回头元素,如果队列空,则抛出NoSuchElementException异常boolean remove(Object o);public boolean contains(Object o);int drainTo(Collection<? super E> c);int drainTo(Collection<? super E> c, int maxElements);}
BlockingQueue继承于Queue接口,其(Queue)基本操作有
- 添加元素
add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
offer(E e):当往队列插入数据时,插入成功返回true,否则则返回false。当队列满时不会抛出异常;
- 删除元素
remove(Object o):从队列中删除数据,成功则返回true,否则为false
poll:删除数据,当队列为空时,返回null;
- 查看元素
element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常
对于BlockingQueue
- 插入数据
put:添加元素,当队列满时,线程阻塞,直至队列有空余容量
offer(E e, long timeout, TimeUnit unit):添加元素,当队列满时,线程阻塞,直至队列有空余容量。与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;
- 删除数据
take():出队,当队列为空时,获取队头数据的线程会被阻塞;
poll(long timeout, TimeUnit unit):出队,当队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出。
ArrayBlockingQueue

ArrayBlockingQueue是BlockingQueue的一个实现类,其使用的是数组来实现有界阻塞队列。
/** 数组实现的队列 */final Object[] items;/** Number of elements in the queue */int count;/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;
从上面属性可以看出,ArrayBlockingQueue使用的数组来实现队列,另外采用的ReetrantLock锁来实现同步机制,采用Condition条件机制来实现线程通信。
此外,ArrayBlockingQueue的队列容量,即数组大小一旦设定便无法更改。
添加元素
主要留意offer()和put()两个添加元素的方法
/***添加元素,当队列满时,返回false,直至队列有空余容量。*/public boolean offer(E e) {//检测元素非空checkNotNull(e);//创建一个lockfinal ReentrantLock lock = this.lock;//申请锁lock.lock();try {//如果当前队列内容已满,则直接返回falseif (count == items.length)return false;else {//如果不为空,则执行入队操作,返回trueenqueue(e);return true;}} finally {//释放锁lock.unlock();}}/***添加元素,当队列满时,线程阻塞*/public void put(E e) throws InterruptedException {//检测元素非空checkNotNull(e);final ReentrantLock lock = this.lock;//申请可中断锁lock.lockInterruptibly();try {//如果队列已满,调用await进入阻塞状态while (count == items.length)notFull.await();//未满,入队enqueue(e);} finally {//释放锁lock.unlock();}}/***入对操作,只有持有锁时才能进入该方法*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;//队列的操作,如果入队指针指到数组最后一位,重新回到第一位if (++putIndex == items.length)putIndex = 0;count++;//通知其他线程,已入队,队列不为空notEmpty.signal();}
删除元素
/***删除元素,如果队列为空,则阻塞*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//申请可中断锁lock.lockInterruptibly();try {//若队列为空,则await,线程阻塞while (count == 0)notEmpty.await();//出队,返回出队元素return dequeue();} finally {//释放锁lock.unlock();}}/***删除元素,如果队列为空,返回null*/public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;//申请可中断锁lock.lockInterruptibly();try {//如果队列为空,且没有设置超时时间,直接返回nullwhile (count == 0) {if (nanos <= 0)return null;//否则,阻塞nanos秒nanos = notEmpty.awaitNanos(nanos);}//队列不空,出队return dequeue();} finally {//释放锁lock.unlock();}}private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;//队列性质if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//通知其他线程,已出队,队列不满notFull.signal();return x;}
ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。
其他实现类
LinkedBlockingQueue
LinkedBlockingQueue是用链表实现的有界阻塞队列,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。
SynchronousQueue
SynchronousQueue每个插入操作必须等待另一个线程进行相应的删除操作,因此,SynchronousQueue实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。SynchronousQueue也可以通过构造器参数来为其指定公平性。
LinkedTransferQueue
LinkedTransferQueue是一个由链表数据结构构成的无界阻塞队列,由于该队列实现了TransferQueue接口,与其他阻塞队列相比主要有以下不同的方法:
transfer(E e)
如果当前有线程(消费者)正在调用take()方法或者可延时的poll()方法进行消费数据时,生产者线程可以调用transfer方法将数据传递给消费者线程。如果当前没有消费者线程的话,生产者线程就会将数据插入到队尾,直到有消费者能够进行消费才能退出;
tryTransfer(E e)
tryTransfer方法如果当前有消费者线程(调用take方法或者具有超时特性的poll方法)正在消费数据的话,该方法可以将数据立即传送给消费者线程,如果当前没有消费者线程消费数据的话,就立即返回false。因此,与transfer方法相比,transfer方法是必须等到有消费者线程消费数据时,生产者线程才能够返回。而tryTransfer方法能够立即返回结果退出。
tryTransfer(E e,long timeout,imeUnit unit)
与transfer基本功能一样,只是增加了超时特性,如果数据才规定的超时时间内没有消费者进行消费的话,就返回false。
LinkedBlockingDeque
LinkedBlockingDeque是基于链表数据结构的有界阻塞双端队列,如果在创建对象时为指定大小时,其默认大小为Integer.MAX_VALUE。与LinkedBlockingQueue相比,主要的不同点在于,LinkedBlockingDeque具有双端队列的特性。
DelayQueue
DelayQueue是一个存放实现Delayed接口的数据的无界阻塞队列,只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,则队列没有队头,并且线程通过poll等方法获取数据元素则返回null。所谓数据延时期满时,则是通过Delayed接口的getDelay(TimeUnit.NANOSECONDS)来进行判定,如果该方法返回的是小于等于0则说明该数据元素的延时期已满。
