LinkedBlockingQueue 类结构图

点击查看【processon】

  1. LinkedBlockingQueue 是一个单链表结构,其中的 head 和 last 分别表示头结点和尾节点。count 表示队列中元素的个数
  2. capacity 表示队列的容量,默认为 Integer.MAX_VALUE
  3. putLock 和 takeLock 分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待,putLock 控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。另外,notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程

LinkedBlockingQueue 实现原理

LinkedBlockingQueue 是一个基于单链表结构的阻塞队列,用两把锁分别控制队头和队尾的操作,因此也是 2 把锁 + 2 个条件的结构,同时还使用了一个 AtomicInteger 原子变量 count 来记录队列的元素个数。

  1. public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
  2. /** 队列容量 */
  3. private final int capacity;
  4. /**链表队列元素数量 */
  5. private final AtomicInteger count = new AtomicInteger();
  6. /** 单链表的头结点 */
  7. transient Node<E> head;
  8. /** 单链表的尾结点 */
  9. private transient Node<E> last;
  10. /** 执行 take, poll 等操作时需要获取该锁 */
  11. private final ReentrantLock takeLock = new ReentrantLock();
  12. /** 当队列为空时,执行出队操作的线程会被放入该条件队列进行等待 */
  13. private final Condition notEmpty = takeLock.newCondition();
  14. /** 执行 put, offer 等操作时需要获取该锁 */
  15. private final ReentrantLock putLock = new ReentrantLock();
  16. /** 当队列已满时,执行入队操作的线程会被放入该条件队列进行等待 */
  17. private final Condition notFull = putLock.newCondition();
  18. }

offer(E) - 入队非阻塞操作

  1. public boolean offer(E e) {
  2. // (1)
  3. if (e == null) throw new NullPointerException();
  4. final AtomicInteger count = this.count;
  5. // (2)
  6. if (count.get() == capacity)
  7. return false;
  8. int c = -1;
  9. // (3)
  10. Node<E> node = new Node<E>(e);
  11. final ReentrantLock putLock = this.putLock;
  12. putLock.lock();
  13. try {
  14. // (4)队列为满,直接将节点入队
  15. if (count.get() < capacity) {
  16. enqueue(node);
  17. // count+1
  18. c = count.getAndIncrement();
  19. //(5) 队列未满,唤醒入队线程
  20. if (c + 1 < capacity)
  21. notFull.signal();
  22. }
  23. } finally {
  24. // 释放锁
  25. putLock.unlock();
  26. }
  27. // (6)
  28. if (c == 0)
  29. signalNotEmpty();
  30. return c >= 0;
  31. }
  32. private void enqueue(Node<E> node) {
  33. // 1. last.next = node
  34. // 2. last = last.next
  35. last = last.next = node;
  36. }
  1. 如果入队元素 e 为 null 则抛出 NullPointerException 异常
  2. 如果队列已满,则抛弃该元素,返回 false
  3. 构建新节点,并且尝试获取 putLock
  4. 如果队列不满则进队列,并递增元素计数
  5. 如果新元素入队后队列还有空闲空间,则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作(比如执行 put 方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程
  6. c == 0 表示链表队列中在添加该元素之前没有元素,即长度为 0,如果之前有通过 take() 阻塞方法获取元素由于队列为空而阻塞的线程,则执行 signalNotEmpty() 进行唤醒;

signalNotEmpty() 的源码如下:

  1. private void signalNotEmpty() {
  2. final ReentrantLock takeLock = this.takeLock;
  3. takeLock.lock();
  4. try {
  5. notEmpty.signal();
  6. } finally {
  7. takeLock.unlock();
  8. }
  9. }

该方法的作用就是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这也说明了调用条件变量的方法前要获取对应的锁。

综上可知,offer 方法通过使用 putLock 锁保证了在队尾新增元素操作的原子性。另外,调用条件变量的方法前一定要记得获取对应的锁,并且注意进队时只操作队列链表的尾节点。

put(E) - 入队阻塞操作

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. int c = -1;
  4. Node<E> node = new Node<E>(e);
  5. final ReentrantLock putLock = this.putLock;
  6. final AtomicInteger count = this.count;
  7. putLock.lockInterruptibly();
  8. try {
  9. // (1)队列已满则阻塞 notFull 队列
  10. while (count.get() == capacity) {
  11. notFull.await();
  12. }
  13. // 唤醒之后入队
  14. enqueue(node);
  15. c = count.getAndIncrement();
  16. if (c + 1 < capacity)
  17. notFull.signal();
  18. } finally {
  19. putLock.unlock();
  20. }
  21. if (c == 0)
  22. signalNotEmpty();
  23. }

向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列有空闲插入成功后返回。如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。另外,如果 e 元素为 null 则抛出 NullPointerException 异常。

注意:(1)中的判断使用 while 循环而不是 if 语句,是因为考虑到当前线程被虚假唤醒的问题,也就是其他线程没有调用 notFull 的 singal 方法时 notFull.await() 在某种情况下会自动返回。

poll() - 移除元素,不阻塞操作

  1. public E poll() {
  2. final AtomicInteger count = this.count;
  3. if (count.get() == 0)
  4. return null;
  5. E x = null;
  6. int c = -1;
  7. final ReentrantLock takeLock = this.takeLock;
  8. takeLock.lock();
  9. try {
  10. // (1)队列不为空,出队并且递减计数
  11. if (count.get() > 0) {(2
  12. x = dequeue();(3
  13. c = count.getAndDecrement();
  14. if (c > 1)
  15. notEmpty.signal();
  16. }
  17. } finally {
  18. takeLock.unlock();
  19. }
  20. // (4)
  21. if (c == capacity)
  22. signalNotFull();
  23. return x;
  24. }
  25. private E dequeue() {
  26. Node<E> h = head;
  27. Node<E> first = h.next;
  28. h.next = h; // help GC
  29. head = first;
  30. E x = first.item;
  31. first.item = null;
  32. return x;
  33. }

(1)处判断如果当前队列不为空则进行出队操作,然后递减计数器。注意:如何保证执行代码(2)时队列不空,而执行代码(3)时也一定不会空呢?毕竟这不是原子性操作,会不会出现代码(2)判断队列不为空,但是执行代码(3)时队列为空了呢?那么我们看在执行到代码(2)前在哪些地方会修改 count 的计数。由于当前线程已经拿到了 takeLock ,所以其他调用 poll 或者 take 方法的线程不可能会走到修改 count 计数的地方。其实这时候如果能走到修改 count 计数的地方是因为其他线程调用了 put 和 offer 操作,由于这两个操作不需要获取 takeLock 锁而获取的是 putLock 锁,但是在 put 和 offer 操作内部是增加 count 计数值的,所以不会出现上面所说的情况,即 count 一定是大于 0 的。其实只需要看在哪些地方递减了 count 计数值即可,只有递减了 count 计数值才会出现上面说的,执行代码(1)时队列不空,而执行代码(2)时队列为空的情况。我们查看代码,只有在 polltake 或者 remove 操作的地方会递减 count 计数值,但是这三个方法都需要获取到 takeLock 锁才能进行操作,而当前线程已经获取了 takeLock 锁,所以其他线程没有机会在当前情况下递减 count 计数值,所以看起来代码(1)、(2)不是原子性的,但是它们是线程安全的。

(4)处说明当前线程移除队头元素前当前队列是满的,移除队头元素后当前队列至少有一个空闲位置,那么这时候就可以调用 signalNotFull 激活因为调用 put 方法而被阻塞到 notFull 的条件队列里的一个线程。signalNotFull 的代码如下:

  1. private void signalNotFull() {
  2. final ReentrantLock putLock = this.putLock;
  3. putLock.lock();
  4. try {
  5. notFull.signal();
  6. } finally {
  7. putLock.unlock();
  8. }
  9. }

peek() - 获取元素,不阻塞操作

  1. public E peek() {
  2. if (count.get() == 0)
  3. return null;
  4. final ReentrantLock takeLock = this.takeLock;
  5. takeLock.lock();
  6. try {
  7. Node<E> first = head.next;
  8. if (first == null)
  9. return null;
  10. else
  11. return first.item;
  12. } finally {
  13. takeLock.unlock();
  14. }
  15. }


获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。该方法是不阻塞的。

take() - 获取并移除元素,阻塞操作

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. takeLock.lockInterruptibly();
  7. try {
  8. // 队列为空则阻塞挂起
  9. while (count.get() == 0) {
  10. notEmpty.await();
  11. }
  12. // 元素出队并递减计数
  13. x = dequeue();
  14. c = count.getAndDecrement();
  15. if (c > 1)
  16. notEmpty.signal();
  17. } finally {
  18. takeLock.unlock();
  19. }
  20. if (c == capacity)
  21. signalNotFull();
  22. return x;
  23. }

获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。

remove(Object) - 移除指定元素

  1. public boolean remove(Object o) {
  2. if (o == null) return false;
  3. //(1)获取双重锁
  4. fullyLock();
  5. try {
  6. for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
  7. if (o.equals(p.item)) {
  8. //(2)移除元素
  9. unlink(p, trail);
  10. return true;
  11. }
  12. }
  13. return false;
  14. } finally {
  15. fullyUnlock();
  16. }
  17. }
  18. void fullyLock() {
  19. putLock.lock();
  20. takeLock.lock();
  21. }
  22. void fullyUnlock() {
  23. takeLock.unlock();
  24. putLock.unlock();
  25. }


删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。通过 fullyLock() 获取双重锁,获取后,其他线程进行入队或者出队操作时就会被阻塞挂起。(2)中遍历队列寻找要删除的元素,找不到则直接返回 false,找到则执行 unlink 操作。unlik 操作的代码如下:

  1. void unlink(Node<E> p, Node<E> trail) {
  2. p.item = null;
  3. trail.next = p.next;
  4. if (last == p)
  5. last = trail;
  6. // 如果当前队列满,则删除后需要唤醒等待的线程
  7. if (count.getAndDecrement() == capacity)
  8. notFull.signal();
  9. }

删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中的一个因为调用 put 方法而被阻塞的线程。

由于 remove 方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元素的过程中是线程安全的,并且此时其他调用入队、出队操作的线程全部会被阻塞。另外,获取多个资源锁的顺序与释放的顺序是相反的。