前言
LinkedBlockingQueue 不同于 ArrayBlockingQueue,它如果不指定容量(默认构造函数),容量被默认为 Integer.MAX_VALUE,也就是无界队列。为了避免队列过大造成机器负载或者内存爆满的情况,我们在使用的时候建议手动传一个队列的大小。
源码分析
成员变量
/**
* 节点类,用于存储数据
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;
/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/**
* 阻塞队列的头结点
*/
transient Node<E> head;
/**
* 阻塞队列的尾节点
*/
private transient Node<E> last;
/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();
从上面的属性可知,每个添加到 LinkedBlockingQueue 队列中的数据都被封装为 Node 节点,并添加到队列的尾部。其中 head 和 last 分别指向队列的头节点和尾节点。
与 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制。添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。
另外,LinkedBlockingQueue 对每一个 lock 锁都提供了一个 Condition 用来挂起和唤醒其他线程。
构造函数
public LinkedBlockingQueue() {
// 默认大小为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
// head 和 last 在初始化的时候都是null
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
值得注意的是,head 和 last 在初始化的时候都是 null。
offer
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果容量已满,直接返回 false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间,
// 如果有,唤醒下一个添加线程进行添加操作。
if (count.get() < capacity) {
enqueue(node);
// count 自增,并返回自增前的值
c = count.getAndIncrement();
// 队列还有空间,通知其它生产者线程,继续添加元素
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 这时候释放生产者的独占锁,因为下面要获取消费者的独占锁,为了防止死锁发生,先释放这把锁
// 加入队列前,队列是空的
if (c == 0)
signalNotEmpty();
return c >= 0;
}
// 添加元素到队列尾部,并更新 last 指针
private void enqueue(Node<E> node) {
last = last.next = node;
}
// 唤醒消费者,此方法只能被 put/offer 方法调用
private void signalNotEmpty() {
// 获取到消费者的锁,然后进行唤醒
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 队列不为空,唤醒消费者
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
// 唤醒生产者,此方法只能被 take/poll 方法调用
private void signalNotFull() {
// 获取到生产者的锁,然后进行唤醒
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 队列还有空间,唤醒生产者
notFull.signal();
} finally {
putLock.unlock();
}
}
为什么要判断 if (c == 0)
?
生产者可以唤醒其它生产者线程,不停地添加元素。同理,消费者也可以唤醒其它消费者线程,不停地消费元素。
如果队列此时为空,消费者那边肯定是阻塞或未启动的状态,生产者这边添加第一个元素后,必须去唤醒消费者,所以要去判断 if (c == 0)
,最根本的原因就是提高程序运行的速度。
那为什么添加一个元素后,c 会等于 0 呢?这是因为 AtomicInteger 调用 getAndIncrement 方法后返回的是自增前的值。
为什么调用 signalNotEmpty
方法的时候,需要去获取消费者的锁?
这是因为消费者消费(take)的时候,必须先获取到锁,才能消费元素,如果队列没有元素消费,就会调用 Condition 的 await 方法进行阻塞。也就是说,使用 Condition 前,必须先获取到独占锁。
所以,我们在唤醒消费者前,也必须先获取到消费者的独占锁,然后唤醒消费者。
添加元素其它方法
put 方法和上面的 offer 方法差不多一样。如果队列容量已满,上面的 offer 方法直接返回 false,put 方法会一直阻塞,直到被唤醒。
offer 方法提供了超时阻塞的方法,如果超时后,队列容量依然不能满足插入,直接返回 false。
poll
执行流程如下:
- 队列为空,阻塞等待;
- 队列不为空,从队首获取并移除一个元素,如果消费后还有元素在队列中,继续唤醒下一个消费线程进行元素移除。如果放之前队列是满元素的情况,移除完后要唤醒生产线程进行添加元素。
public E poll() {
final AtomicInteger count = this.count;
// 队列为空,直接返回 null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 出队
x = dequeue();
// 递减,返回之前的值
c = count.getAndDecrement();
// 如果队列中还有元素,唤醒其它消费者线程
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果消费时发现容量满了,提醒生产者添加元素
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// 获取到head节点
Node<E> h = head;
// 获取到head节点指向的下一个节点
Node<E> first = h.next;
// head节点原来指向的节点的next指向自己,等待下次gc回收
h.next = h; // help GC
// head节点指向新的节点
head = first;
// 获取到新的head节点的item值
E x = first.item;
// 新head节点的item值设置为null
first.item = null;
return x;
}
remove 方法
public boolean remove(Object o) {
if (o == null) return false;
// 两个lock全部上锁
fullyLock();
try {
// 从head开始遍历元素,直到最后一个元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 如果找到相等的元素,调用unlink方法删除元素
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 两个lock全部解锁
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
void unlink(Node<E> p, Node<E> trail) {
// p的元素置为null
p.item = null;
// p的前一个节点的next指向p的next,也就是把p从链表中去除了
trail.next = p.next;
// 如果last指向p,删除p后让last指向trail
if (last == p)
last = trail;
// 如果删除之前元素是满的,删除之后就有空间了,唤醒生产线程放入元素
if (count.getAndDecrement() == capacity)
notFull.signal();
}
总结
LinkedBlockingQueue 是一个阻塞队列,内部由两个 ReentrantLock 来实现出入队列的线程安全,由各自的 Condition 对象的 await 和 signal 来实现等待和唤醒功能。
它和 ArrayBlockingQueue 的不同点在于:
队列大小有所不同。ArrayBlockingQueue 是有界的初始化必须指定大小,而 LinkedBlockingQueue 可以是有界的也可以是无界的 (Integer.MAX_VALUE) ,对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
数据存储容器不同。ArrayBlockingQueue 采用的是数组作为数据存储容器,而 LinkedBlockingQueue 采用的则是以 Node 节点作为连接对象的链表。
由于 ArrayBlockingQueue 采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 则会生成一个额外的 Node 对象。
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个 ReenterLock 锁,而 LinkedBlockingQueue 实现的队列中的锁是分离的,其添加采用的是 putLock,移除采用的则是 takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。