LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认大小是Integer.MAX_VALUE

关键的成员变量:

  1. // 链表头 本身是不存储任何元素的,初始化时item指向null
  2. transient Node<E> head;
  3. // 链表尾
  4. private transient Node<E> last;
  5. // take锁 锁分离,提高效率
  6. private final ReentrantLock takeLock = new ReentrantLock();
  7. // notEmpty条件
  8. // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
  9. private final Condition notEmpty = takeLock.newCondition();
  10. // put锁
  11. private final ReentrantLock putLock = new ReentrantLock();
  12. // notFull条件
  13. // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
  14. private final Condition notFull = putLock.newCondition();

构造方法中构造了链表结构

  1. public LinkedBlockingQueue(int capacity) {
  2. if (capacity <= 0) throw new IllegalArgumentException();
  3. this.capacity = capacity;
  4. // 初始化head和last指针为空值节点
  5. last = head = new Node<E>(null);
  6. }

入队put方法

  1. public void put(E e) throws InterruptedException {
  2. // 不允许null元素
  3. if (e == null) throw new NullPointerException();
  4. int c = 1;
  5. // 新建一个节点
  6. Node<E> node = new Node<E>(e);
  7. final ReentrantLock putLock = this.putLock;
  8. final AtomicInteger count = this.count;
  9. // 使用put锁加锁
  10. putLock.lockInterruptibly();
  11. try {
  12. // 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
  13. while (count.get() == capacity) {
  14. notFull.await();
  15. }
  16. // 队列不满,就入队
  17. enqueue(node);
  18. c = count.getAndIncrement();// 队列长度加1,返回原值
  19. // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条
  20. 件上的线程(可以继续入队)
  21. // 这里为啥要唤醒一下呢?
  22. // 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才会唤
  23. notFull,此处不用等到取元素时才唤醒
  24. if (c + 1 < capacity)
  25. notFull.signal();
  26. } finally {
  27. putLock.unlock(); // 真正唤醒生产者线程
  28. }
  29. // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
  30. if (c == 0)
  31. signalNotEmpty();
  32. }
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();// 加take锁
    try {
     notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
    } finally {
    takeLock.unlock(); // 真正唤醒消费者线程
    }
}

链表入队,操作的是last节点

private void enqueue(Node<E> node) {
    // 直接加到last后面,last指向入队元素
    last = last.next = node;
}

出队take方法

public E take() throws InterruptedException {
    E x;
    int c = ‐1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // 使用takeLock加锁
    takeLock.lockInterruptibly();
    try {
    // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
    while (count.get() == 0) {
      notEmpty.await();
    }
    // 否则,出队
    x = dequeue();
    c = count.getAndDecrement();//长度‐1,返回原值
    if (c > 1)
        // 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程,原因与入队同理
        notEmpty.signal();
    } finally {
        takeLock.unlock(); // 真正唤醒消费者线程
    }
    // 为什么队列是满的才唤醒阻塞在notFull上的线程呢?
    // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,
    // 这也是锁分离带来的代价
   if (c == capacity)
     signalNotFull();
     return x;
    }
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
      notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
    } finally {
      putLock.unlock(); // 解锁,这才会真正的唤醒生产者线程
   }
}

链表出队,操作head节点,与入队操作一起保证队列的先进先出

private E dequeue() {
    // head节点本身是不存储任何元素的
    // 这里把head删除,并把head下一个节点作为新的值
    // 并把其值置空,返回原来的值
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 方便GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

LinkedBlockingQueue与ArrayBlockingQueue对比

1)存储容器数据结构不同,ArrayBlockingQueue采用的是数组,而 LinkedBlockingQueue采用的是链表。
2)队列大小不同,ArrayBlockingQueue是有界的初始化指定大小,而 LinkedBlockingQueue默认是Integer.MAX_VALUE,也可以指定大小
3)ArrayBlockingQueue添加操作和移除操作采用的同一个ReenterLock锁,而 LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用 的则是takeLock,这样能大大提高队列的吞吐量