1. ArrayBlockingQueue 简介

在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在“生产者-消费者”问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于 BlockingQueue 可以看这篇文章。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看 ArrayBlockingQueue 和 LinkedBlockingQueue 的实现原理。

2. ArrayBlockingQueue 实现原理

阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是 lock 的 condition 机制,关于 condition 可以看这篇文章的详细介绍。那么 ArrayBlockingQueue 的实现是不是也会采用 Condition 的通知机制呢?下面来看看。

2.1 ArrayBlockingQueue 的主要属性

ArrayBlockingQueue 的主要属性如下:

  1. /** The queued items */
  2. final Object[] items;
  3. /** items index for next take, poll, peek or remove */
  4. int takeIndex;
  5. /** items index for next put, offer, or add */
  6. int putIndex;
  7. /** Number of elements in the queue */
  8. int count;
  9. /*
  10. Concurrency control uses the classic two-condition algorithm
  11. found in any textbook.
  12. */
  13. /** Main lock guarding all access */
  14. final ReentrantLock lock;
  15. /** Condition for waiting takes */
  16. private final Condition notEmpty;
  17. /** Condition for waiting puts */
  18. private final Condition notFull;

从源码中可以看出 ArrayBlockingQueue 内部是采用数组进行数据存储的(属性items),为了保证线程安全,采用的是ReentrantLock lock,为了保证可阻塞式的插入删除数据利用的是 Condition,当获取数据的消费者线程被阻塞时会将该线程放置到 notEmpty 等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到 notFull 等待队列中。而 notEmpty 和 notFull 等中要属性在构造方法中进行创建:

  1. public ArrayBlockingQueue(int capacity, boolean fair) {
  2. if (capacity <= 0)
  3. throw new IllegalArgumentException();
  4. this.items = new Object[capacity];
  5. lock = new ReentrantLock(fair);
  6. notEmpty = lock.newCondition();
  7. notFull = lock.newCondition();
  8. }

接下来,主要看看可阻塞式的 put 和 take 方法是怎样实现的。

2.2 put 方法详解

put(E e)方法源码如下:

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. //如果当前队列已满,将线程移入到notFull等待队列中
  7. while (count == items.length)
  8. notFull.await();
  9. //满足插入数据的要求,直接进行入队操作
  10. enqueue(e);
  11. } finally {
  12. lock.unlock();
  13. }
  14. }

该方法的逻辑很简单,当队列已满时(count == items.length)将线程移入到 notFull 等待队列中,如果当前满足插入数据的条件,就可以直接调用enqueue(e)插入数据元素。enqueue 方法源码为:

  1. private void enqueue(E x) {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[putIndex] == null;
  4. final Object[] items = this.items;
  5. //插入数据
  6. items[putIndex] = x;
  7. if (++putIndex == items.length)
  8. putIndex = 0;
  9. count++;
  10. //通知消费者线程,当前队列中有数据可供消费
  11. notEmpty.signal();
  12. }

enqueue 方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(items[putIndex] = x),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(notEmpty.signal())。

2.3 take 方法详解

take 方法源码如下:

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. //如果队列为空,没有数据,将消费者线程移入等待队列中
  6. while (count == 0)
  7. notEmpty.await();
  8. //获取数据
  9. return dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

take 方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作dequeue。dequeue 方法源码为:

  1. private E dequeue() {
  2. // assert lock.getHoldCount() == 1;
  3. // assert items[takeIndex] != null;
  4. final Object[] items = this.items;
  5. @SuppressWarnings("unchecked")
  6. //获取数据
  7. E x = (E) items[takeIndex];
  8. items[takeIndex] = null;
  9. if (++takeIndex == items.length)
  10. takeIndex = 0;
  11. count--;
  12. if (itrs != null)
  13. itrs.elementDequeued();
  14. //通知被阻塞的生产者线程
  15. notFull.signal();
  16. return x;
  17. }

dequeue 方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素((E) items[takeIndex]);2. 通知 notFull 等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得 lock,并执行完成功退出。

从以上分析,可以看出 put 和 take 方法主要是通过 condition 的通知机制来完成可阻塞式的插入数据和获取数据。在理解 ArrayBlockingQueue 后再去理解 LinkedBlockingQueue 就很容易了。

3. LinkedBlockingQueue 实现原理

LinkedBlockingQueue 是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为Integer.MAX_VALUE。从它的构造方法可以看出:

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }

3.1 LinkedBlockingQueue 的主要属性

LinkedBlockingQueue 的主要属性有:

  1. /** Current number of elements */
  2. private final AtomicInteger count = new AtomicInteger();
  3. /**
  4. Head of linked list.
  5. Invariant: head.item == null
  6. */
  7. transient Node<E> head;
  8. /**
  9. Tail of linked list.
  10. Invariant: last.next == null
  11. */
  12. private transient Node<E> last;
  13. /** Lock held by take, poll, etc */
  14. private final ReentrantLock takeLock = new ReentrantLock();
  15. /** Wait queue for waiting takes */
  16. private final Condition notEmpty = takeLock.newCondition();
  17. /** Lock held by put, offer, etc */
  18. private final ReentrantLock putLock = new ReentrantLock();
  19. /** Wait queue for waiting puts */
  20. private final Condition notFull = putLock.newCondition();

可以看出与 ArrayBlockingQueue 主要的区别是,LinkedBlockingQueue 在插入数据和删除数据时分别是由两个不同的 lock(takeLockputLock)来控制线程安全的,因此,也由这两个 lock 生成了两个对应的 condition(notEmptynotFull)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node 结点的定义为:

  1. static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; }
  2. }

接下来,我们也同样来看看 put 方法和 take 方法的实现。

3.2 put 方法详解

put 方法源码为:

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. // Note: convention in all put/take/etc is to preset local var
  4. // holding count negative to indicate failure unless set.
  5. int c = -1;
  6. Node<E> node = new Node<E>(e);
  7. final ReentrantLock putLock = this.putLock;
  8. final AtomicInteger count = this.count;
  9. putLock.lockInterruptibly();
  10. try {
  11. /*
  12. * Note that count is used in wait guard even though it is
  13. * not protected by lock. This works because count can
  14. * only decrease at this point (all other puts are shut
  15. * out by lock), and we (or some other waiting put) are
  16. * signalled if it ever changes from capacity. Similarly
  17. * for all other uses of count in other wait guards.
  18. */
  19. //如果队列已满,则阻塞当前线程,将其移入等待队列
  20. while (count.get() == capacity) {
  21. notFull.await();
  22. }
  23. //入队操作,插入数据
  24. enqueue(node);
  25. c = count.getAndIncrement();
  26. //若队列满足插入数据的条件,则通知被阻塞的生产者线程
  27. if (c + 1 < capacity)
  28. notFull.signal();
  29. } finally {
  30. putLock.unlock();
  31. }
  32. if (c == 0)
  33. signalNotEmpty();
  34. }

put 方法的逻辑也同样很容易理解,可见注释。基本上和 ArrayBlockingQueue 的 put 方法一样。take 方法的源码如下:

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly();
  7. try {
  8. //当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
  9. while (count.get() == 0) {
  10. notEmpty.await();
  11. }
  12. //移除队头元素,获取数据
  13. x = dequeue();
  14. c = count.getAndDecrement();
  15. //如果当前满足移除元素的条件,则通知被阻塞的消费者线程
  16. if (c > 1)
  17. notEmpty.signal();
  18. } finally {
  19. takeLock.unlock();
  20. }
  21. if (c == capacity)
  22. signalNotFull();
  23. return x;
  24. }

take 方法的主要逻辑请见于注释,也很容易理解。

4. ArrayBlockingQueue 与 LinkedBlockingQueue 的比较

相同点:ArrayBlockingQueue 和 LinkedBlockingQueue 都是通过 condition 通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;

不同点:1. ArrayBlockingQueue 底层是采用的数组进行实现,而 LinkedBlockingQueue 则是采用链表数据结构;

  1. ArrayBlockingQueue 插入和删除数据,只采用了一个 lock,而 LinkedBlockingQueue 则是在插入和删除分别采用了putLocktakeLock,这样可以降低线程由于线程无法获取到 lock 而进入 WAITING 状态的可能性,从而提高了线程并发执行的效率。

作者链接:https://juejin.cn/post/6844903602448760845