一、阻塞队列是什么?定义:阻塞队列首先是一个队列,当队列是空的时候,从队列获取元素的操作将会被阻塞,当队列是满的时候,从队列插入元素的操作将会被阻塞。
消息中间件底层用的就是阻塞队列。
二、阻塞队列好处
在多线程领域,所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要使用BlockingQueue? —— 好处是我们不需要关心什么时候去阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQue都给你一手包办了。
在Concurrent包发布以前,在多线程环境下,我们每个程序员都要去控制这些细节,尤其还要兼顾效率与线程安全,这会给我们的程序带来不小的复杂度。
三、阻塞队列BlockingQueue接口架构图
阻塞队列BlockingQueue种类:
(1)ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
(2)LinkedBlockingQueue:由链表结果组成的有界阻塞队列(默认大小Integer.MAX_VALUE)阻塞队列。(接近无界)
(3)SychronousQueue:不存储元素的阻塞队列,也即单个元素队列。(最后代码演示)
(4)PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
(5)DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
(6)LinkedTransferQueue:由链表结构组成的无界阻塞队列。
(7)LinkedBlockingDeque:由链表结构组成的双端阻塞队列。
四、阻塞队列BlockingQueue核心方法
共同点:插入成功或者删除成功都返回true(除了put和take),区别在于处理异常情况
Queue 中 element() 和 peek()都是用来返回队列的头元素,不删除。在队列元素为空的情况下,element() 方法会抛出NoSuchElementException异常,peek() 方法只会返回 null。
(1)抛出异常的方法
当阻塞队列满时,再往队列里add元素会抛出IllegalStateException:Queue full;
当阻塞队列空时,再从队列里remove移除元素会抛NoSuchElementException。
(2)返回特殊值(boolean值)的方法
插入成功,返回true;插入失败,返回false;
删除成功返回出队列元素;删除失败返回null;
(3)阻塞的方法(添加无返回值)
当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。
当阻塞队列空时,消费者线程试图take队列里的元素,队列会一直阻塞消费者线程直到队列有可用元素。
(4)超时的方法
当向阻塞队列offer元素时候,时间超过了设定的值,就会出现超时中断;
当向阻塞队列poll元素时候,时间超过了设定的值,就会出现超时中断。
五、阻塞队列BlockingQueue核心方法代码验证
5.1、抛出异常的方法代码验证:
add方法抛出异常
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 当阻塞队列满时,再往队列里add元素会抛出IllegalStateException:Queue full.
* 当阻塞队列空时,再往队列里remove元素会抛出NoSuchElementException.
*/
public class BlockingQueueDemoOne {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(5);
System.out.println(blockingQueue.add("吴用"));
System.out.println(blockingQueue.add("宋江"));
System.out.println(blockingQueue.add("李逵"));
System.out.println(blockingQueue.add("卢俊义"));
System.out.println(blockingQueue.add("高俅"));
// 再往阻塞队列添加一个元素就会抛出I了legalStateException:Queue full.
System.out.println(blockingQueue.add("武松"));
}
}
程序执行结果:
remove方法抛出异常
/ * 当阻塞队列空时,再往队列里remove元素会抛出NoSuchElementException.
*/
public class BlockingQueueDemoOne {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 阻塞队列add方法抛出异常演示
System.out.println(blockingQueue.add("吴用"));
System.out.println(blockingQueue.add("宋江"));
System.out.println(blockingQueue.add("李逵"));
System.out.println(blockingQueue.add("卢俊义"));
System.out.println(blockingQueue.add(123));
// 再往阻塞队列添加一个元素就会抛出I了legalStateException:Queue full.
// System.out.println(blockingQueue.add("武松"));
// 阻塞队列remove方法抛出异常演示
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// 再往阻塞队列里remove一个元素就会抛出NoSuchElementException
System.out.println(blockingQueue.remove());
}
}
程序执行结果:
element检查方法:(底层调用了 peek()方法)
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 当阻塞队列满时,再往队列里add元素会抛出IllegalStateException:Queue full.
* 当阻塞队列空时,再往队列里remove元素会抛出NoSuchElementException.
*/
public class BlockingQueueDemoOne {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 阻塞队列add方法抛出异常演示
System.out.println(blockingQueue.add("吴用"));
System.out.println(blockingQueue.add("宋江"));
System.out.println(blockingQueue.add("李逵"));
System.out.println(blockingQueue.add("卢俊义"));
System.out.println(blockingQueue.add("高俅"));
// 检查队列队首元素是啥?或者检查队列是否为空。
System.err.println(blockingQueue.element());
}
}
程序执行结果:(不存在,会抛NoSuchElementException异常)
5.2、返回特殊值的方法(boolean值返回)
Demo One:阻塞队列offer方法的使用
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 当阻塞队列满时,再往队列里offer元素会返回false.
* 当阻塞队列空时,再往队列里poll元素会返回null.
*/
public class BlockingQueueDemoTwo {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 当阻塞队列满时,offer方法返回false演示
System.out.println(blockingQueue.add("吴用"));
System.out.println(blockingQueue.add("宋江"));
System.out.println(blockingQueue.add("李逵"));
System.out.println(blockingQueue.add("卢俊义"));
System.out.println(blockingQueue.offer("高俅"));
// 此时再往阻塞队列offer一个元素就会返回false
System.out.println(blockingQueue.offer("武松"));
}
}
程序执行结果:
Demo Two:阻塞队列poll方法的使用
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 当阻塞队列满时,再往队列里offer元素会返回false.
* 当阻塞队列空时,再往队列里poll元素会返回null.
*/
public class BlockingQueueDemoTwo {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 当阻塞队列满时,offer方法返回false演示
System.out.println(blockingQueue.add("吴用"));
System.out.println(blockingQueue.add("宋江"));
System.out.println(blockingQueue.add("李逵"));
System.out.println(blockingQueue.add("卢俊义"));
System.out.println(blockingQueue.add("高俅"));
// 此时再往阻塞队列offer一个元素就会返回false
System.out.println(blockingQueue.offer("武松"));
// 当阻塞队列为空时,poll方法返回null演示
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// 此时再往阻塞队列执行poll一个元素就会返回null
System.out.println(blockingQueue.poll());
}
}
程序执行结果:
Demo Three:(peek()方法常用)
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 当阻塞队列满时,再往队列里offer元素会返回false.
* 当阻塞队列空时,再往队列里poll元素会返回null.
*/
public class BlockingQueueDemoTwo {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
System.out.println(blockingQueue.peek());
// 当阻塞队列满时,offer方法返回false演示
System.out.println(blockingQueue.add("吴用"));
System.out.println(blockingQueue.add("宋江"));
System.out.println(blockingQueue.add("李逵"));
System.out.println(blockingQueue.add("卢俊义"));
System.out.println(blockingQueue.add("高俅"));
// peek方法返回阻塞队列队首元素,如果此阻塞队列为空,则返回为null
System.out.println(blockingQueue.peek());
}
}
测试结果:
5.3、阻塞队列的方法
Demo One:阻塞队列put方法使用
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 当阻塞队列满时,再往队列里put元素会阻塞队列.
* 当阻塞队列空时,再往队列里take元素会阻塞队列.
*/
public class BlockingQueueDemoThree {
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 当阻塞队列满时,put方法会阻塞队列演示
blockingQueue.put("吴用");
blockingQueue.put("宋江");
blockingQueue.put("李逵");
blockingQueue.put("卢俊义");
blockingQueue.put("高俅");
//此时再往阻塞队列put一个元素时,会阻塞队列
blockingQueue.put("武松");
}
}
程序执行结果:
Demo Two:阻塞队列take方法使用
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* * 当阻塞队列满时,再往队列里put元素会阻塞队列. * 当阻塞队列空时,再往队列里take元素会阻塞队列.
*/
public class BlockingQueueDemoThree {
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 当阻塞队列满时,put方法会阻塞队列演示
blockingQueue.put("吴用");
blockingQueue.put("宋江");
blockingQueue.put("李逵");
blockingQueue.put("卢俊义");
blockingQueue.put("高俅");
// 此时再往阻塞队列put一个元素时,会阻塞队列
// blockingQueue.put("武松");
// 当阻塞队列空时,take方法会阻塞队列演示
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// 此时再往阻塞队列take一个元素时,会阻塞队列
System.out.println(blockingQueue.take());
}
}
程序执行结果:
5.4、超时中断的方法
Demo One:阻塞队列offer超时中断演示
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 当阻塞队列满时,再往队列里offer元素阻塞队列会超时中断.
* 当阻塞队列空时,再往队列里poll元素阻塞队列会超时中断.
*/
public class BlockingQueueDemoFour {
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 当阻塞队列满时,offer方法执行超过3秒,阻塞队列会超时中断演示
System.out.println(blockingQueue.offer("吴用", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("宋江", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("李逵", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("卢俊义", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("高俅", 3, TimeUnit.SECONDS));
//此时再往阻塞队列offer一个元素时,阻塞队列3秒后会超时中断
System.out.println(blockingQueue.offer("武松", 3, TimeUnit.SECONDS));
}
}
程序执行结果如下:当往阻塞队列添加第六个元素的时候,队列添加不进去,3秒后会超时中断。
Demo Two:阻塞队列poll超时中断演示
package com.lxk.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 当阻塞队列满时,再往队列里offer元素阻塞队列会超时中断.
* 当阻塞队列空时,再往队列里poll元素阻塞队列会超时中断.
*/
public class BlockingQueueDemoFour {
public static void main(String[] args) throws InterruptedException {
BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
// 当阻塞队列满时,offer方法执行超过3秒,阻塞队列会超时中断演示
System.out.println(blockingQueue.offer("吴用", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("宋江", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("李逵", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("卢俊义", 3, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("高俅", 3, TimeUnit.SECONDS));
//此时再往阻塞队列offer一个元素时,阻塞队列3秒后会超时中断
System.out.println("延时3秒。。。");
System.out.println(blockingQueue.offer("武松", 3, TimeUnit.SECONDS));
// 当阻塞队列空时,poll方法执行超过3秒,阻塞队列会超时中断演示
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
// 此时再往阻塞队列poll一个元素时,超过3秒,阻塞队列会超时中断
System.out.println("延时3秒。。。");
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
}
}
程序执行结果:
六、SychronousQueue类阻塞队列代码验证
程序执行结果如下:因为消费者线程每隔5秒钟取一个元素,所以生产者线程也是每隔5秒钟往Synchronous阻塞队列中添加一个元素。
package com.lxk.juc;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* SynchronousBlocking是一个不存储元素的阻塞队列,即单个元素队列
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
// 生产者线程进行put操作
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put one");
blockingQueue.put("one");
System.out.println(Thread.currentThread().getName() + "\t put two");
blockingQueue.put("two");
System.out.println(Thread.currentThread().getName() + "\t put three");
blockingQueue.put("three");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
// 消费者线程进行take操作
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t take one");
blockingQueue.take();
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t take two");
blockingQueue.take();
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t take three");
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
}
}
测试结果: