一个基于链表实现的阻塞队列,情况下,队列的默认大小为Integer.MAX_VALUE,所以也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁,添加元素和获取元素都有独立的锁,实现入队出队互不阻塞,读写操作可以并行执行。
原理分析
重要属性
/**
* 单链表节点(只有next)
*/
static class Node<E> {
//存储队列元素
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 队列容量(不指定则为Integer.MAX_VALUE) */
private final int capacity;
/** 当前队列元素个数 */
private final AtomicInteger count = new AtomicInteger();
/**
* 链表头指针 不存储任何元素的.
* 初始化: head.item == null
*/
transient Node<E> head;
/**
* 链表尾指针 不存储任何元素的..
* 初始化: last.next == null
*/
private transient Node<E> last;
/** 消费者锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 消费者条件队列:队列空时,take线程会阻塞在notEmpty上,等待其它线程唤醒 */
private final Condition notEmpty = takeLock.newCondition();
/** 生产者锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 生产者条件队列:队列满时,put线程会阻塞在notFull上,等待其它线程唤醒 */
private final Condition notFull = putLock.newCondition();
put入队
public void put(E e) throws InterruptedException {
//判空
if (e == null) throw new NullPointerException();
// 将元素封装成队列节点
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可中断锁
putLock.lockInterruptibly();
try {
//若队列满,则阻塞在notFull上等待被其它线程唤醒
//while:若唤醒后,唤醒条件变化,则再次阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
// 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)
// 此处执行唤醒原因?
// 可能有很多线程阻塞在notFull条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//完成新元素插入。唤醒因为队列为空,阻塞的线程
signalNotEmpty();
}
//将新节点,插入链表尾部
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
notEmpty.signal();
} finally {
//唤醒 同步队列中元素
takeLock.unlock();
}
}
take方法
//元素出队
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//可中断锁
takeLock.lockInterruptibly();
try {
//队列元素为空,则出队线程阻塞,进入条件队列
while (count.get() == 0) {
notEmpty.await();
}
//出队具体操作
x = dequeue();
//队列元素减一
c = count.getAndDecrement();
//队列中元素不为空,则唤醒因队列为空,阻塞的出队线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 为什么队列是满的,仍然:唤醒阻塞在notFull上的线程呢?(值得深思)
// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,此处进行尝试,未满就唤醒其它notFull上的线程,
// 这也是锁分离带来的代价
// 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
if (c == capacity)
//已经有元素出队,则可以唤醒因队列元素满,而阻塞的入队线程
signalNotFull();
return x;
}
//删除head节点,将head下一个元素,改造成head节点
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}