LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE,由于这个数值特别大,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限,队列可以随着元素的添加而动态增长,但是如果没有剩余内存,则队列将抛出OOM错误.
LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。LinkedBlockingQueue采用两把锁的锁分离技术实现入队出队互不阻塞,添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行
截屏2022-03-30 22.31.15.png

LinkedBlockingQueue的原理

数据结构

  1. //容量,队列的容量
  2. private final int capacity;
  3. //元素数量
  4. private final AtomicInteger count = new AtomicInteger();
  5. /**
  6. * Head of linked list.
  7. * Invariant: head.item == null
  8. */
  9. //链表头节点 ,本身是不存储任何元素的,初始化时item指向null
  10. transient Node<E> head;
  11. /**
  12. * Tail of linked list.
  13. * Invariant: last.next == null
  14. */
  15. //链表尾节点
  16. private transient Node<E> last;
  17. /** Lock held by take, poll, etc */
  18. // take锁,从队列取任务。锁分离,提高效率
  19. private final ReentrantLock takeLock = new ReentrantLock();
  20. /** Wait queue for waiting takes */
  21. //当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
  22. private final Condition notEmpty = takeLock.newCondition();
  23. /** Lock held by put, offer, etc */
  24. //put锁,往队列放任务
  25. private final ReentrantLock putLock = new ReentrantLock();
  26. /** Wait queue for waiting puts */
  27. //当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
  28. private final Condition notFull = putLock.newCondition();
  29. static class Node<E> {//单链表
  30. E item;//节点元素
  31. Node<E> next;//下一个节点
  32. Node(E x) { item = x; }
  33. }

构造器

  1. public LinkedBlockingQueue() {
  2. //队列长度默认值是int最大值
  3. this(Integer.MAX_VALUE);
  4. }
  5. public LinkedBlockingQueue(int capacity) {
  6. if (capacity <= 0) throw new IllegalArgumentException();
  7. this.capacity = capacity;
  8. //首尾节点都置为空节点
  9. last = head = new Node<E>(null);
  10. }

入队put方法

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();
  3. // Note: convention in all put/take/etc is to preset local var
  4. // holding count negative to indicate failure unless set.
  5. int c = -1;
  6. // 新建一个节点
  7. Node<E> node = new Node<E>(e);
  8. final ReentrantLock putLock = this.putLock;
  9. final AtomicInteger count = this.count;
  10. // 使用put锁加锁,可以被中断
  11. putLock.lockInterruptibly();
  12. try {
  13. //如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
  14. while (count.get() == capacity) {
  15. notFull.await();
  16. }
  17. //入队
  18. enqueue(node);
  19. c = count.getAndIncrement();
  20. //如果现队列长度小于容量,notFull条件队列转同步队列,
  21. //准备唤醒一个阻塞在notFull条件上的线程(可以继续入队)
  22. //因为可能有很多线程阻塞在notFull这个条件上,
  23. //而取元素时只有取之前队列是满的才会唤醒notFull,此处不用等到取元素时才唤醒
  24. if (c + 1 < capacity)
  25. notFull.signal();
  26. } finally {
  27. putLock.unlock();// 真正唤醒生产者线程
  28. }
  29. // 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
  30. if (c == 0)
  31. signalNotEmpty();
  32. }
  33. private void signalNotEmpty() {
  34. final ReentrantLock takeLock = this.takeLock;
  35. //加take锁
  36. takeLock.lock();
  37. try {
  38. notEmpty.signal();// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
  39. } finally {
  40. takeLock.unlock();// 真正唤醒消费者线程
  41. }
  42. }
  43. private void enqueue(Node<E> node) {
  44. // assert putLock.isHeldByCurrentThread();
  45. // assert last.next == null;
  46. //把新节点指向last的下一个节点,再把新节点设置为last节点
  47. last = last.next = node;
  48. }

出队take方法

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. // 使用takeLock加锁
  7. takeLock.lockInterruptibly();
  8. try {
  9. // 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
  10. while (count.get() == 0) {
  11. notEmpty.await();
  12. }
  13. //出队
  14. x = dequeue();
  15. c = count.getAndDecrement();//长度-1,返回原值
  16. // 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
  17. if (c > 1)
  18. notEmpty.signal();
  19. } finally {
  20. takeLock.unlock();// 真正唤醒消费者线程
  21. }
  22. //如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
  23. if (c == capacity)
  24. signalNotFull();
  25. return x;
  26. }
  27. private void signalNotFull() {
  28. final ReentrantLock putLock = this.putLock;
  29. putLock.lock();
  30. try {
  31. notFull.signal();// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
  32. } finally {
  33. putLock.unlock();// 解锁,这才会真正的唤醒生产者线程
  34. }
  35. }

LinkedBlockingQueue与ArrayBlockingQueue对比

  • 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题
  • 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表
  • 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响
  • 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能