阻塞队列简介

Java中阻塞队列的实现是BlockingQueue 类,其大致的数据结构如下:
阻塞队列 - 图1
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞

也就是说试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素。同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来,并后续新增。使用阻塞队列的好处是我们不需要关系什么适合阻塞线程,什么适合唤醒线程,这些都由阻塞队列自动完成。BlockingQueue 类是一个接口,已知有7个实现类:

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列;虽然有界,但是界限非常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列;生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

常用的实现类是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue
image.png

阻塞队列核心方法

image.png

方式一

  1. public static void main(String[] args) {
  2. //指定阻塞队列容量为3
  3. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);
  4. //添加元素
  5. for (int i = 0; i < 3; i++) {
  6. final int temp = i;
  7. new Thread(() -> {
  8. System.out.println(Thread.currentThread().getName() + "添加元素" + temp + "结果:" + blockingQueue.add(String.valueOf(temp)));
  9. }, "线程" + (i + 1)).start();
  10. }
  11. //删除(队首)元素
  12. for (int i = 0; i < 3; i++) {
  13. final int temp = i;
  14. new Thread(() -> {
  15. System.out.println("队首元素:"+blockingQueue.element()+","+Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.remove());
  16. }, "线程" + (i + 1)).start();
  17. }
  18. }

image.png
如果添加元素的线程变为4(大于阻塞队列的容量):
image.png
同理如果删除元素的线程是4条同样也会出错

方式二

  1. public static void main(String[] args) {
  2. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);
  3. for (int i = 0; i < 3; i++) {
  4. final int temp = i;
  5. new Thread(() -> {
  6. System.out.println(Thread.currentThread().getName() + "添加元素" + temp + "结果:" + blockingQueue.offer(String.valueOf(temp)));
  7. }, "线程" + (i + 1)).start();
  8. }
  9. for (int i = 0; i < 4; i++) {
  10. final int temp = i;
  11. new Thread(() -> {
  12. System.out.println("队首元素:"+blockingQueue.peek()+","+Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.poll());
  13. }, "线程" + (i + 1)).start();
  14. }
  15. }

image.png
image.png
image.png

方式三

  1. public static void main(String[] args) {
  2. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);
  3. for (int i = 0; i < 3; i++) {
  4. final int temp = i;
  5. new Thread(() -> {
  6. try {
  7. blockingQueue.put(String.valueOf(temp));
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }, "线程" + (i + 1)).start();
  12. }
  13. for (int i = 0; i < 4; i++) {
  14. final int temp = i;
  15. new Thread(() -> {
  16. try {
  17. System.out.println(Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.take());
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }, "线程" + (i + 1)).start();
  22. }
  23. }

image.png
如果队列发生阻塞,线程将一直等待下去,无法结束程序:
image.png

方式四

  1. public static void main(String[] args) {
  2. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue(3);
  3. for (int i = 0; i < 3; i++) {
  4. final int temp = i;
  5. new Thread(() -> {
  6. try {
  7. System.out.println(Thread.currentThread().getName() + "添加元素" + temp);
  8. blockingQueue.offer(String.valueOf(temp), 3, TimeUnit.SECONDS);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. }, "线程" + (i + 1)).start();
  13. }
  14. for (int i = 0; i < 4; i++) {
  15. final int temp = i;
  16. new Thread(() -> {
  17. try {
  18. System.out.println(Thread.currentThread().getName() + "删除队首元素:" + blockingQueue.poll(3, TimeUnit.SECONDS));
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }, "线程" + (i + 1)).start();
  23. }
  24. }

image.png
可以看到超时之后就不会等待了

SynchronousQueue实现类(同步队列)

SynchronousQueue和其它的实现类相比比较特殊,它只能存储一个元素,如果put了一个元素,则必须take出来,否则无法再put元素了。如:

  1. public static void main(String[] args) {
  2. SynchronousQueue<String> synchronousQueue = new SynchronousQueue();
  3. try {
  4. synchronousQueue.put("a");
  5. synchronousQueue.put("b");
  6. synchronousQueue.take();
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. }

程序无法正常停止:
image.png