本文内容

  1. 掌握 Queue、BlockingQueue 接口中常用的方法
  2. 介绍 6 种阻塞队列,及相关场景示例
  3. 重点掌握 4 种常用的阻塞队列

    Queue 接口

    队列是一种先进先出(FIFO)的数据结构,java 中用Queue接口来表示队列。
    Queue接口中定义了 6 个方法:
    public interface Queue<E> extends Collection<E> {
    boolean add(e);
    boolean offer(E e);
    E remove();
    E poll();
    E element();
    E peek();
    }

每个Queue方法都有两种形式:
(1)如果操作失败则抛出异常,
(2)如果操作失败,则返回特殊值(null或false,具体取决于操作),接口的常规结构如下表所示。

操作类型 抛出异常 返回特殊值
插入 add(e) offer(e)
移除 remove() poll()
检查 element() peek()

Queue从Collection继承的add方法插入一个元素,除非它违反了队列的容量限制,在这种情况下它会抛出IllegalStateException;offer方法与add不同之处仅在于它通过返回false来表示插入元素失败。
remove和poll方法都移除并返回队列的头部,确切地移除哪个元素是由具体的实现来决定的,仅当队列为空时,remove和poll方法的行为才有所不同,在这些情况下,remove抛出NoSuchElementException,而poll返回null。
element和peek方法返回队列头部的元素,但不移除,它们之间的差异与remove和poll的方式完全相同,如果队列为空,则element抛出NoSuchElementException,而peek返回null。
队列一般不要插入空元素。

BlockingQueue 接口

BlockingQueue位于 juc 中,熟称阻塞队列, 阻塞队列首先它是一个队列,继承Queue接口,是队列就会遵循先进先出(FIFO)的原则,又因为它是阻塞的,故与普通的队列有两点区别:

  1. 当一个线程向队列里面添加数据时,如果队列是满的,那么将阻塞该线程,暂停添加数据
  2. 当一个线程从队列里面取出数据时,如果队列是空的,那么将阻塞该线程,暂停取出数据

BlockingQueue相关方法:

操作类型 抛出异常 返回特殊值 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,timeuout,unit)
移除 remove() poll() take() poll(timeout,unit)
检查 element() peek() 不支持 不支持

重点,再来解释一下,加深印象:

  1. 3 个可能会有异常的方法,add、remove、element;这 3 个方法不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,add 抛出异常;队列为空情况下,remove、element 抛出异常
  2. offer、poll、peek 也不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,offer 返回 false;队列为空的情况下,pool、peek 返回 null
  3. 队列满的情况下,调用 put 方法会导致当前线程阻塞
  4. 队列为空的情况下,调用 take 方法会导致当前线程阻塞
  5. offer(e,timeuout,unit),超时之前,插入成功返回 true,否则返回 false
  6. poll(timeout,unit),超时之前,获取到头部元素并将其移除,返回 true,否则返回 false
  7. 以上一些方法希望大家都记住,方便以后使用

    BlockingQueue 常见的实现类

    看一下相关类图
    image.png
    ArrayBlockingQueue
    基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过 ReentrantLock 来完成的,数据的插入与取出共用同一个锁,因此 ArrayBlockingQueue 并不能实现生产、消费同时进行。而且在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
    LinkedBlockingQueue
    基于单向链表的阻塞队列实现,在初始化 LinkedBlockingQueue 的时候可以指定大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外 LinkedBlockingQueue 中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。
    PriorityBlockingQueue
    一个支持优先级排序的无界阻塞队列,进入队列的元素会按照优先级进行排序
    SynchronousQueue
    同步阻塞队列,SynchronousQueue 没有容量,与其他 BlockingQueue 不同,SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然
    DelayQueue
    DelayQueue 是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素
    LinkedTransferQueue
    LinkedTransferQueue 是基于链表的 FIFO 无界阻塞队列,它出现在 JDK7 中,Doug Lea 大神说 LinkedTransferQueue 是一个聪明的队列,它是 ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、无界的 LinkedBlockingQueues 等的超集,LinkedTransferQueue包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues三种队列的功能
    下面我们来介绍每种阻塞队列的使用。

    ArrayBlockingQueue

    有界阻塞队列,内部使用数组存储元素,有 2 个常用构造方法:
    //capacity表示容量大小,默认内部采用非公平锁
    public ArrayBlockingQueue(int capacity)
    //capacity:容量大小,fair:内部是否是使用公平锁
    public ArrayBlockingQueue(int capacity, boolean fair)

需求:业务系统中有很多地方需要推送通知,由于需要推送的数据太多,我们将需要推送的信息先丢到阻塞队列中,然后开一个线程进行处理真实发送,代码如下:
package com.itsoku.chat25;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import sun.text.normalizer.NormalizerBase;

import java.util.Calendar;
import java.util.concurrent.*;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo1 {
//推送队列
static ArrayBlockingQueue pushQueue = new** ArrayBlockingQueue(10000);

  1. **static** {<br /> //启动一个线程做真实推送<br /> **new** Thread(() -> {<br /> **while** (**true**) {<br /> String msg;<br /> **try** {<br /> **long** starTime = System.currentTimeMillis();<br /> //获取一条推送消息,此方法会进行阻塞,直到返回结果<br /> msg = pushQueue.take();<br /> **long** endTime = System.currentTimeMillis();<br /> //模拟推送耗时<br /> TimeUnit.MILLISECONDS.sleep(500);
  2. System.out.println(String.format("[%s,%s,take耗时:%s],%s,发送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }
  3. //推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列<br /> **public** **static** **void** **pushMsg**(String msg) **throws** InterruptedException {<br /> pushQueue.put(msg);<br /> }
  4. **public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **for** (**int** i = 1; i <= 5; i++) {<br /> String msg = "一起来学java高并发,第" + i + "天";<br /> //模拟耗时<br /> TimeUnit.SECONDS.sleep(i);<br /> Demo1.pushMsg(msg);<br /> }<br /> }<br />}

输出:
[1565595629206,1565595630207,take耗时:1001],Thread-0,发送消息:一起来学java高并发,第1天
[1565595630208,1565595632208,take耗时:2000],Thread-0,发送消息:一起来学java高并发,第2天
[1565595632208,1565595635208,take耗时:3000],Thread-0,发送消息:一起来学java高并发,第3天
[1565595635208,1565595639209,take耗时:4001],Thread-0,发送消息:一起来学java高并发,第4天
[1565595639209,1565595644209,take耗时:5000],Thread-0,发送消息:一起来学java高并发,第5天

代码中我们使用了有界队列ArrayBlockingQueue,创建ArrayBlockingQueue时候需要制定容量大小,调用pushQueue.put将推送信息放入队列中,如果队列已满,此方法会阻塞。代码中在静态块中启动了一个线程,调用pushQueue.take();从队列中获取待推送的信息进行推送处理。
注意:ArrayBlockingQueue如果队列容量设置的太小,消费者发送的太快,消费者消费的太慢的情况下,会导致队列空间满,调用 put 方法会导致发送者线程阻塞,所以注意设置合理的大小,协调好消费者的速度。

LinkedBlockingQueue

内部使用单向链表实现的阻塞队列,3 个构造方法:
//默认构造方法,容量大小为Integer.MAX_VALUE
public LinkedBlockingQueue();
//创建指定容量大小的LinkedBlockingQueue
public LinkedBlockingQueue(int capacity);
//容量为Integer.MAX_VALUE,并将传入的集合丢入队列中
public LinkedBlockingQueue(Collection<? extends E> c);

LinkedBlockingQueue的用法和ArrayBlockingQueue类似,建议使用的时候指定容量,如果不指定容量,插入的太快,移除的太慢,可能会产生 OOM。

PriorityBlockingQueue

无界的优先级阻塞队列,内部使用数组存储数据,达到容量时,会自动进行扩容,放入的元素会按照优先级进行排序,4 个构造方法:
//默认构造方法,默认初始化容量是11
public PriorityBlockingQueue();
//指定队列的初始化容量
public PriorityBlockingQueue(int initialCapacity);
//指定队列的初始化容量和放入元素的比较器
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator);
//传入集合放入来初始化队列,传入的集合可以实现SortedSet接口或者PriorityQueue接口进行排序,如果没有实现这2个接口,按正常顺序放入队列
public PriorityBlockingQueue(Collection<? extends E> c);

优先级队列放入元素的时候,会进行排序,所以我们需要指定排序规则,有 2 种方式:

  1. 创建PriorityBlockingQueue指定比较器Comparator
  2. 放入的元素需要实现Comparable接口

上面 2 种方式必须选一个,如果 2 个都有,则走第一个规则排序。
需求:还是上面的推送业务,目前推送是按照放入的先后顺序进行发送的,比如有些公告比较紧急,优先级比较高,需要快点发送,怎么搞?此时PriorityBlockingQueue就派上用场了,代码如下:
package com.itsoku.chat25;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo2** {

  1. //推送信息封装<br /> **static** **class** **Msg** **implements** **Comparable**<**Msg**> {<br /> //优先级,越小优先级越高<br /> **private** **int** priority;<br /> //推送的信息<br /> **private** String msg;
  2. **public** **Msg**(**int** priority, String msg) {<br /> **this**.priority = priority;<br /> **this**.msg = msg;<br /> }
  3. @Override<br /> **public** **int** **compareTo**(Msg o) {<br /> **return** Integer.compare(**this**.priority, o.priority);<br /> }
  4. @Override<br /> **public** String **toString**() {<br /> **return** "Msg{" +<br /> "priority=" + priority +<br /> ", msg='" + msg + '\'' +<br /> '}';<br /> }<br /> }
  5. //推送队列<br /> **static** PriorityBlockingQueue<Msg> pushQueue = **new** PriorityBlockingQueue<Msg>();
  6. **static** {<br /> //启动一个线程做真实推送<br /> **new** Thread(() -> {<br /> **while** (**true**) {<br /> Msg msg;<br /> **try** {<br /> **long** starTime = System.currentTimeMillis();<br /> //获取一条推送消息,此方法会进行阻塞,直到返回结果<br /> msg = pushQueue.take();<br /> //模拟推送耗时<br /> TimeUnit.MILLISECONDS.sleep(100);<br /> **long** endTime = System.currentTimeMillis();<br /> System.out.println(String.format("[%s,%s,take耗时:%s],%s,发送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }
  7. //推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列<br /> **public** **static** **void** **pushMsg**(**int** priority, String msg) **throws** InterruptedException {<br /> pushQueue.put(**new** Msg(priority, msg));<br /> }
  8. **public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **for** (**int** i = 5; i >= 1; i--) {<br /> String msg = "一起来学java高并发,第" + i + "天";<br /> Demo2.pushMsg(i, msg);<br /> }<br /> }<br />}

输出:
[1565598857028,1565598857129,take耗时:101],Thread-0,发送消息:Msg{priority=1, msg=’一起来学java高并发,第1天’}
[1565598857162,1565598857263,take耗时:101],Thread-0,发送消息:Msg{priority=2, msg=’一起来学java高并发,第2天’}
[1565598857263,1565598857363,take耗时:100],Thread-0,发送消息:Msg{priority=3, msg=’一起来学java高并发,第3天’}
[1565598857363,1565598857463,take耗时:100],Thread-0,发送消息:Msg{priority=4, msg=’一起来学java高并发,第4天’}
[1565598857463,1565598857563,take耗时:100],Thread-0,发送消息:Msg{priority=5, msg=’一起来学java高并发,第5天’}

main 中放入了 5 条推送信息,i 作为消息的优先级按倒序放入的,最终输出结果中按照优先级由小到大输出。注意 Msg 实现了Comparable接口,具有了比较功能。

SynchronousQueue

同步阻塞队列,SynchronousQueue 没有容量,与其他 BlockingQueue 不同,SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然。
SynchronousQueue 在现实中用的不多,线程池中有用到过,Executors.newCachedThreadPool()实现中用到了这个队列,当有任务丢入线程池的时候,如果已创建的工作线程都在忙于处理任务,则会新建一个线程来处理丢入队列的任务。
来个示例代码:
package com.itsoku.chat25;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo3** {

  1. **static** SynchronousQueue<String> queue = **new** SynchronousQueue<>();
  2. **public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **new** Thread(() -> {<br /> **try** {<br /> **long** starTime = System.currentTimeMillis();<br /> queue.put("java高并发系列,路人甲Java!");<br /> **long** endTime = System.currentTimeMillis();<br /> System.out.println(String.format("[%s,%s,take耗时:%s],%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName()));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }).start();<br /> //休眠5秒之后,从队列中take一个元素<br /> TimeUnit.SECONDS.sleep(5);<br /> System.out.println(System.currentTimeMillis() + "调用take获取并移除元素," + queue.take());<br /> }<br />}

输出:
1565600421645调用take获取并移除元素,java高并发系列,路人甲Java!
[1565600416645,1565600421645,take耗时:5000],Thread-0

main 方法中启动了一个线程,调用queue.put方法向队列中丢入一条数据,调用的时候产生了阻塞,从输出结果中可以看出,直到 take 方法被调用时,put 方法才从阻塞状态恢复正常。

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素。
需求:还是推送的业务,有时候我们希望早上 9 点或者其他指定的时间进行推送,如何实现呢?此时DelayQueue就派上用场了。
我们先看一下DelayQueue类的声明:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>

元素 E 需要实现接口Delayed,我们看一下这个接口的代码:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

Delayed继承了Comparable接口,这个接口是用来做比较用的,DelayQueue内部使用PriorityQueue来存储数据的,PriorityQueue是一个优先级队列,丢入的数据会进行排序,排序方法调用的是Comparable接口中的方法。下面主要说一下Delayed接口中的getDelay方法:此方法在给定的时间单位内返回与此对象关联的剩余延迟时间。
对推送我们再做一下处理,让其支持定时发送(定时在将来某个时间也可以说是延迟发送),代码如下:
package com.itsoku.chat25;

import java.util.Calendar;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo4** {

  1. //推送信息封装<br /> **static** **class** **Msg** **implements** **Delayed** {<br /> //优先级,越小优先级越高<br /> **private** **int** priority;<br /> //推送的信息<br /> **private** String msg;<br /> //定时发送时间,毫秒格式<br /> **private** **long** sendTimeMs;
  2. **public** **Msg**(**int** priority, String msg, **long** sendTimeMs) {<br /> **this**.priority = priority;<br /> **this**.msg = msg;<br /> **this**.sendTimeMs = sendTimeMs;<br /> }
  3. @Override<br /> **public** String **toString**() {<br /> **return** "Msg{" +<br /> "priority=" + priority +<br /> ", msg='" + msg + '\'' +<br /> ", sendTimeMs=" + sendTimeMs +<br /> '}';<br /> }
  4. @Override<br /> **public** **long** **getDelay**(TimeUnit unit) {<br /> **return** unit.convert(**this**.sendTimeMs - Calendar.getInstance().getTimeInMillis(), TimeUnit.MILLISECONDS);<br /> }
  5. @Override<br /> **public** **int** **compareTo**(Delayed o) {<br /> **if** (o **instanceof** Msg) {<br /> Msg c2 = (Msg) o;<br /> **return** Integer.compare(**this**.priority, c2.priority);<br /> }<br /> **return** 0;<br /> }<br /> }
  6. //推送队列<br /> **static** DelayQueue<Msg> pushQueue = **new** DelayQueue<Msg>();
  7. **static** {<br /> //启动一个线程做真实推送<br /> **new** Thread(() -> {<br /> **while** (**true**) {<br /> Msg msg;<br /> **try** {<br /> //获取一条推送消息,此方法会进行阻塞,直到返回结果<br /> msg = pushQueue.take();<br /> //此处可以做真实推送<br /> **long** endTime = System.currentTimeMillis();<br /> System.out.println(String.format("定时发送时间:%s,实际发送时间:%s,发送消息:%s", msg.sendTimeMs, endTime, msg));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }
  8. //推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列<br /> **public** **static** **void** **pushMsg**(**int** priority, String msg, **long** sendTimeMs) **throws** InterruptedException {<br /> pushQueue.put(**new** Msg(priority, msg, sendTimeMs));<br /> }
  9. **public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **for** (**int** i = 5; i >= 1; i--) {<br /> String msg = "一起来学java高并发,第" + i + "天";<br /> Demo4.pushMsg(i, msg, Calendar.getInstance().getTimeInMillis() + i * 2000);<br /> }<br /> }<br />}

输出:
定时发送时间:1565603357198,实际发送时间:1565603357198,发送消息:Msg{priority=1, msg=’一起来学java高并发,第1天’, sendTimeMs=1565603357198}
定时发送时间:1565603359198,实际发送时间:1565603359198,发送消息:Msg{priority=2, msg=’一起来学java高并发,第2天’, sendTimeMs=1565603359198}
定时发送时间:1565603361198,实际发送时间:1565603361199,发送消息:Msg{priority=3, msg=’一起来学java高并发,第3天’, sendTimeMs=1565603361198}
定时发送时间:1565603363198,实际发送时间:1565603363199,发送消息:Msg{priority=4, msg=’一起来学java高并发,第4天’, sendTimeMs=1565603363198}
定时发送时间:1565603365182,实际发送时间:1565603365183,发送消息:Msg{priority=5, msg=’一起来学java高并发,第5天’, sendTimeMs=1565603365182}

可以看出时间发送时间,和定时发送时间基本一致,代码中Msg需要实现Delayed接口,重点在于getDelay方法,这个方法返回剩余的延迟时间,代码中使用this.sendTimeMs减去当前时间的毫秒格式时间,得到剩余延迟时间。

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
LinkedTransferQueue 类继承自 AbstractQueue 抽象类,并且实现了 TransferQueue 接口:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
boolean tryTransfer(E e);
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。
void transfer(E e) throws InterruptedException;
// 在上述方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 如果至少有一位消费者在等待,则返回true
boolean hasWaitingConsumer();
// 获取所有等待获取元素的消费线程数量
int getWaitingConsumerCount();
}

再看一下上面的这些方法,transfer(E e)方法和SynchronousQueue的put方法类似,都需要等待消费者取走元素,否则一直等待。其他方法和ArrayBlockingQueue、LinkedBlockingQueue中的方法类似。

总结

  1. 重点需要了解BlockingQueue中的所有方法,以及他们的区别
  2. 重点掌握ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue的使用场景
  3. 需要处理的任务有优先级的,使用PriorityBlockingQueue
  4. 处理的任务需要延时处理的,使用DelayQueue