一个基于链表实现的阻塞队列,情况下,队列的默认大小为Integer.MAX_VALUE,所以也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁,添加元素和获取元素都有独立的锁,实现入队出队互不阻塞,读写操作可以并行执行。
原理分析
重要属性
/*** 单链表节点(只有next)*/static class Node<E> {//存储队列元素E item;Node<E> next;Node(E x) { item = x; }}/** 队列容量(不指定则为Integer.MAX_VALUE) */private final int capacity;/** 当前队列元素个数 */private final AtomicInteger count = new AtomicInteger();/*** 链表头指针 不存储任何元素的.* 初始化: head.item == null*/transient Node<E> head;/*** 链表尾指针 不存储任何元素的..* 初始化: last.next == null*/private transient Node<E> last;/** 消费者锁 */private final ReentrantLock takeLock = new ReentrantLock();/** 消费者条件队列:队列空时,take线程会阻塞在notEmpty上,等待其它线程唤醒 */private final Condition notEmpty = takeLock.newCondition();/** 生产者锁 */private final ReentrantLock putLock = new ReentrantLock();/** 生产者条件队列:队列满时,put线程会阻塞在notFull上,等待其它线程唤醒 */private final Condition notFull = putLock.newCondition();
put入队
public void put(E e) throws InterruptedException {//判空if (e == null) throw new NullPointerException();// 将元素封装成队列节点int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//可中断锁putLock.lockInterruptibly();try {//若队列满,则阻塞在notFull上等待被其它线程唤醒//while:若唤醒后,唤醒条件变化,则再次阻塞while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();// 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)// 此处执行唤醒原因?// 可能有很多线程阻塞在notFull条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)//完成新元素插入。唤醒因为队列为空,阻塞的线程signalNotEmpty();}//将新节点,插入链表尾部private void enqueue(Node<E> node) {// assert putLock.isHeldByCurrentThread();// assert last.next == null;last = last.next = node;}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程notEmpty.signal();} finally {//唤醒 同步队列中元素takeLock.unlock();}}
take方法
//元素出队public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//可中断锁takeLock.lockInterruptibly();try {//队列元素为空,则出队线程阻塞,进入条件队列while (count.get() == 0) {notEmpty.await();}//出队具体操作x = dequeue();//队列元素减一c = count.getAndDecrement();//队列中元素不为空,则唤醒因队列为空,阻塞的出队线程if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}// 为什么队列是满的,仍然:唤醒阻塞在notFull上的线程呢?(值得深思)// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,此处进行尝试,未满就唤醒其它notFull上的线程,// 这也是锁分离带来的代价// 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程if (c == capacity)//已经有元素出队,则可以唤醒因队列元素满,而阻塞的入队线程signalNotFull();return x;}//删除head节点,将head下一个元素,改造成head节点private E dequeue() {// assert takeLock.isHeldByCurrentThread();// assert head.item == null;Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}
