前面介绍了使用 CAS 算法实现的非阻塞队列 ConcurrentLinkedQueue,下面我们来介绍使用独占锁实现的阻塞队列 LinkedBlockingQueue。

类图结构

同样首先看一下 LinkedBlockingQueue 的类图结构,以便从全局对 LinkedBlockingQueue 有个直观的了解,如图 7-28 所示。

LinkedBlockingQueue 原理探究 - 图1

由类图可以看到,LinkedBlockingQueue 也是使用单向链表实现的,其也有两个 Node,分别用来存放首、尾节点,并且还有一个初始值为 0 的原子变量 count,用来记录队列元素个数。另外还有两个 ReentrantLock 的实例,分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待,putLock 控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。另外,notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程,其实这是生产者—消费者模型。如下是独占锁的创建代码。

  1. /** 执行 take poll 等操作时需要获取该锁 */
  2. private final ReentrantLock takeLock = new ReentrantLock();
  3. /** 当队列为空时,执行出队操作(比如 take)的线程会被放入这个条件队列进行等待 */
  4. private final Condition notEmpty = takeLock.newCondition();
  5. /** 执行 put offer 等操作时需要获取该锁*/
  6. private final ReentrantLock putLock = new ReentrantLock();
  7. /**当队列满时,执行进队操作(比如 put)的线程会被放入这个条件队列进行等待 */
  8. private final Condition notFull = putLock.newCondition();
  9. /** 当前队列元素个数 */
  10. private final AtomicInteger count = new AtomicInteger(0);

● 当调用线程在 LinkedBlockingQueue 实例上执行 take、poll 等操作时需要获取到 takeLock 锁,从而保证同时只有一个线程可以操作链表头节点。另外由于条件变量 notEmpty 内部的条件队列的维护使用的是 takeLock 的锁状态管理机制,所以在调用 notEmpty 的 await 和 signal 方法前调用线程必须先获取到 takeLock 锁,否则会抛出 IllegalMonitorStateException 异常。notEmpty 内部则维护着一个条件队列,当线程获取到 takeLock 锁后调用 notEmpty 的 await 方法时,调用线程会被阻塞,然后该线程会被放到 notEmpty 内部的条件队列进行等待,直到有线程调用了 notEmpty 的 signal 方法。

● 在 LinkedBlockingQueue 实例上执行 put、offer 等操作时需要获取到 putLock 锁,从而保证同时只有一个线程可以操作链表尾节点。同样由于条件变量 notFull 内部的条件队列的维护使用的是 putLock 的锁状态管理机制,所以在调用 notFull 的 await 和 signal 方法前调用线程必须先获取到 putLock 锁,否则会抛出 IllegalMonitorStateException 异常。notFull 内部则维护着一个条件队列,当线程获取到 putLock 锁后调用 notFull 的 await 方法时,调用线程会被阻塞,然后该线程会被放到 notFull 内部的条件队列进行等待,直到有线程调用了 notFull 的 signal 方法。如下是 LinkedBlockingQueue 的无参构造函数的代码。

  1. public static final int MAX_VALUE = 0x7fffffff;
  2. public LinkedBlockingQueue() {
  3. this(Integer.MAX_VALUE);
  4. }
  5. public LinkedBlockingQueueint capacity {
  6. if capacity <= 0 throw new IllegalArgumentException();
  7. this.capacity = capacity
  8. //初始化首、尾节点,让它们指向哨兵节点
  9. last = head = new Node<E>(null);
  10. }

由该代码可知,默认队列容量为 0x7fffffff,用户也可以自己指定容量,所以从一定程度上可以说 LinkedBlockingQueue 是有界阻塞队列。

LinkedBlockingQueue 原理介绍

本节讲解 LinkedBlockingQueue 的几个重要方法。

1.offer 操作

向队列尾部插入一个元素,如果队列中有空闲则插入成功后返回 true,如果队列已满则丢弃当前元素然后返回 false。如果 e 元素为 null 则抛出 NullPointerException 异常。另外,该方法是非阻塞的。

  1. public boolean offerE e {
  2. //(1)为空元素则抛出空指针异常
  3. if e == null throw new NullPointerException();
  4. //(2) 如果当前队列满则丢弃将要放入的元素,然后返回 false
  5. final AtomicInteger count = this.count
  6. if count.get() == capacity
  7. return false
  8. //(3) 构造新节点,获取 putLock 独占锁
  9. int c = -1
  10. Node<E> node = new Node<E>(e);
  11. final ReentrantLock putLock = this.putLock
  12. putLock.lock();
  13. try {
  14. //(4)如果队列不满则进队列,并递增元素计数
  15. if count.get() < capacity {
  16. enqueuenode);
  17. c = count.getAndIncrement();
  18. //(5)
  19. if c + 1 < capacity
  20. notFull.signal();
  21. }
  22. } finally {
  23. //(6)释放锁
  24. putLock.unlock();
  25. }
  26. //(7)
  27. if c == 0
  28. signalNotEmpty();
  29. //(8)
  30. return c >= 0
  31. }
  32. private void enqueueNode<E> node {
  33. last = last.next = node
  34. }

代码(2)判断如果当前队列已满则丢弃当前元素并返回 false。

代码(3)获取到 putLock 锁,当前线程获取到该锁后,则其他调用 put 和 offer 操作的线程将会被阻塞(阻塞的线程被放到 putLock 锁的 AQS 阻塞队列)。

代码(4)这里重新判断当前队列是否满,这是因为在执行代码(2)和获取到 putLock 锁期间可能其他线程通过 put 或者 offer 操作向队列里面添加了新元素。重新判断队列确实不满则新元素入队,并递增计数器。

代码(5)判断如果新元素入队后队列还有空闲空间,则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作(比如执行 put 方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程。

代码(6)则释放获取的 putLock 锁,这里要注意,锁的释放一定要在 finally 里面做,因为即使 try 块抛出异常了,finally 也是会被执行到。另外释放锁后其他因为调用 put 操作而被阻塞的线程将会有一个获取到该锁。

代码(7)中的 c==0 说明在执行代码(6)释放锁时队列里面至少有一个元素,队列里面有元素则执行 signalNotEmpty 操作,signalNotEmpty 的代码如下。

  1. private void signalNotEmpty() {
  2. final ReentrantLock takeLock = this.takeLock;
  3. takeLock.lock();
  4. try {
  5. notEmpty.signal();
  6. } finally {
  7. takeLock.unlock();
  8. }
  9. }

该方法的作用就是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这也说明了调用条件变量的方法前要获取对应的锁。

综上可知,offer 方法通过使用 putLock 锁保证了在队尾新增元素操作的原子性。另外,调用条件变量的方法前一定要记得获取对应的锁,并且注意进队时只操作队列链表的尾节点。

2.put 操作

向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列有空闲插入成功后返回。如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。另外,如果 e 元素为 null 则抛出 NullPointerException 异常。

put 操作的代码结构与 offer 操作类似,代码如下。

  1. public void putE e throws InterruptedException {
  2. //(1)如果为空元素则抛出空指针异常
  3. if e == null throw new NullPointerException();
  4. //(2) 构建新节点,并获取独占锁 putLock
  5. int c = -1
  6. Node<E> node = new Node<E>(e);
  7. final ReentrantLock putLock = this.putLock
  8. final AtomicInteger count = this.count
  9. putLock.lockInterruptibly();
  10. try {
  11. //(3)如果队列满则等待
  12. while count.get() == capacity {
  13. notFull.await();
  14. }
  15. //(4)进队列并递增计数
  16. enqueuenode);
  17. c = count.getAndIncrement();
  18. //(5)
  19. if (c + 1 < capacity)
  20. notFull.signal();
  21. } finally {
  22. //(6)
  23. putLock.unlock();
  24. }
  25. //(7)
  26. if (c == 0)
  27. signalNotEmpty();
  28. }

在代码(2)中使用 putLock.lockInterruptibly()获取独占锁,相比在 offer 方法中获取独占锁的方法这个方法可以被中断。具体地说就是当前线程在获取锁的过程中,如果被其他线程设置了中断标志则当前线程会抛出 InterruptedException 异常,所以 put 操作在获取锁的过程中是可被中断的。

代码(3)判断如果当前队列已满,则调用 notFull 的 await()方法把当前线程放入 notFull 的条件队列,当前线程被阻塞挂起后会释放获取到的 putLock 锁。由于 putLock 锁被释放了,所以现在其他线程就有机会获取到 putLock 锁了。

另外代码(3)在判断队列是否为空时为何使用 while 循环而不是 if 语句?这是考虑到当前线程被虚假唤醒的问题,也就是其他线程没有调用 notFull 的 singal 方法时 notFull. await()在某种情况下会自动返回。如果使用 if 语句那么虚假唤醒后会执行代码(4)的元素入队操作,并且递增计数器,而这时候队列已经满了,从而导致队列元素个数大于队列被设置的容量,进而导致程序出错。而使用 while 循环时,假如 notFull.await()被虚假唤醒了,那么再次循环检查当前队列是否已满,如果是则再次进行等待。

3.poll 操作

从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。

  1. public E poll() {
  2. //(1)队列为空则返回 null
  3. final AtomicInteger count = this.count
  4. if count.get() == 0
  5. return null
  6. //(2)获取独占锁
  7. E x = null
  8. int c = -1
  9. final ReentrantLock takeLock = this.takeLock
  10. takeLock.lock();
  11. try {
  12. //(3)队列不空则出队并递减计数
  13. if count.get() > 0 {//3.1
  14. x = dequeue(); //3.2
  15. c = count.getAndDecrement(); //3.3
  16. //(4)
  17. if c > 1
  18. notEmpty.signal();
  19. }
  20. } finally {
  21. //(5)
  22. takeLock.unlock();
  23. }
  24. //(6)
  25. if c == capacity
  26. signalNotFull();
  27. //(7)返回
  28. return x
  29. }
  30. private E dequeue() {
  31. Node<E> h = head
  32. Node<E> first = h.next
  33. h.next = h // help GC
  34. head = first
  35. E x = first.item
  36. first.item = null
  37. return x
  38. }

代码(1)判断如果当前队列为空,则直接返回 null。

代码(2)获取独占锁 takeLock,当前线程获取该锁后,其他线程在调用 poll 或者 take 方法时会被阻塞挂起。

代码(3)判断如果当前队列不为空则进行出队操作,然后递减计数器。这里需要思考,如何保证执行代码 3.1 时队列不空,而执行代码 3.2 时也一定不会空呢?毕竟这不是原子性操作,会不会出现代码 3.1 判断队列不为空,但是执行代码 3.2 时队列为空了呢?那么我们看在执行到代码 3.2 前在哪些地方会修改 count 的计数。由于当前线程已经拿到了 takeLock 锁,所以其他调用 poll 或者 take 方法的线程不可能会走到修改 count 计数的地方。其实这时候如果能走到修改 count 计数的地方是因为其他线程调用了 put 和 offer 操作,由于这两个操作不需要获取 takeLock 锁而获取的是 putLock 锁,但是在 put 和 offer 操作内部是增加 count 计数值的,所以不会出现上面所说的情况。其实只需要看在哪些地方递减了 count 计数值即可,只有递减了 count 计数值才会出现上面说的,执行代码 3.1 时队列不空,而执行代码 3.2 时队列为空的情况。我们查看代码,只有在 poll、take 或者 remove 操作的地方会递减 count 计数值,但是这三个方法都需要获取到 takeLock 锁才能进行操作,而当前线程已经获取了 takeLock 锁,所以其他线程没有机会在当前情况下递减 count 计数值,所以看起来代码 3.1、3.2 不是原子性的,但是它们是线程安全的。

代码(4)判断如果 c>1 则说明当前线程移除掉队列里面的一个元素后队列不为空(c 是删除元素前队列元素个数),那么这时候就可以激活因为调用 take 方法而被阻塞到 notEmpty 的条件队列里面的一个线程。

代码(6)说明当前线程移除队头元素前当前队列是满的,移除队头元素后当前队列至少有一个空闲位置,那么这时候就可以调用 signalNotFull 激活因为调用 put 方法而被阻塞到 notFull 的条件队列里的一个线程,signalNotFull 的代码如下。

  1. private void signalNotFull() {
  2. final ReentrantLock putLock = this.putLock;
  3. putLock.lock();
  4. try {
  5. notFull.signal();
  6. } finally {
  7. putLock.unlock();
  8. }
  9. }

poll 代码逻辑比较简单,值得注意的是,获取元素时只操作了队列的头节点。

4.peek 操作

获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。该方法是不阻塞的。

  1. public E peek() {
  2. //(1)
  3. if (count.get() == 0)
  4. return null;
  5. //(2)
  6. final ReentrantLock takeLock = this.takeLock;
  7. takeLock.lock();
  8. try {
  9. Node<E> first = head.next;
  10. //(3)
  11. if (first == null)
  12. return null;
  13. else
  14. //(4)
  15. return first.item;
  16. } finally {
  17. //(5)
  18. takeLock.unlock();
  19. }
  20. }

peek 操作的代码也比较简单,这里需要注意的是,代码(3)这里还是需要判断 first 是否为 null,不能直接执行代码(4)。正常情况下执行到代码(2)说明队列不为空,但是代码(1)和(2)不是原子性操作,也就是在执行点(1)判断队列不空后,在代码(2)获取到锁前有可能其他线程执行了 poll 或者 take 操作导致队列变为空。然后当前线程获取锁后,直接执行代码(4)(first.item)会抛出空指针异常。

5.take 操作

获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。

  1. public E take() throws InterruptedException {
  2. E x
  3. int c = -1
  4. final AtomicInteger count = this.count
  5. //(1)获取锁
  6. final ReentrantLock takeLock = this.takeLock
  7. takeLock.lockInterruptibly();
  8. try {
  9. //(2)当前队列为空则阻塞挂起
  10. while count.get() == 0 {
  11. notEmpty.await();
  12. }
  13. //(3)出队并递减计数
  14. x = dequeue();
  15. c = count.getAndDecrement();
  16. //(4)
  17. if c > 1
  18. notEmpty.signal();
  19. } finally {
  20. //(5)
  21. takeLock.unlock();
  22. }
  23. //(6)
  24. if c == capacity
  25. signalNotFull();
  26. //(7)
  27. return x
  28. }

在代码(1)中,当前线程获取到独占锁,其他调用 take 或者 poll 操作的线程将会被阻塞挂起。

代码(2)判断如果队列为空则阻塞挂起当前线程,并把当前线程放入 notEmpty 的条件队列。

代码(3)进行出队操作并递减计数。

代码(4)判断如果 c>1 则说明当前队列不为空,那么唤醒 notEmpty 的条件队列里面的一个因为调用 take 操作而被阻塞的线程。

代码(5)释放锁。

代码(6)判断如果 c == capacity 则说明当前队列至少有一个空闲位置,那么激活条件变量 notFull 的条件队列里面的一个因为调用 put 操作而被阻塞的线程。

6.remove 操作

删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。

  1. public boolean removeObject o {
  2. if o == null return false
  3. //(1)双重加锁
  4. fullyLock();
  5. try {
  6. //(2)遍历队列找到则删除并返回 true
  7. for Node<E> trail = head p = trail.next
  8. p = null
  9. trail = p p = p.next {
  10. //(3)
  11. if (o.equalsp.item)) {
  12. unlinkp trail);
  13. return true
  14. }
  15. }
  16. //(4)找不到则返回 false
  17. return false
  18. } finally {
  19. //(5)解锁
  20. fullyUnlock();
  21. }
  22. }

代码(1)通过 fullyLock 获取双重锁,获取后,其他线程进行入队或者出队操作时就会被阻塞挂起。

  1. void fullyLock() {
  2. putLock.lock();
  3. takeLock.lock();
  4. }

代码(2)遍历队列寻找要删除的元素,找不到则直接返回 false,找到则执行 unlink 操作。unlik 操作的代码如下。

  1. void unlinkNode<E> p, Node<E> trail {
  2. p.item = null
  3. trail.next = p.next
  4. if last == p
  5. last = trail
  6. //如果当前队列满,则删除后,也不忘记唤醒等待的线程
  7. if count.getAndDecrement() == capacity
  8. notFull.signal();
  9. }

删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中的一个因为调用 put 方法而被阻塞的线程。

代码(5)调用 fullyUnlock 方法使用与加锁顺序相反的顺序释放双重锁。

  1. void fullyUnlock() {
  2. takeLock.unlock();
  3. putLock.unlock();
  4. }

总结:由于 remove 方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元素的过程中是线程安全的,并且此时其他调用入队、出队操作的线程全部会被阻塞。另外,获取多个资源锁的顺序与释放的顺序是相反的。

7.size 操作

获取当前队列元素个数。

  1. public int size() {
  2. return count.get();
  3. }

由于进行出队、入队操作时的 count 是加了锁的,所以结果相比 ConcurrentLinkedQueue 的 size 方法比较准确。这里考虑为何在 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不使用一个原子变量呢?这是因为使用原子变量保存队列元素个数需要保证入队、出队操作和原子变量操作是原子性操作,而 ConcurrentLinkedQueue 使用的是 CAS 无锁算法,所以无法做到这样。

小结

LinkedBlockingQueue 的内部是通过单向链表实现的,使用头、尾节点来进行入队和出队操作,也就是入队操作都是对尾节点进行操作,出队操作都是对头节点进行操作。

如图 7-29 所示,对头、尾节点的操作分别使用了单独的独占锁从而保证了原子性,所以出队和入队操作是可以同时进行的。另外对头、尾节点的独占锁都配备了一个条件队列,用来存放被阻塞的线程,并结合入队、出队操作实现了一个生产消费模型。

LinkedBlockingQueue 原理探究 - 图2