一、简介
所谓BlockingQueue是指其中的元素数量存在界限,当队列已满时(队列元素数量达到了最大容量的临界值),对队列进行写入操作的线程将被阻塞挂起,当队列为空时(队列元素数量达到了为0的临界值),对队列进行读取的操作线程将被阻塞挂起。
public interface BlockingQueue<E> extends Queue<E> {
}
BlockingQueue以及其实现
二、ArrayBlockingQueue
ArrayBlockingQueue是一个基于数组结构实现的FIFO阻塞队列,在构造该阻塞队列时需要指定队列中最大元素的数量(容量)。当队列已满时,若再次进行数据写入操作,则线程将会进入阻塞,一直等待直到其他线程对元素进行消费。当队列为空时,对该队列的消费线程将会进入阻塞,直到有其他线程写入数据。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
}
2.1、阻塞式写方法
void put(E e):向队列的尾部插入新的数据,当队列已满时调用该方法的线程会进入阻塞,直到有其他线程对该线程执行了中断操作,或者队列中的元素被其他线程消费。
boolean offer(E e, long timeout, TimeUnit unit):向队列尾部写入新的数据,当队列已满时执行该方法的线程在指定的时间单位内将进入阻塞,直到到了指定的超时时间后,或者在此期间有其他线程对队列数据进行了消费。当然了,对由于执行该方法而进入阻塞的线程执行中断操作也可以使当前线程退出阻塞。该方法的返回值boolean为true时表示写入数据成功,为false时表示写入数据失败。
2.2、非阻塞式写方法
- boolean add(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,但是该方法会抛出队列已满的异常。
- boolean offer(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,并且会立即返回false。
2.3、阻塞式读方法
- E take():从队列头部获取数据,并且该数据会从队列头部移除,当队列为空时执行take方法的线程将进入阻塞,直到有其他线程写入新的数据,或者当前线程被执行了中断操作。
- E poll(long timeout, TimeUnit unit):从队列头部获取数据并且该数据会从队列头部移除,如果队列中没有任何元素时则执行该方法,当前线程会阻塞指定的时间,直到在此期间有新的数据写入,或者阻塞的当前线程被其他线程中断,当线程由于超时退出阻塞时,返回值为null。
2.4、非阻塞式读方法
- E poll():从队列头部获取数据并且该数据会从队列头部移除,当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
- E peek():peek的操作类似于debug操作,它直接从队列头部获取一个数据,但是并不能从队列头部移除数据,当队列为空时,该方法不会使得当前线程进入阻塞,而是返回null值。
2.5、使用
/*有界阻塞式队列*/
public class ArrayBlockingQueueTest {
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //最多装10个
static Random r = new Random();
public static void main(String[] args) {
for (int i=0; i<10; i++) {
try {
strs.put("a" + i); //向容器中添加10个,就满了
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try { //strs已经满了,以下方法都加不进去,但是处理方式不同
strs.put("aaa");//发现满了,就会等待,程序阻塞
strs.add("aaa"); //已经满了,再往里面装就会报异常
strs.offer("aaa");//不会报异常,但是加不进去,返回是否添加成功
strs.offer("aaa",1,TimeUnit.SECONDS); //1秒钟后加不进去,就不往里面加了
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(strs);
}
}
三、PriorityBlockingQueue
PriorityBlockingQueue优先级阻塞队列是一个“无边界”阻塞队列,该队列会根据某种规则(Comparator)对插入队列尾部的元素进行排序,因此该队列将不会遵循FIFO(first-in-first-out)的约束。虽然PriorityBlockingQueue同ArrayBlockingQueue都实现自同样的接口,拥有同样的方法,但是大多数方法的实现确实具有很大的差别,PriorityBlockingQueue也是线程安全的类,适用于高并发多线程的情况下。
3.1、PriorityBlockingQueue常用方法
1、排序且无边界的队列
只要应用程序的内存足够使用,理论上,PriorityBlockingQueue存放数据的数量是“无边界”的,在PriorityBlockingQueue内部维护了一个Object的数组,随着数据量的不断增多,该数组也会进行动态地扩容。在构造PriorityBlockingQueue时虽然提供了一个整数类型的参数,但是该参数所代表的含义与ArrayBlockingQueue完全不同,前者是构造PriorityBlockingQueue的初始容量,后者指定的整数类型参数则是ArrayBlockingQueue的最大容量。
如果没有显示地指定Comparator,那么它将只支持实现了Comparable接口的数据类型。如果在创建PriorityBlockingQueue队列的时候既没有指定Comparator,同时数据元素也不是Comparable接口的子类,那么这种情况下,会出现类型转换的运行时异常。
2、不存在阻塞写方法
四、LinkedBlockingQueue
LinkedBlockingQueue是“可选边界”基于链表实现的FIFO队列。目前学习到阻塞队列都是通过显式锁Lock进行共享数据的同步,以及与Lock关联的Condition进行线程间通知,因此该队列也适用于高并发的多线程环境中,是线程安全的类。
LinkedBlockingQueue队列的边界可选性是通过构造函数来决定的,当我们在创建LinkedBlockingQueue对象时,使用的是默认的构造函数,那么该队列的最大容量将为Integer的最大值(所谓的“无边界”),当然开发者可以通过指定队列最大容量(有边界)的方式创建队列。
五、DelayQueue
往DelayQueue里加的元素是按时间排好序的,该队列是无界的。另外元素要实现Delayed接口,而Delayed接口又继承了Comparable接口,所以该类元素需要实现compareTo()方法;并且每个元素记载着自己还有多长时间才能被拿走,还要实现getDelay()方法。<br />getDelay(TimeUnit unit)方法用于计算该元素距离过期的剩余时间,如果在消费DelayQueue时发现并没有任何一个元素到达过期时间,那么对该队列的读取操作会立即返回null值,或者使得消费线程进入阻塞
5.1、基本使用
public class DelayQueueTest {
static DelayQueue<MyTask> tasks = new DelayQueue<>();
static class MyTask implements Delayed { //实现Delayed接口
long runningTime;
String name;
MyTask(long rt,String name) {
this.runningTime = rt;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else // ==
return 0;
}
@Override
public String toString() {
return name + "--" + runningTime;
}
}
public static void main(String[] args) {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask(now + 1000, "task1"); //1 s 后执行 //②
MyTask t2 = new MyTask(now + 2000, "task2"); //2 s后执行 //④
MyTask t3 = new MyTask(now + 1500, "task3"); //1.5s后执行 //③
MyTask t4 = new MyTask(now + 500, "task4"); //0.5s后执行 //①
MyTask t5 = new MyTask(now + 2500, "task5"); //2.5s后执行 //⑤
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i=0; i<5; i++) {
try {
System.out.println(tasks.take()); //按放进去的顺序拿出
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
5.3、读取DelayQueue中的数据
- peek():非阻塞读方法,立即返回但并不移除DelayQueue的头部元素,当队列为空时返回null。
- poll():非阻塞读方法,当队列为空或者队列头部元素还未到达过期时间时返回值为null,否则将会从队列头部立即将元素移除并返回。
- poll(long timeout, TimeUnit unit):最大阻塞单位时间,当达到阻塞时间后,此刻为空或者队列头部元素还未达到过期时间时返回值为null,否则将会立即从队列头部将元素移除并返回。
- take():阻塞式的读取方法,该方法会一直阻塞到队列中有元素,并且队列中的头部元素已达到过期时间,然后将其从队列中移除并返回。
六、SynchronousQueue
/*一种特殊的TransferQueue,生产的任何一个东西必须直接交给消费者消费,不能搁在容器里,容器的容量为0*/
public class SynchronizeQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();
new Thread(()->{ //消费者线程
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaaa"); //不能调用add(报错),add不进去,put阻塞,等待消费者消费,内部调用的transfer.
System.out.println(strs.size()); //0
}
}
七、LinkedBlockingDeque
LinkedBlockingDeque是一个基于链表实现的双向(Double Ended Queue,Deque)阻塞队列,双向队列支持在队尾写入数据,读取移除数据;在队头写入数据,读取移除数据。LinkedBlockingDeque实现自BlockingDeque(BlockingDeque又是BlockingQueue的子接口),并且支持可选“边界”,与LinkedBlockingQueue一样,对边界的指定在构造LinkedBlockingDeque时就已经确定了。
八、LinkedTransferQueue
8.1、LinkedTransferQueue的简介
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer
和transfer
方法。
LinkedTransferQueue采用一种预占模式。意思就是消费者线程取元素时,如果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素为null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时发现有一个元素为null的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为“匹配”方式。
LinkedTransferQueue是ConcurrentLinkedQueue、SynchronousQueue(公平模式下转交元素)、LinkedBlockingQueue(阻塞Queue的基本方法)的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。
public interface TransferQueue<E> extends BlockingQueue<E> {
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则返回false,并且不进入队列。
boolean tryTransfer(E e);
// 如果存在一个消费者已经等待接收它,则立即传送指定的元素,否则等待直到元素被消费者接收。
void transfer(E e) throws InterruptedException;
// 在上述方法的基础上设置超时时间
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 如果至少有一位消费者在等待,则返回true
boolean hasWaitingConsumer();
// 获取所有等待获取元素的消费线程数量
int getWaitingConsumerCount();
}