阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。
    这两个附加操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。
    阻塞队列在实现上,主要利用了 Lock 与 Condition 的等待/通知模式。

    LinkedBlockingQueue源码:

    • 部分属性

      1. /**
      2. * Tail of linked list.
      3. * Invariant: last.next == null
      4. */
      5. private transient Node<E> last;
      6. /** Lock held by take, poll, etc */
      7. private final ReentrantLock takeLock = new ReentrantLock();
      8. /** Wait queue for waiting takes */
      9. private final Condition notEmpty = takeLock.newCondition();
      10. /** Lock held by put, offer, etc */
      11. private final ReentrantLock putLock = new ReentrantLock();
      12. /** Wait queue for waiting puts */
      13. private final Condition notFull = putLock.newCondition();
    • put()方法

      1. /**
      2. * Inserts the specified element at the tail of this queue, waiting if
      3. * necessary for space to become available.
      4. *
      5. * @throws InterruptedException {@inheritDoc}
      6. * @throws NullPointerException {@inheritDoc}
      7. */
      8. public void put(E e) throws InterruptedException {
      9. if (e == null) throw new NullPointerException();
      10. // Note: convention in all put/take/etc is to preset local var
      11. // holding count negative to indicate failure unless set.
      12. int c = -1;
      13. Node<E> node = new Node<E>(e);
      14. final ReentrantLock putLock = this.putLock;
      15. final AtomicInteger count = this.count;
      16. putLock.lockInterruptibly();
      17. try {
      18. /*
      19. * Note that count is used in wait guard even though it is
      20. * not protected by lock. This works because count can
      21. * only decrease at this point (all other puts are shut
      22. * out by lock), and we (or some other waiting put) are
      23. * signalled if it ever changes from capacity. Similarly
      24. * for all other uses of count in other wait guards.
      25. */
      26. while (count.get() == capacity) {
      27. notFull.await();
      28. }
      29. enqueue(node);
      30. c = count.getAndIncrement();
      31. if (c + 1 < capacity)
      32. notFull.signal();
      33. } finally {
      34. putLock.unlock();
      35. }
      36. if (c == 0)
      37. signalNotEmpty();
      38. }
    • take()方法

      1. public E take() throws InterruptedException {
      2. E x;
      3. int c = -1;
      4. final AtomicInteger count = this.count;
      5. final ReentrantLock takeLock = this.takeLock;
      6. takeLock.lockInterruptibly();
      7. try {
      8. while (count.get() == 0) {
      9. notEmpty.await();
      10. }
      11. x = dequeue();
      12. c = count.getAndDecrement();
      13. if (c > 1)
      14. notEmpty.signal();
      15. } finally {
      16. takeLock.unlock();
      17. }
      18. if (c == capacity)
      19. signalNotFull();
      20. return x;
      21. }