一、阻塞队列

1、Queue接口

Queue接口定义了6个操作队列的方法:
boolean add(E e):添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
boolean offer(E e): 添加一个元素,添加成功返回true, 如果队列满了,返回false
E remove(): 返回并删除队首元素,队列为空则抛出异常
E poll(): 返回并删除队首元素,队列为空则返回null
E element(): 返回队首元素,但不移除,队列为空则抛出异常
E peek(): 获取队首元素,但不移除,队列为空则返回null

2、BlockingQueue接口

BlockingQueue接口继承自Queue接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。新定义了两个方法。
支持阻塞的插入方法put:队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法take:队列空时,获取元素的线程会等待队列变为非空。
BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。
入队:
1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)
2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false
3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置
出队:
1)poll():如果有数据,出队,如果没有数据,返回null (不阻塞)
2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null
3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据

  1. boolean add(E e);
  2. boolean offer(E e);
  3. void put(E e) throws InterruptedException;
  4. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  5. E take() throws InterruptedException;
  6. E poll(long timeout, TimeUnit unit) throws InterruptedException;
  7. boolean remove(Object o);

从方法中可以看出,put()、offer()、take()、poll()方法都会抛出中断异常,说明这些方法存在阻塞。
在AbstractQueue类中,对add(),remove(),element()方法做了封装,AbstractQueue实现了Queue接口:

  1. public boolean add(E e) {
  2. if (offer(e))
  3. return true;
  4. else
  5. throw new IllegalStateException("Queue full");
  6. }
  7. public E remove() {
  8. E x = poll();
  9. if (x != null)
  10. return x;
  11. else
  12. throw new NoSuchElementException();
  13. }
  14. public E element() {
  15. E x = peek();
  16. if (x != null)
  17. return x;
  18. else
  19. throw new NoSuchElementException();
  20. }

一般使用jdk中的阻塞队列是继承了AbstractQueue,同时实现了BlockingQueue,以ArrayBlockingQueue为例,当队列调用add()方法时,先调用super.add(e),ArrayBlockingQueue是AbstractQueue的子类,所以会调用AbstractQueue的add()方法,AbstractQueue的add()方法先调用子类实现的offer()方法,该方法返回false的话则抛异常。

  1. public boolean add(E e) {
  2. return super.add(e);
  3. }

所以当队列满了无法添加元素,或者是队列空了无法移除元素时:

1.png二、阻塞队列特性

1、阻塞

阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能:阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。

1)take 方法

take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。

2)put 方法

put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。

2、阻塞队列的边界

阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如 LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。

3、应用场景

BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了。
因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问 题。生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问 题。这也就意味着,考虑锁等线程安全问题的重任从“你”转移到了“队列”上,降低了我们开发的难度和工作量。
同时,队列它还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里 取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任 务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。

三、常见的阻塞队列

BlockingQueue 接口的实现类都被放在了 juc 包中,它们的区别主要体现在存储结构上或对元 素操作上的不同,但是对于take与put操作的原理,却是类似的。

2.png四、ArrayBlockingQueue

ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用 ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满, 大量生产线程被阻塞。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发 场景下会成为性能瓶颈。

1、ArrayBlockingQueue的原理

ArrayBlockingQueue是实现了BlockingQueue,继承了AbstractQueue。
3.png
在队列初始化时,确定了容量的大小,是不可扩容的。默认独占锁是非公平锁。

  1. public ArrayBlockingQueue(int capacity) {
  2. this(capacity, false);
  3. }
  4. // 上一个方法中this调用的构造方法
  5. public ArrayBlockingQueue(int capacity, boolean fair) {
  6. if (capacity <= 0)
  7. throw new IllegalArgumentException();
  8. this.items = new Object[capacity];
  9. lock = new ReentrantLock(fair);
  10. notEmpty = lock.newCondition();
  11. notFull = lock.newCondition();
  12. }

另外还有一个构造方法,可以直接传入一个集合。在这个构造方法中,加了一个独占锁,是为了保证可见性,因为在this(capacity, fair)这个构造方法中,new 了一个数组对象,在创建对象时并不是原子的,所以为了保证这个对象完全创建完并对其他线程可见。

  1. public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
  2. this(capacity, fair);
  3. // 此处加锁是为了保证可见性
  4. final ReentrantLock lock = this.lock;
  5. lock.lock(); // Lock only for visibility, not mutual exclusion
  6. try {
  7. int i = 0;
  8. try {
  9. for (E e : c) {
  10. checkNotNull(e);
  11. items[i++] = e;
  12. }
  13. } catch (ArrayIndexOutOfBoundsException ex) {
  14. throw new IllegalArgumentException();
  15. }
  16. count = i;
  17. putIndex = (i == capacity) ? 0 : i;
  18. } finally {
  19. lock.unlock();
  20. }
  21. }

利用了Lock锁的Condition通知机制进行阻塞控制。入队和出队操作使用的是同一个ReentrantLock对象,即同一把锁,所以put和take操作是互斥的,同时只能有一个操作在进行。

  1. // 数据保存的数组
  2. final Object[] items;
  3. // 取数据的指针
  4. int takeIndex;
  5. // 存数据的指针
  6. int putIndex;
  7. // 数据数量
  8. int count;
  9. // 独占锁
  10. final ReentrantLock lock;
  11. // 条件队列,判断数组是否满了
  12. private final Condition notEmpty;
  13. // 条件对了,判断数组是否空着
  14. private final Condition notFull;

2、put()方法

ArrayBlockingQueue-put.png

3、take()方法

ArrayBlockingQueue-take.png

五、LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存, 则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。 LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。

1、LinkedBlockingQueue的原理

LinkedBlockingQueue是实现了BlockingQueue,继承了AbstractQueue。
4.png
LinkedBlockingQueue的构造方法默认是无界队列,入果指定容量,那么就是有界队列。在队列创建的时候,就已经创建了一个Node节点作为头节点。

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }
  4. // 可以指定队列容量
  5. public LinkedBlockingQueue(int capacity) {
  6. if (capacity <= 0) throw new IllegalArgumentException();
  7. this.capacity = capacity;
  8. last = head = new Node<E>(null);
  9. }
  1. Node节点中保存了具体的数据item,和下一个节点的指针。takeput操作分别对应一个独占锁和条件队列,这样不同的操作不会互斥。
  1. // 容量,指定容量就是有界队列
  2. private final int capacity;
  3. // 元素数量
  4. private final AtomicInteger count = new AtomicInteger();
  5. // 链表头 本身是不存储任何元素的,初始化时item指向null
  6. transient Node<E> head;
  7. // 链表尾
  8. private transient Node<E> last;
  9. // take锁 锁分离,提高效率
  10. private final ReentrantLock takeLock = new ReentrantLock();
  11. // notEmpty条件
  12. // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
  13. private final Condition notEmpty = takeLock.newCondition();
  14. // put锁
  15. private final ReentrantLock putLock = new ReentrantLock();
  16. // notFull条件
  17. // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
  18. private final Condition notFull = putLock.newCondition();
  19. //典型的单链表结构
  20. static class Node<E> {
  21. E item; //存储元素
  22. Node<E> next; //后继节点 单链表结构
  23. Node(E x) { item = x; }
  24. }

2、put()方法

LinkedBlockingQueue-put.png

3、take()方法

LinkedBlockingQueue-take.png

六、LinkedBlockingQueue与ArrayBlockingQueue对比

LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和 ArrayBlockingQueue的不同点在于:

1、队列大小有所不同

ArrayBlockingQueue是有界的初始化必须指定大小,而 LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。

2、数据存储容器不同

ArrayBlockingQueue采用的是数组作为数据存储容器,而 LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会 产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。

3、两者的实现队列添加或移除的锁不一样

ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而 LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

七、LinkedBlockingDeque

LinkedBlockingDeque是一个双端阻塞队列,可以从尾进头出,也可以头进尾出。内部也是Node节点组成,保存了指向头节点和尾节点的指针,使用单个锁来控制入队和出队操作同ArrayBlockingQueue。
继承了AbstractQueue,实现了BlockingDeque接口。
分别提供了putFirst(E e),putLast(E e),takeFirst(),takeLast()来操作队列的出队入队,逻辑同LinkedBlockingQueue。