一、阻塞队列是什么?定义:阻塞队列首先是一个队列,当队列是空的时候,从队列获取元素的操作将会被阻塞,当队列是满的时候,从队列插入元素的操作将会被阻塞。
    image.png
    消息中间件底层用的就是阻塞队列。
    二、阻塞队列好处
    在多线程领域,所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
    为什么需要使用BlockingQueue? —— 好处是我们不需要关心什么时候去阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQue都给你一手包办了。
    在Concurrent包发布以前,在多线程环境下,我们每个程序员都要去控制这些细节,尤其还要兼顾效率与线程安全,这会给我们的程序带来不小的复杂度。
    三、阻塞队列BlockingQueue接口架构图
    image.png

    阻塞队列BlockingQueue种类:
    (1)ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
    (2)LinkedBlockingQueue:由链表结果组成的有界阻塞队列(默认大小Integer.MAX_VALUE)阻塞队列。(接近无界)
    (3)SychronousQueue:不存储元素的阻塞队列,也即单个元素队列。(最后代码演示)
    (4)PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
    (5)DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
    (6)LinkedTransferQueue:由链表结构组成的无界阻塞队列。
    (7)LinkedBlockingDeque:由链表结构组成的双端阻塞队列。
    四、阻塞队列BlockingQueue核心方法
    image.png
    共同点:插入成功或者删除成功都返回true(除了put和take),区别在于处理异常情况

    Queue 中 element() 和 peek()都是用来返回队列的头元素,不删除。在队列元素为空的情况下,element() 方法会抛出NoSuchElementException异常,peek() 方法只会返回 null。
    (1)抛出异常的方法
    当阻塞队列满时,再往队列里add元素会抛出IllegalStateException:Queue full;
    当阻塞队列空时,再从队列里remove移除元素会抛NoSuchElementException。
    (2)返回特殊值(boolean值)的方法
    插入成功,返回true;插入失败,返回false;
    删除成功返回出队列元素;删除失败返回null;
    (3)阻塞的方法(添加无返回值)
    当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。
    当阻塞队列空时,消费者线程试图take队列里的元素,队列会一直阻塞消费者线程直到队列有可用元素。
    (4)超时的方法
    当向阻塞队列offer元素时候,时间超过了设定的值,就会出现超时中断;
    当向阻塞队列poll元素时候,时间超过了设定的值,就会出现超时中断。
    五、阻塞队列BlockingQueue核心方法代码验证
    5.1、抛出异常的方法代码验证:
    add方法抛出异常

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * 当阻塞队列满时,再往队列里add元素会抛出IllegalStateException:Queue full.
    6. * 当阻塞队列空时,再往队列里remove元素会抛出NoSuchElementException.
    7. */
    8. public class BlockingQueueDemoOne {
    9. public static void main(String[] args) {
    10. BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(5);
    11. System.out.println(blockingQueue.add("吴用"));
    12. System.out.println(blockingQueue.add("宋江"));
    13. System.out.println(blockingQueue.add("李逵"));
    14. System.out.println(blockingQueue.add("卢俊义"));
    15. System.out.println(blockingQueue.add("高俅"));
    16. // 再往阻塞队列添加一个元素就会抛出I了legalStateException:Queue full.
    17. System.out.println(blockingQueue.add("武松"));
    18. }
    19. }

    程序执行结果:
    image.png
    remove方法抛出异常

    1. / * 当阻塞队列空时,再往队列里remove元素会抛出NoSuchElementException.
    2. */
    3. public class BlockingQueueDemoOne {
    4. public static void main(String[] args) {
    5. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    6. // 阻塞队列add方法抛出异常演示
    7. System.out.println(blockingQueue.add("吴用"));
    8. System.out.println(blockingQueue.add("宋江"));
    9. System.out.println(blockingQueue.add("李逵"));
    10. System.out.println(blockingQueue.add("卢俊义"));
    11. System.out.println(blockingQueue.add(123));
    12. // 再往阻塞队列添加一个元素就会抛出I了legalStateException:Queue full.
    13. // System.out.println(blockingQueue.add("武松"));
    14. // 阻塞队列remove方法抛出异常演示
    15. System.out.println(blockingQueue.remove());
    16. System.out.println(blockingQueue.remove());
    17. System.out.println(blockingQueue.remove());
    18. System.out.println(blockingQueue.remove());
    19. System.out.println(blockingQueue.remove());
    20. // 再往阻塞队列里remove一个元素就会抛出NoSuchElementException
    21. System.out.println(blockingQueue.remove());
    22. }
    23. }

    程序执行结果:
    image.png
    element检查方法:(底层调用了 peek()方法)

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * 当阻塞队列满时,再往队列里add元素会抛出IllegalStateException:Queue full.
    6. * 当阻塞队列空时,再往队列里remove元素会抛出NoSuchElementException.
    7. */
    8. public class BlockingQueueDemoOne {
    9. public static void main(String[] args) {
    10. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    11. // 阻塞队列add方法抛出异常演示
    12. System.out.println(blockingQueue.add("吴用"));
    13. System.out.println(blockingQueue.add("宋江"));
    14. System.out.println(blockingQueue.add("李逵"));
    15. System.out.println(blockingQueue.add("卢俊义"));
    16. System.out.println(blockingQueue.add("高俅"));
    17. // 检查队列队首元素是啥?或者检查队列是否为空。
    18. System.err.println(blockingQueue.element());
    19. }
    20. }

    程序执行结果:(不存在,会抛NoSuchElementException异常)
    image.png
    image.png
    5.2、返回特殊值的方法(boolean值返回)
    Demo One:阻塞队列offer方法的使用

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * 当阻塞队列满时,再往队列里offer元素会返回false.
    6. * 当阻塞队列空时,再往队列里poll元素会返回null.
    7. */
    8. public class BlockingQueueDemoTwo {
    9. public static void main(String[] args) {
    10. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    11. // 当阻塞队列满时,offer方法返回false演示
    12. System.out.println(blockingQueue.add("吴用"));
    13. System.out.println(blockingQueue.add("宋江"));
    14. System.out.println(blockingQueue.add("李逵"));
    15. System.out.println(blockingQueue.add("卢俊义"));
    16. System.out.println(blockingQueue.offer("高俅"));
    17. // 此时再往阻塞队列offer一个元素就会返回false
    18. System.out.println(blockingQueue.offer("武松"));
    19. }
    20. }

    程序执行结果:
    image.png
    Demo Two:阻塞队列poll方法的使用

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * 当阻塞队列满时,再往队列里offer元素会返回false.
    6. * 当阻塞队列空时,再往队列里poll元素会返回null.
    7. */
    8. public class BlockingQueueDemoTwo {
    9. public static void main(String[] args) {
    10. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    11. // 当阻塞队列满时,offer方法返回false演示
    12. System.out.println(blockingQueue.add("吴用"));
    13. System.out.println(blockingQueue.add("宋江"));
    14. System.out.println(blockingQueue.add("李逵"));
    15. System.out.println(blockingQueue.add("卢俊义"));
    16. System.out.println(blockingQueue.add("高俅"));
    17. // 此时再往阻塞队列offer一个元素就会返回false
    18. System.out.println(blockingQueue.offer("武松"));
    19. // 当阻塞队列为空时,poll方法返回null演示
    20. System.out.println(blockingQueue.poll());
    21. System.out.println(blockingQueue.poll());
    22. System.out.println(blockingQueue.poll());
    23. System.out.println(blockingQueue.poll());
    24. System.out.println(blockingQueue.poll());
    25. // 此时再往阻塞队列执行poll一个元素就会返回null
    26. System.out.println(blockingQueue.poll());
    27. }
    28. }

    程序执行结果:
    image.png
    Demo Three:(peek()方法常用)

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * 当阻塞队列满时,再往队列里offer元素会返回false.
    6. * 当阻塞队列空时,再往队列里poll元素会返回null.
    7. */
    8. public class BlockingQueueDemoTwo {
    9. public static void main(String[] args) {
    10. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    11. System.out.println(blockingQueue.peek());
    12. // 当阻塞队列满时,offer方法返回false演示
    13. System.out.println(blockingQueue.add("吴用"));
    14. System.out.println(blockingQueue.add("宋江"));
    15. System.out.println(blockingQueue.add("李逵"));
    16. System.out.println(blockingQueue.add("卢俊义"));
    17. System.out.println(blockingQueue.add("高俅"));
    18. // peek方法返回阻塞队列队首元素,如果此阻塞队列为空,则返回为null
    19. System.out.println(blockingQueue.peek());
    20. }
    21. }

    测试结果:
    image.png
    5.3、阻塞队列的方法
    Demo One:阻塞队列put方法使用

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * 当阻塞队列满时,再往队列里put元素会阻塞队列.
    6. * 当阻塞队列空时,再往队列里take元素会阻塞队列.
    7. */
    8. public class BlockingQueueDemoThree {
    9. public static void main(String[] args) throws InterruptedException {
    10. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    11. // 当阻塞队列满时,put方法会阻塞队列演示
    12. blockingQueue.put("吴用");
    13. blockingQueue.put("宋江");
    14. blockingQueue.put("李逵");
    15. blockingQueue.put("卢俊义");
    16. blockingQueue.put("高俅");
    17. //此时再往阻塞队列put一个元素时,会阻塞队列
    18. blockingQueue.put("武松");
    19. }
    20. }

    程序执行结果:
    image.png
    Demo Two:阻塞队列take方法使用

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. /**
    5. * * 当阻塞队列满时,再往队列里put元素会阻塞队列. * 当阻塞队列空时,再往队列里take元素会阻塞队列.
    6. */
    7. public class BlockingQueueDemoThree {
    8. public static void main(String[] args) throws InterruptedException {
    9. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    10. // 当阻塞队列满时,put方法会阻塞队列演示
    11. blockingQueue.put("吴用");
    12. blockingQueue.put("宋江");
    13. blockingQueue.put("李逵");
    14. blockingQueue.put("卢俊义");
    15. blockingQueue.put("高俅");
    16. // 此时再往阻塞队列put一个元素时,会阻塞队列
    17. // blockingQueue.put("武松");
    18. // 当阻塞队列空时,take方法会阻塞队列演示
    19. System.out.println(blockingQueue.take());
    20. System.out.println(blockingQueue.take());
    21. System.out.println(blockingQueue.take());
    22. System.out.println(blockingQueue.take());
    23. System.out.println(blockingQueue.take());
    24. // 此时再往阻塞队列take一个元素时,会阻塞队列
    25. System.out.println(blockingQueue.take());
    26. }
    27. }

    程序执行结果:
    image.png
    5.4、超时中断的方法
    Demo One:阻塞队列offer超时中断演示

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. import java.util.concurrent.TimeUnit;
    5. /**
    6. * 当阻塞队列满时,再往队列里offer元素阻塞队列会超时中断.
    7. * 当阻塞队列空时,再往队列里poll元素阻塞队列会超时中断.
    8. */
    9. public class BlockingQueueDemoFour {
    10. public static void main(String[] args) throws InterruptedException {
    11. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    12. // 当阻塞队列满时,offer方法执行超过3秒,阻塞队列会超时中断演示
    13. System.out.println(blockingQueue.offer("吴用", 3, TimeUnit.SECONDS));
    14. System.out.println(blockingQueue.offer("宋江", 3, TimeUnit.SECONDS));
    15. System.out.println(blockingQueue.offer("李逵", 3, TimeUnit.SECONDS));
    16. System.out.println(blockingQueue.offer("卢俊义", 3, TimeUnit.SECONDS));
    17. System.out.println(blockingQueue.offer("高俅", 3, TimeUnit.SECONDS));
    18. //此时再往阻塞队列offer一个元素时,阻塞队列3秒后会超时中断
    19. System.out.println(blockingQueue.offer("武松", 3, TimeUnit.SECONDS));
    20. }
    21. }

    程序执行结果如下:当往阻塞队列添加第六个元素的时候,队列添加不进去,3秒后会超时中断。
    image.png
    Demo Two:阻塞队列poll超时中断演示

    1. package com.lxk.juc;
    2. import java.util.concurrent.ArrayBlockingQueue;
    3. import java.util.concurrent.BlockingQueue;
    4. import java.util.concurrent.TimeUnit;
    5. /**
    6. * 当阻塞队列满时,再往队列里offer元素阻塞队列会超时中断.
    7. * 当阻塞队列空时,再往队列里poll元素阻塞队列会超时中断.
    8. */
    9. public class BlockingQueueDemoFour {
    10. public static void main(String[] args) throws InterruptedException {
    11. BlockingQueue blockingQueue = new ArrayBlockingQueue(5);
    12. // 当阻塞队列满时,offer方法执行超过3秒,阻塞队列会超时中断演示
    13. System.out.println(blockingQueue.offer("吴用", 3, TimeUnit.SECONDS));
    14. System.out.println(blockingQueue.offer("宋江", 3, TimeUnit.SECONDS));
    15. System.out.println(blockingQueue.offer("李逵", 3, TimeUnit.SECONDS));
    16. System.out.println(blockingQueue.offer("卢俊义", 3, TimeUnit.SECONDS));
    17. System.out.println(blockingQueue.offer("高俅", 3, TimeUnit.SECONDS));
    18. //此时再往阻塞队列offer一个元素时,阻塞队列3秒后会超时中断
    19. System.out.println("延时3秒。。。");
    20. System.out.println(blockingQueue.offer("武松", 3, TimeUnit.SECONDS));
    21. // 当阻塞队列空时,poll方法执行超过3秒,阻塞队列会超时中断演示
    22. System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
    23. System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
    24. System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
    25. System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
    26. System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
    27. // 此时再往阻塞队列poll一个元素时,超过3秒,阻塞队列会超时中断
    28. System.out.println("延时3秒。。。");
    29. System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS));
    30. }
    31. }

    程序执行结果:
    image.png
    六、SychronousQueue类阻塞队列代码验证
    程序执行结果如下:因为消费者线程每隔5秒钟取一个元素,所以生产者线程也是每隔5秒钟往Synchronous阻塞队列中添加一个元素。

    1. package com.lxk.juc;
    2. import java.util.concurrent.BlockingQueue;
    3. import java.util.concurrent.SynchronousQueue;
    4. import java.util.concurrent.TimeUnit;
    5. /**
    6. * SynchronousBlocking是一个不存储元素的阻塞队列,即单个元素队列
    7. */
    8. public class SynchronousQueueDemo {
    9. public static void main(String[] args) {
    10. BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
    11. // 生产者线程进行put操作
    12. new Thread(() -> {
    13. try {
    14. System.out.println(Thread.currentThread().getName() + "\t put one");
    15. blockingQueue.put("one");
    16. System.out.println(Thread.currentThread().getName() + "\t put two");
    17. blockingQueue.put("two");
    18. System.out.println(Thread.currentThread().getName() + "\t put three");
    19. blockingQueue.put("three");
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }, "t1").start();
    24. // 消费者线程进行take操作
    25. new Thread(()->{
    26. try {
    27. TimeUnit.SECONDS.sleep(5);
    28. System.out.println(Thread.currentThread().getName() + "\t take one");
    29. blockingQueue.take();
    30. TimeUnit.SECONDS.sleep(5);
    31. System.out.println(Thread.currentThread().getName() + "\t take two");
    32. blockingQueue.take();
    33. TimeUnit.SECONDS.sleep(5);
    34. System.out.println(Thread.currentThread().getName() + "\t take three");
    35. blockingQueue.take();
    36. } catch (InterruptedException e) {
    37. e.printStackTrace();
    38. }
    39. },"t2").start();
    40. }
    41. }

    测试结果:
    image.png