引言

这篇文章,我们来看LinkedBlockingQueue的实现。

关键字段

LinkedBlockingQueue内部采用链表来实现队列,所以会有一个内部类Node来代表每个元素对应的节点,并且每个节点会有一个next引用指向下一个节点:

  1. static class Node<E> {
  2. E item;
  3. /**
  4. * One of:
  5. * - the real successor Node
  6. * - this Node, meaning the successor is head.next
  7. * - null, meaning there is no successor (this is the last node)
  8. */
  9. Node<E> next;
  10. Node(E x) { item = x; }
  11. }

而对于整个队列,只需要保存头结点和尾结点即可,所以会有两个Node类型的变量:

  1. /**
  2. * Head of linked list.
  3. * Invariant: head.item == null
  4. */
  5. transient Node<E> head;
  6. /**
  7. * Tail of linked list.
  8. * Invariant: last.next == null
  9. */
  10. private transient Node<E> last;

除了头结点和尾结点,还需要记录队列的容量和当前元素的数量:

  1. /** The capacity bound, or Integer.MAX_VALUE if none */
  2. private final int capacity;
  3. /** Current number of elements */
  4. private final AtomicInteger count = new AtomicInteger();

注意,count是原子int类型,由于count在队列元素的添加和删除操作中都会用到,将其设置为原子变量可以减少一部分lock操作,这个我们稍后会看到。
与ArrayBlockingQueue类似,LinkedBlockingQueue也会使用lock和condition来保证元素添加和删除操作的并发安全,不同的是LinkedBlockingQueue有两个lock,分别对应添加和删除操作,并且每个lock对应一个condition,分别代表条件队列未满和队列不为空,而ArrayBlockingQueue只有一个lock,对元素的添加和删除都会在同一个lock上进行锁定。

  1. /** Lock held by take, poll, etc */
  2. private final ReentrantLock takeLock = new ReentrantLock();
  3. /** Wait queue for waiting takes */
  4. private final Condition notEmpty = takeLock.newCondition();
  5. /** Lock held by put, offer, etc */
  6. private final ReentrantLock putLock = new ReentrantLock();
  7. /** Wait queue for waiting puts */
  8. private final Condition notFull = putLock.newCondition();

这两个lock和condition的用法我们马上会讲到。

构造方法

LinkedBlockingQueue有三个构造方法,我们先看前两个:

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }
  4. /**
  5. * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
  6. *
  7. * @param capacity the capacity of this queue
  8. * @throws IllegalArgumentException if {@code capacity} is not greater
  9. * than zero
  10. */
  11. public LinkedBlockingQueue(int capacity) {
  12. if (capacity <= 0) throw new IllegalArgumentException();
  13. this.capacity = capacity;
  14. last = head = new Node<E>(null);
  15. }

前者其实是调用了后者,在没有指定队列容量的情况下,默认队列容量为Integer.MAX_VALUE,所以LinkedBlockingQueue是一个有界队列。然后会设置头结点和尾结点,注意并不是将头结点和尾结点设置为null,而是新创建了一个元素为null的Node,这个操作我们需要重点记一下,在删除数据的操作中,这一点有很大影响,并且LinkedBlockingQueue采用put锁和take锁分离的方式来提高吞吐量,也和这个有关系。还有一点要注意的是这里虽然有了一个节点,但是count并没有增加,也就是说这个初始化的节点并不计入总元素数量。
第三个构造方法需要一个元素的集合,它会将集合中的每个元素加入队列:

  1. public LinkedBlockingQueue(Collection<? extends E> c) {
  2. this(Integer.MAX_VALUE);
  3. final ReentrantLock putLock = this.putLock;
  4. putLock.lock(); // Never contended, but necessary for visibility
  5. try {
  6. int n = 0;
  7. for (E e : c) {
  8. if (e == null)
  9. throw new NullPointerException();
  10. if (n == capacity)
  11. throw new IllegalStateException("Queue full");
  12. enqueue(new Node<E>(e));
  13. ++n;
  14. }
  15. count.set(n);
  16. } finally {
  17. putLock.unlock();
  18. }
  19. }

注意,它首先也会调用第二个构造方法来初始化容量和头尾两个节点,也就是说在将参数集合中的每个元素加入队列前,队列中已经有一个元素为null的节点了,并且头结点和尾结点都指向它。节点加入队列的逻辑我们这里先不去分析,后面会讲到。
为了清晰地表示LinkedBlockingQueue的元素添加和删除操作,我们假设使用第二个构造方法创建了一个容量为3的队列,构造方法执行完成后,队列如下图所示:
con.png
此时,head和last都指向构造方法中创建的item为null的节点,它既是头结点也是尾结点。

put方法

put方法将元素添加到队列尾部,如果队列已满,就会阻塞直到能够执行添加操作。它的实现如下:

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. // Note: convention in all put/take/etc is to preset local var
  4. // holding count negative to indicate failure unless set.
  5. int c = -1;
  6. //首先构造一个node
  7. Node<E> node = new Node<E>(e);
  8. //获取put锁,也就是执行元素添加操作的线程是互斥的
  9. final ReentrantLock putLock = this.putLock;
  10. final AtomicInteger count = this.count;
  11. putLock.lockInterruptibly();
  12. try {
  13. /*
  14. * Note that count is used in wait guard even though it is
  15. * not protected by lock. This works because count can
  16. * only decrease at this point (all other puts are shut
  17. * out by lock), and we (or some other waiting put) are
  18. * signalled if it ever changes from capacity. Similarly
  19. * for all other uses of count in other wait guards.
  20. */
  21. //如果队列已满 挂起线程 等待被唤醒 此时线程会释放putLock
  22. while (count.get() == capacity) {
  23. notFull.await();
  24. }
  25. //队列未满 将节点加到队尾
  26. enqueue(node);
  27. //增加队列中元素的数量并返回之前的数量
  28. c = count.getAndIncrement();
  29. //如果加入元素之后队列未满 唤醒一个阻塞队列未满条件的执行节点添加的线程
  30. if (c + 1 < capacity)
  31. notFull.signal();
  32. } finally {
  33. //解锁
  34. putLock.unlock();
  35. }
  36. //如果当前的容量是1(注意c是添加元素之前的值,c==0说明当前队列元素数量为1)
  37. if (c == 0)
  38. signalNotEmpty();
  39. }

首先,执行元素添加操作需要获取putLock,也就是不同的执行元素添加操作的线程之间是互斥的,它们通过notFull这个putLock上的条件来进行通信,一个执行完添加节点操作的线程会通过signal方法唤醒一个阻塞在await处的线程。
我们继续看enqueue方法:

  1. private void enqueue(Node<E> node) {
  2. // assert putLock.isHeldByCurrentThread();
  3. // assert last.next == null;
  4. last = last.next = node;
  5. }

它的逻辑很简单,就是先将最后一个节点的next指向新加入的节点,然后将新加入的节点作为last。继续上面我们的流程,执行元素添加操作之后的结构如下:
equeue.png
然后看signalNotEmpty方法:

  1. private void signalNotEmpty() {
  2. final ReentrantLock takeLock = this.takeLock;
  3. takeLock.lock();
  4. try {
  5. notEmpty.signal();
  6. } finally {
  7. takeLock.unlock();
  8. }
  9. }

它会去获取takeLock,这是删除元素时需要的锁,然后它会唤醒一个阻塞在notEmpty条件上的执行节点删除操作的线程。为什么删除线程需要添加线程来唤醒呢?并且为什么只在队列的容量是1的时候唤醒呢?队列的容量是2或者更多的时候不是也可以进行删除操作吗?我们先看删除的方法。

take方法

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly();
  7. try {
  8. while (count.get() == 0) {
  9. notEmpty.await();
  10. }
  11. x = dequeue();
  12. c = count.getAndDecrement();
  13. if (c > 1)
  14. notEmpty.signal();
  15. } finally {
  16. takeLock.unlock();
  17. }
  18. if (c == capacity)
  19. signalNotFull();
  20. return x;
  21. }

它的逻辑大体上与put方法类似,只是它获取的是takeLock并且阻塞条件是notEmpty,也就是队列非空。成功删除元素的线程在操作完成后如果元素的数量大于1也就是队列非空,就会执行signal方法唤醒一个阻塞在await处的线程。
元素的删除是通过dequeue方法实现的:

  1. private E dequeue() {
  2. // assert takeLock.isHeldByCurrentThread();
  3. // assert head.item == null;
  4. Node<E> h = head;
  5. Node<E> first = h.next;
  6. h.next = h; // help GC
  7. head = first;
  8. E x = first.item;
  9. first.item = null;
  10. return x;
  11. }

这个方法我们需要重点理解一下,我们一步一步分析:
首先

  1. Node<E> h = head;
  2. Node<E> first = h.next;
  3. h.next=h;

h指向第一个节点,first指向第二个节点,而h.next意思是第一个节点的next引用执行第一个节点自己,继续上面的图,此时是这样的布局:
take1.png
然后:

  1. head = first;
  2. E x = first.item;
  3. first.item = null;

head = first将LinkedBlockingQueue的头结点指向第二个节点,此时第二个节点成为头结点,然后返回它的item并将item置空。执行完成后的布局是这样的:
take2.png
这里有个疑问,为什么不是返回node1的item而是返回node2的item呢?因为node1的item永远是空的。从初始化LinkedBlockingQueue开始,第一个节点的item就是空的,然后执行take方法,会将下一个节点设置为头结点并返回该节点item,返回之后再置空,再次删除时会对下一个节点执行同样的操作,每次删除完成之后,头结点的item都会被置空,所以这里会返回第二个节点的item。
在take方法中如果当前元素的数量为队列的容量-1,就会调用signalNotFull方法:

  1. private void signalNotFull() {
  2. final ReentrantLock putLock = this.putLock;
  3. putLock.lock();
  4. try {
  5. notFull.signal();
  6. } finally {
  7. putLock.unlock();
  8. }
  9. }

这个方法与signalNotEmpty方法类似,是唤醒一个阻塞在put Lock的notEmpty条件上的节点添加线程。那么就会有与之前signalNotEmpty同样的疑问,为什么需要删除线程来唤醒添加线程呢?为什么一定要在元素数量是容量-1的时候唤醒呢?只要元素数量比容量小都可以唤醒添加线程吧?

signalNotEmpty与signalNotFull的含义

其实这两个方法的作用可以一起解释。当队列满后,所有执行添加节点操作的线程都会阻塞在notFull条件上,此时这些线程之间就不能相互唤醒,就需要执行删除节点操作的线程来唤醒其中一个。同理,当队列为空时,所有执行删除节点操作的线程都会阻塞在notEmpty条件上,此时这些线程之间也不能相互唤醒,就需要执行添加节点操作的线程来唤醒其中一个。而这两种唤醒只需要队列由已满状态变为未满状态或者由已空状态变为非空状态这两个时间点执行即可。

为什么可以使用两个锁

再看一个问题,为什么LinkedBlockingQueue可以使用putLock和takeLock两个锁来分别控制添加和删除节点的并发安全呢?这两个操作之间不需要同步吗?
我们看添加和删除节点操作都会对什么进行操作:enqueue中只对last进行修改,dequeue中只对head进行操作,所以这两个方法即使同时执行没有问题,他们没有修改同一个变量。再有就是这操作都会对count进行修改,但是count是原子类型,不用担心线程安全。所以添加和删除节点不需要同一个锁,这样也大大提高了LinkedBlockingQueue的吞吐量。

LinkedBlockingQueue还有很多其他方法,这里不再分析。

小结

LinkedBlockingQueue内部使用链表来实现有界队列,它通过两个lock和对应的condition分别实现了元素的添加和删除操作,添加和删除元素之间没有进行同步,大大提高了整个队列的吞吐量。