一 介绍
- 阻塞队列(BlockingQueue):顾名思义,首先它是一个队列。
- 当阻塞队列是空时,从队列中获取元素的操作线程将会被阻塞。
- 当阻塞队列是满时,往队列中添加元素的操作线程将会被阻塞。
阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1.1 好处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即线程阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
- 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockingQueue都一手给你包办好了。
- 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
1.2 核心用法
在阻塞队列不可用的时候,它对于操作线程提供了4种处理方式。
注意:阻塞队列如果是无界队列,那么插入元素的时是一直成功的,不会发生阻塞操作线程的情况。
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
抛出异常 | 当阻塞队列满的时候,再往队列add插入元素会抛出 IllegalStateException:Queue full。当阻塞队列空的时候,再往队列remove移除元素时候会抛出异常NoSuchElementException |
---|---|
特殊值 | 插入方法,成功返回true,失败返回false。移除方法,成功返回元素,队列为空返回null |
一直阻塞 | 当队列满的时候,生产者继续往队列里面put插入元素,队列会一直阻塞直到put插入成功或者响应中断退出。当队列为空的时候,消费者会从队列take移除元素,队列会一直阻塞消费者,直到队列有元素可以被移除。 |
超时退出 | 当阻塞队列满的时候,生产者会阻塞消费者一定时间。超时后生产者线程自动退出。 |
二 JDK中的阻塞队列
// JDK8源码:BlockingQueue
package java.util.concurrent;
// 直接父接口是Queue,Queue的直接父接口是Collection
public interface BlockingQueue<E> extends Queue<E> {
2.1 ArrayBlockingQueue
由数组结构组成的有界阻塞队列。三个构造方法如下
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* fair:指的是公平性,当有很多线程都访问阻塞队列已经阻塞,此时TRUE表示先阻塞的线程在阻塞队列
* 有空余的位置时优先访问阻塞队列,这时候会适当的降低程序的吞吐量。
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
2.2 LinkedBlockingQueue
由链表结构组成的有界阻塞队列,但是队列默认最大长度为:Integer.MAX_VALUE
此队列按照先进先出的原则对元素进行排序。
2.3 PriorityBlockingQueue
支持元素优先级顺序排列的无界阻塞队列。
默认情况下,元素按照自然排序的规则升序排列,但是也可以自定义类实现 compareTo 方法来指定元素的排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。需要注意的是不能保证同优先级元素的顺序。
2.4 DelayQueue
由PriorityBlockingQueue实现的延时无界阻塞队列。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
2.5 SynchronousQueue
不存储元素的阻塞队列,也即是单个元素的队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
2.6 LinkedTransferQueue
由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
- transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者了才返回。
- tryTransfer方法
tryTransfer方法时用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回fasle。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
2.7 LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。
所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双向队列的最后一个元素。
三 生产者消费者
阻塞队列实现生产者消费者
class MyResource
{
private volatile boolean FLAG = true; // 默认开启,进行生产+消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
// 生产者
public void MyProd() throws Exception{
String data = null;
boolean retValue ; // 默认是false
while (FLAG)
{
// 往阻塞队列填充数据
data = atomicInteger.incrementAndGet()+""; // 等于++i的意思
retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS); // 插入成功则返回true
if (retValue){ // 如果是true,那么代表当前这个线程插入数据成功
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
}else { // 那么就是插入失败
System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
// 如果FLAG是false了,马上打印
System.out.println(Thread.currentThread().getName()+"\t大老板叫停了,表示FLAG=false,生产结束");
}
// 消费者
public void MyConsumer() throws Exception
{
String result = null;
while (FLAG) { // 开始消费
// 两秒钟等不到生产者生产出来的数据就不取了
result = blockingQueue.poll(2L,TimeUnit.SECONDS); // 阻塞队列移除元素失败则返回null
if (null == result || result.equalsIgnoreCase("")){ // 如果取不到数据了
FLAG = false;
System.out.println(Thread.currentThread().getName()+"\t 超过两秒钟没有取到数据,消费退出")
return; // 退出
}
System.out.println(Thread.currentThread().getName()+"\t消费队列数据"+result+"成功");
}
}
// 叫停方法
public void stop() throws Exception{
this.FLAG = false;
}
}
public class ProdConsumer_BlockQueueDemo {
public static void main(String[] args) throws Exception{
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
try {
myResource.MyProd();
} catch (Exception e) {
e.printStackTrace();
}
},"Prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
try {
myResource.MyConsumer();
} catch (Exception e) {
e.printStackTrace();
}
},"Consumer").start();
try { TimeUnit.SECONDS.sleep(5); }catch (Exception e) {e.printStackTrace();}
System.out.println("5秒钟时间到,大bossMain主线程叫停,活动结束");
myResource.stop();
}
}