以单链表实现的阻塞队列,它是线程安全的,借助分段的思想把入队出队分裂成两个锁
LinkedBlockingQueue 默认长度是 Integer.MAX_VALUE,所以可以认为它是无界队列,但是无界队列有个缺点,很容易造成 OOM
成员变量
// 容量,有界队列private final int capacity;// 元素数量private final AtomicInteger count = new AtomicInteger();// 链表头 本身是不存储任何元素的,初始化时item指向nulltransient Node<E> head;// 链表尾 last.next == nullprivate transient Node<E> last;// take锁,锁分离,提高效率private final ReentrantLock takeLock = new ReentrantLock();// notEmpty条件// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒private final Condition notEmpty = takeLock.newCondition();// put锁,锁分离,提高效率private final ReentrantLock putLock = new ReentrantLock();// notFull条件// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒private final Condition notFull = putLock.newCondition();static class Node<E> {E item;//存储元素Node<E> next;//后继元素 单链表结构Node(E x) { item = x; }}
构造器
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);// 一定是有界的}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);// 初始化head和last指针为空值节点,head和last指向同一个对象}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);final ReentrantLock putLock = this.putLock;putLock.lock();try {int n = 0;for (E e : c) {if (e == null)throw new NullPointerException();if (n == capacity)throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));++n;}count.set(n);} finally {putLock.unlock();}}
入队
public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// 不允许null元素int c = -1;Node<E> node = new Node<E>(e);// 新建一个节点final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();// 使用put锁加锁try {// 如果队列满了,就阻塞在notFull条件上等待被其它线程唤醒while (count.get() == capacity) {notFull.await();}enqueue(node);// 队列不满了,就入队c = count.getAndIncrement();// 队列长度加1,返回原值// 如果现队列长度如果小于容量,就唤醒一个阻塞在notFull条件上的线程(可以继续入队)// 这里为啥要唤醒一下呢?// 因为可能有很多线程阻塞在notFull这个条件上的,而取元素时只有取之前队列是满的才会唤醒notFull,不用等到取元素时才唤醒// 为什么队列满的才唤醒notFull呢?// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,说白了,这也是锁分离带来的代价if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)// 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件signalNotEmpty();}private void enqueue(Node<E> node) {last = last.next = node;// 直接加到last后面,last指向入队元素,如果是第一次入队head.next也会指向入队元素}private void signalNotEmpty() {final ReentrantLock takeLock = this.takeLock;takeLock.lock();// 加take锁try {notEmpty.signal();// 唤醒notEmpty条件} finally {takeLock.unlock();}}
- 使用putLock加锁;
- 如果队列满了就阻塞在notFull条件上,等待被唤醒,继续执行;
- 否则就入队;
- 如果入队后元素数量小于容量,唤醒其它阻塞在notFull条件上的线程;
- 释放锁;
-
出队
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();// 使用takeLock加锁try {while (count.get() == 0) {// 如果队列无元素,则阻塞在notEmpty条件上notEmpty.await();}x = dequeue();// 否则,出队c = count.getAndDecrement();//长度-1,返回原值if (c > 1)// 如果取之前队列长度大于1,则唤醒notEmpty 原因与入队同理notEmpty.signal();} finally {takeLock.unlock();}// 如果取之前队列长度等于容量(已满),则唤醒notFullif (c == capacity)signalNotFull();return x;}private E dequeue() {// 这里把head删除,并把head.next作为出队元素// 并把其值置空,返回原来的值Node<E> h = head;Node<E> first = h.next;h.next = h; // 方便GChead = first;E x = first.item;first.item = null;//head.item还是指向null,head.next指向了新的出队元素return x;}private void signalNotFull() {final ReentrantLock putLock = this.putLock;putLock.lock();try {notFull.signal();} finally {putLock.unlock();}}
使用takeLock加锁;
- 如果队列空了就阻塞在notEmpty条件上;
- 否则就出队;
- 如果出队前元素数量大于1,唤醒其它阻塞在notEmpty条件上的线程;
- 释放锁;
- 如果取元素之前队列长度等于容量,就唤醒notFull条件;
删除元素
public boolean remove(Object o) {if (o == null) return false;fullyLock(); //此时将入队锁和出队锁全部锁住来保证线程安全try {for (Node<E> trail = head, p = trail.next;p != null;trail = p, p = p.next) {// 循环遍历查找值相等的元素if (o.equals(p.item)) {unlink(p, trail);//调用unlink删除此节点return true;//操作成功返回true}}return false;} finally {fullyUnlock();}}void unlink(Node<E> p, Node<E> trail) {//p为要删除节点,trail为删除节点的前一个节点p.item = null;trail.next = p.next; // 改变指针将前一节点的后继节点指向删除节点的后一个节点if (last == p)last = trail;if (count.getAndDecrement() == capacity)notFull.signal();}
