一个基于链表实现的阻塞队列,情况下,队列的默认大小为Integer.MAX_VALUE,所以也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁,添加元素和获取元素都有独立的锁,实现入队出队互不阻塞,读写操作可以并行执行

原理分析

重要属性

  1. /**
  2. * 单链表节点(只有next)
  3. */
  4. static class Node<E> {
  5. //存储队列元素
  6. E item;
  7. Node<E> next;
  8. Node(E x) { item = x; }
  9. }
  10. /** 队列容量(不指定则为Integer.MAX_VALUE) */
  11. private final int capacity;
  12. /** 当前队列元素个数 */
  13. private final AtomicInteger count = new AtomicInteger();
  14. /**
  15. * 链表头指针 不存储任何元素的.
  16. * 初始化: head.item == null
  17. */
  18. transient Node<E> head;
  19. /**
  20. * 链表尾指针 不存储任何元素的..
  21. * 初始化: last.next == null
  22. */
  23. private transient Node<E> last;
  24. /** 消费者锁 */
  25. private final ReentrantLock takeLock = new ReentrantLock();
  26. /** 消费者条件队列:队列空时,take线程会阻塞在notEmpty上,等待其它线程唤醒 */
  27. private final Condition notEmpty = takeLock.newCondition();
  28. /** 生产者锁 */
  29. private final ReentrantLock putLock = new ReentrantLock();
  30. /** 生产者条件队列:队列满时,put线程会阻塞在notFull上,等待其它线程唤醒 */
  31. private final Condition notFull = putLock.newCondition();

put入队

  1. public void put(E e) throws InterruptedException {
  2. //判空
  3. if (e == null) throw new NullPointerException();
  4. // 将元素封装成队列节点
  5. int c = -1;
  6. Node<E> node = new Node<E>(e);
  7. final ReentrantLock putLock = this.putLock;
  8. final AtomicInteger count = this.count;
  9. //可中断锁
  10. putLock.lockInterruptibly();
  11. try {
  12. //若队列满,则阻塞在notFull上等待被其它线程唤醒
  13. //while:若唤醒后,唤醒条件变化,则再次阻塞
  14. while (count.get() == capacity) {
  15. notFull.await();
  16. }
  17. enqueue(node);
  18. c = count.getAndIncrement();
  19. // 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)
  20. // 此处执行唤醒原因?
  21. // 可能有很多线程阻塞在notFull条件上,而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒
  22. if (c + 1 < capacity)
  23. notFull.signal();
  24. } finally {
  25. putLock.unlock();
  26. }
  27. if (c == 0)
  28. //完成新元素插入。唤醒因为队列为空,阻塞的线程
  29. signalNotEmpty();
  30. }
  31. //将新节点,插入链表尾部
  32. private void enqueue(Node<E> node) {
  33. // assert putLock.isHeldByCurrentThread();
  34. // assert last.next == null;
  35. last = last.next = node;
  36. }
  37. private void signalNotEmpty() {
  38. final ReentrantLock takeLock = this.takeLock;
  39. takeLock.lock();
  40. try {
  41. // notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
  42. notEmpty.signal();
  43. } finally {
  44. //唤醒 同步队列中元素
  45. takeLock.unlock();
  46. }
  47. }

take方法

  1. //元素出队
  2. public E take() throws InterruptedException {
  3. E x;
  4. int c = -1;
  5. final AtomicInteger count = this.count;
  6. final ReentrantLock takeLock = this.takeLock;
  7. //可中断锁
  8. takeLock.lockInterruptibly();
  9. try {
  10. //队列元素为空,则出队线程阻塞,进入条件队列
  11. while (count.get() == 0) {
  12. notEmpty.await();
  13. }
  14. //出队具体操作
  15. x = dequeue();
  16. //队列元素减一
  17. c = count.getAndDecrement();
  18. //队列中元素不为空,则唤醒因队列为空,阻塞的出队线程
  19. if (c > 1)
  20. notEmpty.signal();
  21. } finally {
  22. takeLock.unlock();
  23. }
  24. // 为什么队列是满的,仍然:唤醒阻塞在notFull上的线程呢?(值得深思)
  25. // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,此处进行尝试,未满就唤醒其它notFull上的线程,
  26. // 这也是锁分离带来的代价
  27. // 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
  28. if (c == capacity)
  29. //已经有元素出队,则可以唤醒因队列元素满,而阻塞的入队线程
  30. signalNotFull();
  31. return x;
  32. }
  33. //删除head节点,将head下一个元素,改造成head节点
  34. private E dequeue() {
  35. // assert takeLock.isHeldByCurrentThread();
  36. // assert head.item == null;
  37. Node<E> h = head;
  38. Node<E> first = h.next;
  39. h.next = h; // help GC
  40. head = first;
  41. E x = first.item;
  42. first.item = null;
  43. return x;
  44. }
  45. private void signalNotFull() {
  46. final ReentrantLock putLock = this.putLock;
  47. putLock.lock();
  48. try {
  49. notFull.signal();
  50. } finally {
  51. putLock.unlock();
  52. }
  53. }