LinkedBlockingQueue 类结构图
- LinkedBlockingQueue 是一个单链表结构,其中的 head 和 last 分别表示头结点和尾节点。count 表示队列中元素的个数
- capacity 表示队列的容量,默认为 Integer.MAX_VALUE
- putLock 和 takeLock 分别用来控制元素入队和出队的原子性,其中 takeLock 用来控制同时只有一个线程可以从队列头获取元素,其他线程必须等待,putLock 控制同时只能有一个线程可以获取锁,在队列尾部添加元素,其他线程必须等待。另外,notEmpty 和 notFull 是条件变量,它们内部都有一个条件队列用来存放进队和出队时被阻塞的线程
LinkedBlockingQueue 实现原理
LinkedBlockingQueue 是一个基于单链表结构的阻塞队列,用两把锁分别控制队头和队尾的操作,因此也是 2 把锁 + 2 个条件的结构,同时还使用了一个 AtomicInteger 原子变量 count 来记录队列的元素个数。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
/** 队列容量 */
private final int capacity;
/**链表队列元素数量 */
private final AtomicInteger count = new AtomicInteger();
/** 单链表的头结点 */
transient Node<E> head;
/** 单链表的尾结点 */
private transient Node<E> last;
/** 执行 take, poll 等操作时需要获取该锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 当队列为空时,执行出队操作的线程会被放入该条件队列进行等待 */
private final Condition notEmpty = takeLock.newCondition();
/** 执行 put, offer 等操作时需要获取该锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 当队列已满时,执行入队操作的线程会被放入该条件队列进行等待 */
private final Condition notFull = putLock.newCondition();
}
offer(E) - 入队非阻塞操作
public boolean offer(E e) {
// (1)
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// (2)
if (count.get() == capacity)
return false;
int c = -1;
// (3)
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// (4)队列为满,直接将节点入队
if (count.get() < capacity) {
enqueue(node);
// count+1
c = count.getAndIncrement();
//(5) 队列未满,唤醒入队线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 释放锁
putLock.unlock();
}
// (6)
if (c == 0)
signalNotEmpty();
return c >= 0;
}
private void enqueue(Node<E> node) {
// 1. last.next = node
// 2. last = last.next
last = last.next = node;
}
- 如果入队元素 e 为 null 则抛出 NullPointerException 异常
- 如果队列已满,则抛弃该元素,返回 false
- 构建新节点,并且尝试获取 putLock
- 如果队列不满则进队列,并递增元素计数
- 如果新元素入队后队列还有空闲空间,则唤醒 notFull 的条件队列里面因为调用了 notFull 的 await 操作(比如执行 put 方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲所以这里可以提前唤醒一个入队线程
c == 0
表示链表队列中在添加该元素之前没有元素,即长度为 0,如果之前有通过take()
阻塞方法获取元素由于队列为空而阻塞的线程,则执行signalNotEmpty()
进行唤醒;
signalNotEmpty()
的源码如下:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
该方法的作用就是激活 notEmpty 的条件队列中因为调用 notEmpty 的 await 方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这也说明了调用条件变量的方法前要获取对应的锁。
综上可知,offer 方法通过使用 putLock 锁保证了在队尾新增元素操作的原子性。另外,调用条件变量的方法前一定要记得获取对应的锁,并且注意进队时只操作队列链表的尾节点。
put(E) - 入队阻塞操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// (1)队列已满则阻塞 notFull 队列
while (count.get() == capacity) {
notFull.await();
}
// 唤醒之后入队
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
向队列尾部插入一个元素,如果队列中有空闲则插入后直接返回,如果队列已满则阻塞当前线程,直到队列有空闲插入成功后返回。如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。另外,如果 e 元素为 null 则抛出 NullPointerException 异常。
注意:(1)中的判断使用 while 循环而不是 if 语句,是因为考虑到当前线程被虚假唤醒的问题,也就是其他线程没有调用 notFull 的 singal 方法时 notFull.await()
在某种情况下会自动返回。
poll() - 移除元素,不阻塞操作
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// (1)队列不为空,出队并且递减计数
if (count.get() > 0) {(2)
x = dequeue();(3)
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// (4)
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
(1)处判断如果当前队列不为空则进行出队操作,然后递减计数器。注意:如何保证执行代码(2)时队列不空,而执行代码(3)时也一定不会空呢?毕竟这不是原子性操作,会不会出现代码(2)判断队列不为空,但是执行代码(3)时队列为空了呢?那么我们看在执行到代码(2)前在哪些地方会修改 count 的计数。由于当前线程已经拿到了 takeLock 锁,所以其他调用 poll 或者 take 方法的线程不可能会走到修改 count 计数的地方。其实这时候如果能走到修改 count 计数的地方是因为其他线程调用了 put 和 offer 操作,由于这两个操作不需要获取 takeLock 锁而获取的是 putLock 锁,但是在 put 和 offer 操作内部是增加 count 计数值的,所以不会出现上面所说的情况,即 count 一定是大于 0 的。其实只需要看在哪些地方递减了 count 计数值即可,只有递减了 count 计数值才会出现上面说的,执行代码(1)时队列不空,而执行代码(2)时队列为空的情况。我们查看代码,只有在 poll
、take
或者 remove
操作的地方会递减 count 计数值,但是这三个方法都需要获取到 takeLock 锁才能进行操作,而当前线程已经获取了 takeLock 锁,所以其他线程没有机会在当前情况下递减 count 计数值,所以看起来代码(1)、(2)不是原子性的,但是它们是线程安全的。
(4)处说明当前线程移除队头元素前当前队列是满的,移除队头元素后当前队列至少有一个空闲位置,那么这时候就可以调用 signalNotFull 激活因为调用 put 方法而被阻塞到 notFull 的条件队列里的一个线程。signalNotFull 的代码如下:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
peek() - 获取元素,不阻塞操作
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
获取队列头部元素但是不从队列里面移除它,如果队列为空则返回 null。该方法是不阻塞的。
take() - 获取并移除元素,阻塞操作
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空则阻塞挂起
while (count.get() == 0) {
notEmpty.await();
}
// 元素出队并递减计数
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。
remove(Object) - 移除指定元素
public boolean remove(Object o) {
if (o == null) return false;
//(1)获取双重锁
fullyLock();
try {
for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) {
if (o.equals(p.item)) {
//(2)移除元素
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
删除队列里面指定的元素,有则删除并返回 true,没有则返回 false。通过 fullyLock()
获取双重锁,获取后,其他线程进行入队或者出队操作时就会被阻塞挂起。(2)中遍历队列寻找要删除的元素,找不到则直接返回 false,找到则执行 unlink 操作。unlik 操作的代码如下:
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
// 如果当前队列满,则删除后需要唤醒等待的线程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中的一个因为调用 put 方法而被阻塞的线程。
由于 remove 方法在删除指定元素前加了两把锁,所以在遍历队列查找指定元素的过程中是线程安全的,并且此时其他调用入队、出队操作的线程全部会被阻塞。另外,获取多个资源锁的顺序与释放的顺序是相反的。