阻塞队列简介
Java中阻塞队列的实现是BlockingQueue 类,其大致的数据结构如下:
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
- 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
- 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞
也就是说试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素。同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来,并后续新增。使用阻塞队列的好处是我们不需要关系什么适合阻塞线程,什么适合唤醒线程,这些都由阻塞队列自动完成。BlockingQueue 类是一个接口,已知有7个实现类:
- ArrayBlockQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列;虽然有界,但是界限非常大,相当于无界,可以当成无界
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列;生产一个,消费一个,不存储元素,不消费不生产
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
常用的实现类是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue
阻塞队列核心方法
方式一
public static void main(String[] args) {//指定阻塞队列容量为3BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);//添加元素for (int i = 0; i < 3; i++) {final int temp = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "添加元素" + temp + "结果:" + blockingQueue.add(String.valueOf(temp)));}, "线程" + (i + 1)).start();}//删除(队首)元素for (int i = 0; i < 3; i++) {final int temp = i;new Thread(() -> {System.out.println("队首元素:"+blockingQueue.element()+","+Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.remove());}, "线程" + (i + 1)).start();}}

如果添加元素的线程变为4(大于阻塞队列的容量):
同理如果删除元素的线程是4条同样也会出错
方式二
public static void main(String[] args) {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);for (int i = 0; i < 3; i++) {final int temp = i;new Thread(() -> {System.out.println(Thread.currentThread().getName() + "添加元素" + temp + "结果:" + blockingQueue.offer(String.valueOf(temp)));}, "线程" + (i + 1)).start();}for (int i = 0; i < 4; i++) {final int temp = i;new Thread(() -> {System.out.println("队首元素:"+blockingQueue.peek()+","+Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.poll());}, "线程" + (i + 1)).start();}}
方式三
public static void main(String[] args) {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);for (int i = 0; i < 3; i++) {final int temp = i;new Thread(() -> {try {blockingQueue.put(String.valueOf(temp));} catch (InterruptedException e) {e.printStackTrace();}}, "线程" + (i + 1)).start();}for (int i = 0; i < 4; i++) {final int temp = i;new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}, "线程" + (i + 1)).start();}}
方式四
public static void main(String[] args) {BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);for (int i = 0; i < 3; i++) {final int temp = i;new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "添加元素" + temp);blockingQueue.offer(String.valueOf(temp), 3, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}}, "线程" + (i + 1)).start();}for (int i = 0; i < 4; i++) {final int temp = i;new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.poll(3, TimeUnit.SECONDS));} catch (InterruptedException e) {e.printStackTrace();}}, "线程" + (i + 1)).start();}}
SynchronousQueue实现类(同步队列)
SynchronousQueue和其它的实现类相比比较特殊,它只能存储一个元素,如果put了一个元素,则必须take出来,否则无法再put元素了。如:
public static void main(String[] args) {SynchronousQueue<String> synchronousQueue = new SynchronousQueue();try {synchronousQueue.put("a");synchronousQueue.put("b");synchronousQueue.take();} catch (InterruptedException e) {e.printStackTrace();}}
程序无法正常停止:




