本文内容
- 掌握 Queue、BlockingQueue 接口中常用的方法
- 介绍 6 种阻塞队列,及相关场景示例
- 重点掌握 4 种常用的阻塞队列
Queue 接口
队列是一种先进先出(FIFO)的数据结构,java 中用Queue接口来表示队列。
Queue接口中定义了 6 个方法:
public interface Queue<E> extends Collection<E> {
boolean add(e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}
每个Queue方法都有两种形式:
(1)如果操作失败则抛出异常,
(2)如果操作失败,则返回特殊值(null或false,具体取决于操作),接口的常规结构如下表所示。
操作类型 | 抛出异常 | 返回特殊值 |
---|---|---|
插入 | add(e) | offer(e) |
移除 | remove() | poll() |
检查 | element() | peek() |
Queue从Collection继承的add方法插入一个元素,除非它违反了队列的容量限制,在这种情况下它会抛出IllegalStateException;offer方法与add不同之处仅在于它通过返回false来表示插入元素失败。
remove和poll方法都移除并返回队列的头部,确切地移除哪个元素是由具体的实现来决定的,仅当队列为空时,remove和poll方法的行为才有所不同,在这些情况下,remove抛出NoSuchElementException,而poll返回null。
element和peek方法返回队列头部的元素,但不移除,它们之间的差异与remove和poll的方式完全相同,如果队列为空,则element抛出NoSuchElementException,而peek返回null。
队列一般不要插入空元素。
BlockingQueue 接口
BlockingQueue位于 juc 中,熟称阻塞队列, 阻塞队列首先它是一个队列,继承Queue接口,是队列就会遵循先进先出(FIFO)的原则,又因为它是阻塞的,故与普通的队列有两点区别:
- 当一个线程向队列里面添加数据时,如果队列是满的,那么将阻塞该线程,暂停添加数据
- 当一个线程从队列里面取出数据时,如果队列是空的,那么将阻塞该线程,暂停取出数据
BlockingQueue相关方法:
操作类型 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,timeuout,unit) |
移除 | remove() | poll() | take() | poll(timeout,unit) |
检查 | element() | peek() | 不支持 | 不支持 |
重点,再来解释一下,加深印象:
- 3 个可能会有异常的方法,add、remove、element;这 3 个方法不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,add 抛出异常;队列为空情况下,remove、element 抛出异常
- offer、poll、peek 也不会阻塞(是说队列满或者空的情况下是否会阻塞);队列满的情况下,offer 返回 false;队列为空的情况下,pool、peek 返回 null
- 队列满的情况下,调用 put 方法会导致当前线程阻塞
- 队列为空的情况下,调用 take 方法会导致当前线程阻塞
- offer(e,timeuout,unit),超时之前,插入成功返回 true,否则返回 false
- poll(timeout,unit),超时之前,获取到头部元素并将其移除,返回 true,否则返回 false
- 以上一些方法希望大家都记住,方便以后使用
BlockingQueue 常见的实现类
看一下相关类图
ArrayBlockingQueue
基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过 ReentrantLock 来完成的,数据的插入与取出共用同一个锁,因此 ArrayBlockingQueue 并不能实现生产、消费同时进行。而且在创建 ArrayBlockingQueue 时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
LinkedBlockingQueue
基于单向链表的阻塞队列实现,在初始化 LinkedBlockingQueue 的时候可以指定大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外 LinkedBlockingQueue 中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。
PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列,进入队列的元素会按照优先级进行排序
SynchronousQueue
同步阻塞队列,SynchronousQueue 没有容量,与其他 BlockingQueue 不同,SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素
LinkedTransferQueue
LinkedTransferQueue 是基于链表的 FIFO 无界阻塞队列,它出现在 JDK7 中,Doug Lea 大神说 LinkedTransferQueue 是一个聪明的队列,它是 ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、无界的 LinkedBlockingQueues 等的超集,LinkedTransferQueue包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues三种队列的功能
下面我们来介绍每种阻塞队列的使用。ArrayBlockingQueue
有界阻塞队列,内部使用数组存储元素,有 2 个常用构造方法:
//capacity表示容量大小,默认内部采用非公平锁
public ArrayBlockingQueue(int capacity)
//capacity:容量大小,fair:内部是否是使用公平锁
public ArrayBlockingQueue(int capacity, boolean fair)
需求:业务系统中有很多地方需要推送通知,由于需要推送的数据太多,我们将需要推送的信息先丢到阻塞队列中,然后开一个线程进行处理真实发送,代码如下:
package com.itsoku.chat25;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import sun.text.normalizer.NormalizerBase;
import java.util.Calendar;
import java.util.concurrent.*;
/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo1 {
//推送队列
static ArrayBlockingQueue
**static** {<br /> //启动一个线程做真实推送<br /> **new** Thread(() -> {<br /> **while** (**true**) {<br /> String msg;<br /> **try** {<br /> **long** starTime = System.currentTimeMillis();<br /> //获取一条推送消息,此方法会进行阻塞,直到返回结果<br /> msg = pushQueue.take();<br /> **long** endTime = System.currentTimeMillis();<br /> //模拟推送耗时<br /> TimeUnit.MILLISECONDS.sleep(500);
System.out.println(String.format("[%s,%s,take耗时:%s],%s,发送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }
//推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列<br /> **public** **static** **void** **pushMsg**(String msg) **throws** InterruptedException {<br /> pushQueue.put(msg);<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **for** (**int** i = 1; i <= 5; i++) {<br /> String msg = "一起来学java高并发,第" + i + "天";<br /> //模拟耗时<br /> TimeUnit.SECONDS.sleep(i);<br /> Demo1.pushMsg(msg);<br /> }<br /> }<br />}
输出:
[1565595629206,1565595630207,take耗时:1001],Thread-0,发送消息:一起来学java高并发,第1天
[1565595630208,1565595632208,take耗时:2000],Thread-0,发送消息:一起来学java高并发,第2天
[1565595632208,1565595635208,take耗时:3000],Thread-0,发送消息:一起来学java高并发,第3天
[1565595635208,1565595639209,take耗时:4001],Thread-0,发送消息:一起来学java高并发,第4天
[1565595639209,1565595644209,take耗时:5000],Thread-0,发送消息:一起来学java高并发,第5天
代码中我们使用了有界队列ArrayBlockingQueue,创建ArrayBlockingQueue时候需要制定容量大小,调用pushQueue.put将推送信息放入队列中,如果队列已满,此方法会阻塞。代码中在静态块中启动了一个线程,调用pushQueue.take();从队列中获取待推送的信息进行推送处理。
注意:ArrayBlockingQueue如果队列容量设置的太小,消费者发送的太快,消费者消费的太慢的情况下,会导致队列空间满,调用 put 方法会导致发送者线程阻塞,所以注意设置合理的大小,协调好消费者的速度。
LinkedBlockingQueue
内部使用单向链表实现的阻塞队列,3 个构造方法:
//默认构造方法,容量大小为Integer.MAX_VALUE
public LinkedBlockingQueue();
//创建指定容量大小的LinkedBlockingQueue
public LinkedBlockingQueue(int capacity);
//容量为Integer.MAX_VALUE,并将传入的集合丢入队列中
public LinkedBlockingQueue(Collection<? extends E> c);
LinkedBlockingQueue的用法和ArrayBlockingQueue类似,建议使用的时候指定容量,如果不指定容量,插入的太快,移除的太慢,可能会产生 OOM。
PriorityBlockingQueue
无界的优先级阻塞队列,内部使用数组存储数据,达到容量时,会自动进行扩容,放入的元素会按照优先级进行排序,4 个构造方法:
//默认构造方法,默认初始化容量是11
public PriorityBlockingQueue();
//指定队列的初始化容量
public PriorityBlockingQueue(int initialCapacity);
//指定队列的初始化容量和放入元素的比较器
public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator);
//传入集合放入来初始化队列,传入的集合可以实现SortedSet接口或者PriorityQueue接口进行排序,如果没有实现这2个接口,按正常顺序放入队列
public PriorityBlockingQueue(Collection<? extends E> c);
优先级队列放入元素的时候,会进行排序,所以我们需要指定排序规则,有 2 种方式:
- 创建PriorityBlockingQueue指定比较器Comparator
- 放入的元素需要实现Comparable接口
上面 2 种方式必须选一个,如果 2 个都有,则走第一个规则排序。
需求:还是上面的推送业务,目前推送是按照放入的先后顺序进行发送的,比如有些公告比较紧急,优先级比较高,需要快点发送,怎么搞?此时PriorityBlockingQueue就派上用场了,代码如下:
package com.itsoku.chat25;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo2** {
//推送信息封装<br /> **static** **class** **Msg** **implements** **Comparable**<**Msg**> {<br /> //优先级,越小优先级越高<br /> **private** **int** priority;<br /> //推送的信息<br /> **private** String msg;
**public** **Msg**(**int** priority, String msg) {<br /> **this**.priority = priority;<br /> **this**.msg = msg;<br /> }
@Override<br /> **public** **int** **compareTo**(Msg o) {<br /> **return** Integer.compare(**this**.priority, o.priority);<br /> }
@Override<br /> **public** String **toString**() {<br /> **return** "Msg{" +<br /> "priority=" + priority +<br /> ", msg='" + msg + '\'' +<br /> '}';<br /> }<br /> }
//推送队列<br /> **static** PriorityBlockingQueue<Msg> pushQueue = **new** PriorityBlockingQueue<Msg>();
**static** {<br /> //启动一个线程做真实推送<br /> **new** Thread(() -> {<br /> **while** (**true**) {<br /> Msg msg;<br /> **try** {<br /> **long** starTime = System.currentTimeMillis();<br /> //获取一条推送消息,此方法会进行阻塞,直到返回结果<br /> msg = pushQueue.take();<br /> //模拟推送耗时<br /> TimeUnit.MILLISECONDS.sleep(100);<br /> **long** endTime = System.currentTimeMillis();<br /> System.out.println(String.format("[%s,%s,take耗时:%s],%s,发送消息:%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName(), msg));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }
//推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列<br /> **public** **static** **void** **pushMsg**(**int** priority, String msg) **throws** InterruptedException {<br /> pushQueue.put(**new** Msg(priority, msg));<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **for** (**int** i = 5; i >= 1; i--) {<br /> String msg = "一起来学java高并发,第" + i + "天";<br /> Demo2.pushMsg(i, msg);<br /> }<br /> }<br />}
输出:
[1565598857028,1565598857129,take耗时:101],Thread-0,发送消息:Msg{priority=1, msg=’一起来学java高并发,第1天’}
[1565598857162,1565598857263,take耗时:101],Thread-0,发送消息:Msg{priority=2, msg=’一起来学java高并发,第2天’}
[1565598857263,1565598857363,take耗时:100],Thread-0,发送消息:Msg{priority=3, msg=’一起来学java高并发,第3天’}
[1565598857363,1565598857463,take耗时:100],Thread-0,发送消息:Msg{priority=4, msg=’一起来学java高并发,第4天’}
[1565598857463,1565598857563,take耗时:100],Thread-0,发送消息:Msg{priority=5, msg=’一起来学java高并发,第5天’}
main 中放入了 5 条推送信息,i 作为消息的优先级按倒序放入的,最终输出结果中按照优先级由小到大输出。注意 Msg 实现了Comparable接口,具有了比较功能。
SynchronousQueue
同步阻塞队列,SynchronousQueue 没有容量,与其他 BlockingQueue 不同,SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素,反之亦然。
SynchronousQueue 在现实中用的不多,线程池中有用到过,Executors.newCachedThreadPool()实现中用到了这个队列,当有任务丢入线程池的时候,如果已创建的工作线程都在忙于处理任务,则会新建一个线程来处理丢入队列的任务。
来个示例代码:
package com.itsoku.chat25;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo3** {
**static** SynchronousQueue<String> queue = **new** SynchronousQueue<>();
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **new** Thread(() -> {<br /> **try** {<br /> **long** starTime = System.currentTimeMillis();<br /> queue.put("java高并发系列,路人甲Java!");<br /> **long** endTime = System.currentTimeMillis();<br /> System.out.println(String.format("[%s,%s,take耗时:%s],%s", starTime, endTime, (endTime - starTime), Thread.currentThread().getName()));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }).start();<br /> //休眠5秒之后,从队列中take一个元素<br /> TimeUnit.SECONDS.sleep(5);<br /> System.out.println(System.currentTimeMillis() + "调用take获取并移除元素," + queue.take());<br /> }<br />}
输出:
1565600421645调用take获取并移除元素,java高并发系列,路人甲Java!
[1565600416645,1565600421645,take耗时:5000],Thread-0
main 方法中启动了一个线程,调用queue.put方法向队列中丢入一条数据,调用的时候产生了阻塞,从输出结果中可以看出,直到 take 方法被调用时,put 方法才从阻塞状态恢复正常。
DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素。
需求:还是推送的业务,有时候我们希望早上 9 点或者其他指定的时间进行推送,如何实现呢?此时DelayQueue就派上用场了。
我们先看一下DelayQueue类的声明:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
元素 E 需要实现接口Delayed,我们看一下这个接口的代码:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
Delayed继承了Comparable接口,这个接口是用来做比较用的,DelayQueue内部使用PriorityQueue来存储数据的,PriorityQueue是一个优先级队列,丢入的数据会进行排序,排序方法调用的是Comparable接口中的方法。下面主要说一下Delayed接口中的getDelay方法:此方法在给定的时间单位内返回与此对象关联的剩余延迟时间。
对推送我们再做一下处理,让其支持定时发送(定时在将来某个时间也可以说是延迟发送),代码如下:
package com.itsoku.chat25;
import java.util.Calendar;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/
微信公众号:程序员路人,个人博客:http://www.itsoku.com
/
public class Demo4** {
//推送信息封装<br /> **static** **class** **Msg** **implements** **Delayed** {<br /> //优先级,越小优先级越高<br /> **private** **int** priority;<br /> //推送的信息<br /> **private** String msg;<br /> //定时发送时间,毫秒格式<br /> **private** **long** sendTimeMs;
**public** **Msg**(**int** priority, String msg, **long** sendTimeMs) {<br /> **this**.priority = priority;<br /> **this**.msg = msg;<br /> **this**.sendTimeMs = sendTimeMs;<br /> }
@Override<br /> **public** String **toString**() {<br /> **return** "Msg{" +<br /> "priority=" + priority +<br /> ", msg='" + msg + '\'' +<br /> ", sendTimeMs=" + sendTimeMs +<br /> '}';<br /> }
@Override<br /> **public** **long** **getDelay**(TimeUnit unit) {<br /> **return** unit.convert(**this**.sendTimeMs - Calendar.getInstance().getTimeInMillis(), TimeUnit.MILLISECONDS);<br /> }
@Override<br /> **public** **int** **compareTo**(Delayed o) {<br /> **if** (o **instanceof** Msg) {<br /> Msg c2 = (Msg) o;<br /> **return** Integer.compare(**this**.priority, c2.priority);<br /> }<br /> **return** 0;<br /> }<br /> }
//推送队列<br /> **static** DelayQueue<Msg> pushQueue = **new** DelayQueue<Msg>();
**static** {<br /> //启动一个线程做真实推送<br /> **new** Thread(() -> {<br /> **while** (**true**) {<br /> Msg msg;<br /> **try** {<br /> //获取一条推送消息,此方法会进行阻塞,直到返回结果<br /> msg = pushQueue.take();<br /> //此处可以做真实推送<br /> **long** endTime = System.currentTimeMillis();<br /> System.out.println(String.format("定时发送时间:%s,实际发送时间:%s,发送消息:%s", msg.sendTimeMs, endTime, msg));<br /> } **catch** (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> }<br /> }).start();<br /> }
//推送消息,需要发送推送消息的调用该方法,会将推送信息先加入推送队列<br /> **public** **static** **void** **pushMsg**(**int** priority, String msg, **long** sendTimeMs) **throws** InterruptedException {<br /> pushQueue.put(**new** Msg(priority, msg, sendTimeMs));<br /> }
**public** **static** **void** **main**(String[] args) **throws** InterruptedException {<br /> **for** (**int** i = 5; i >= 1; i--) {<br /> String msg = "一起来学java高并发,第" + i + "天";<br /> Demo4.pushMsg(i, msg, Calendar.getInstance().getTimeInMillis() + i * 2000);<br /> }<br /> }<br />}
输出:
定时发送时间:1565603357198,实际发送时间:1565603357198,发送消息:Msg{priority=1, msg=’一起来学java高并发,第1天’, sendTimeMs=1565603357198}
定时发送时间:1565603359198,实际发送时间:1565603359198,发送消息:Msg{priority=2, msg=’一起来学java高并发,第2天’, sendTimeMs=1565603359198}
定时发送时间:1565603361198,实际发送时间:1565603361199,发送消息:Msg{priority=3, msg=’一起来学java高并发,第3天’, sendTimeMs=1565603361198}
定时发送时间:1565603363198,实际发送时间:1565603363199,发送消息:Msg{priority=4, msg=’一起来学java高并发,第4天’, sendTimeMs=1565603363198}
定时发送时间:1565603365182,实际发送时间:1565603365183,发送消息:Msg{priority=5, msg=’一起来学java高并发,第5天’, sendTimeMs=1565603365182}
可以看出时间发送时间,和定时发送时间基本一致,代码中Msg需要实现Delayed接口,重点在于getDelay方法,这个方法返回剩余的延迟时间,代码中使用this.sendTimeMs减去当前时间的毫秒格式时间,得到剩余延迟时间。
LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
LinkedTransferQueue 类继承自 AbstractQueue 抽象类,并且实现了 TransferQueue 接口:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
boolean tryTransfer(E e);
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。
void transfer(E e) throws InterruptedException;
// 在上述方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 如果至少有一位消费者在等待,则返回true
boolean hasWaitingConsumer();
// 获取所有等待获取元素的消费线程数量
int getWaitingConsumerCount();
}
再看一下上面的这些方法,transfer(E e)方法和SynchronousQueue的put方法类似,都需要等待消费者取走元素,否则一直等待。其他方法和ArrayBlockingQueue、LinkedBlockingQueue中的方法类似。
总结
- 重点需要了解BlockingQueue中的所有方法,以及他们的区别
- 重点掌握ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue的使用场景
- 需要处理的任务有优先级的,使用PriorityBlockingQueue
- 处理的任务需要延时处理的,使用DelayQueue