一 介绍

  1. 阻塞队列(BlockingQueue):顾名思义,首先它是一个队列。
  2. 当阻塞队列是空时,从队列中获取元素的操作线程将会被阻塞。
  3. 当阻塞队列是满时,往队列中添加元素的操作线程将会被阻塞。
  4. 阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

    1.1 好处

  5. 在多线程领域:所谓阻塞,在某些情况下会挂起线程(即线程阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。

  6. 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockingQueue都一手给你包办好了。
  7. 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

    1.2 核心用法

    在阻塞队列不可用的时候,它对于操作线程提供了4种处理方式。
    注意:阻塞队列如果是无界队列,那么插入元素的时是一直成功的,不会发生阻塞操作线程的情况。
方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek() 不可用 不可用
抛出异常 当阻塞队列满的时候,再往队列add插入元素会抛出 IllegalStateException:Queue full。当阻塞队列空的时候,再往队列remove移除元素时候会抛出异常NoSuchElementException
特殊值 插入方法,成功返回true,失败返回false。移除方法,成功返回元素,队列为空返回null
一直阻塞 当队列满的时候,生产者继续往队列里面put插入元素,队列会一直阻塞直到put插入成功或者响应中断退出。当队列为空的时候,消费者会从队列take移除元素,队列会一直阻塞消费者,直到队列有元素可以被移除。
超时退出 当阻塞队列满的时候,生产者会阻塞消费者一定时间。超时后生产者线程自动退出。

二 JDK中的阻塞队列

  1. // JDK8源码:BlockingQueue
  2. package java.util.concurrent;
  3. // 直接父接口是Queue,Queue的直接父接口是Collection
  4. public interface BlockingQueue<E> extends Queue<E> {

2.1 ArrayBlockingQueue

数组结构组成的有界阻塞队列。三个构造方法如下

  1. public ArrayBlockingQueue(int capacity) {
  2. this(capacity, false);
  3. }
  4. /**
  5. * fair:指的是公平性,当有很多线程都访问阻塞队列已经阻塞,此时TRUE表示先阻塞的线程在阻塞队列
  6. * 有空余的位置时优先访问阻塞队列,这时候会适当的降低程序的吞吐量。
  7. */
  8. public ArrayBlockingQueue(int capacity, boolean fair) {
  9. if (capacity <= 0)
  10. throw new IllegalArgumentException();
  11. this.items = new Object[capacity];
  12. lock = new ReentrantLock(fair);
  13. notEmpty = lock.newCondition();
  14. notFull = lock.newCondition();
  15. }
  16. public ArrayBlockingQueue(int capacity, boolean fair,
  17. Collection<? extends E> c) {
  18. this(capacity, fair);
  19. final ReentrantLock lock = this.lock;
  20. lock.lock(); // Lock only for visibility, not mutual exclusion
  21. try {
  22. int i = 0;
  23. try {
  24. for (E e : c) {
  25. checkNotNull(e);
  26. items[i++] = e;
  27. }
  28. } catch (ArrayIndexOutOfBoundsException ex) {
  29. throw new IllegalArgumentException();
  30. }
  31. count = i;
  32. putIndex = (i == capacity) ? 0 : i;
  33. } finally {
  34. lock.unlock();
  35. }
  36. }

2.2 LinkedBlockingQueue

链表结构组成的有界阻塞队列,但是队列默认最大长度为:Integer.MAX_VALUE
此队列按照先进先出的原则对元素进行排序。

2.3 PriorityBlockingQueue

支持元素优先级顺序排列的无界阻塞队列。
默认情况下,元素按照自然排序的规则升序排列,但是也可以自定义类实现 compareTo 方法来指定元素的排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。需要注意的是不能保证同优先级元素的顺序。

2.4 DelayQueue

由PriorityBlockingQueue实现的延时无界阻塞队列。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

2.5 SynchronousQueue

不存储元素的阻塞队列,也即是单个元素的队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

2.6 LinkedTransferQueue

由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

  • transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者了才返回。

  • tryTransfer方法

tryTransfer方法时用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回fasle。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。

2.7 LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。
所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双向队列的最后一个元素。

三 生产者消费者

阻塞队列实现生产者消费者

  1. class MyResource
  2. {
  3. private volatile boolean FLAG = true; // 默认开启,进行生产+消费
  4. private AtomicInteger atomicInteger = new AtomicInteger();
  5. BlockingQueue<String> blockingQueue = null;
  6. public MyResource(BlockingQueue<String> blockingQueue) {
  7. this.blockingQueue = blockingQueue;
  8. System.out.println(blockingQueue.getClass().getName());
  9. }
  10. // 生产者
  11. public void MyProd() throws Exception{
  12. String data = null;
  13. boolean retValue ; // 默认是false
  14. while (FLAG)
  15. {
  16. // 往阻塞队列填充数据
  17. data = atomicInteger.incrementAndGet()+""; // 等于++i的意思
  18. retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS); // 插入成功则返回true
  19. if (retValue){ // 如果是true,那么代表当前这个线程插入数据成功
  20. System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
  21. }else { // 那么就是插入失败
  22. System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
  23. }
  24. TimeUnit.SECONDS.sleep(1);
  25. }
  26. // 如果FLAG是false了,马上打印
  27. System.out.println(Thread.currentThread().getName()+"\t大老板叫停了,表示FLAG=false,生产结束");
  28. }
  29. // 消费者
  30. public void MyConsumer() throws Exception
  31. {
  32. String result = null;
  33. while (FLAG) { // 开始消费
  34. // 两秒钟等不到生产者生产出来的数据就不取了
  35. result = blockingQueue.poll(2L,TimeUnit.SECONDS); // 阻塞队列移除元素失败则返回null
  36. if (null == result || result.equalsIgnoreCase("")){ // 如果取不到数据了
  37. FLAG = false;
  38. System.out.println(Thread.currentThread().getName()+"\t 超过两秒钟没有取到数据,消费退出")
  39. return; // 退出
  40. }
  41. System.out.println(Thread.currentThread().getName()+"\t消费队列数据"+result+"成功");
  42. }
  43. }
  44. // 叫停方法
  45. public void stop() throws Exception{
  46. this.FLAG = false;
  47. }
  48. }
  49. public class ProdConsumer_BlockQueueDemo {
  50. public static void main(String[] args) throws Exception{
  51. MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
  52. new Thread(() -> {
  53. System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
  54. try {
  55. myResource.MyProd();
  56. } catch (Exception e) {
  57. e.printStackTrace();
  58. }
  59. },"Prod").start();
  60. new Thread(() -> {
  61. System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
  62. try {
  63. myResource.MyConsumer();
  64. } catch (Exception e) {
  65. e.printStackTrace();
  66. }
  67. },"Consumer").start();
  68. try { TimeUnit.SECONDS.sleep(5); }catch (Exception e) {e.printStackTrace();}
  69. System.out.println("5秒钟时间到,大bossMain主线程叫停,活动结束");
  70. myResource.stop();
  71. }
  72. }