以单链表实现的阻塞队列,它是线程安全的,借助分段的思想把入队出队分裂成两个锁

LinkedBlockingQueue 默认长度是 Integer.MAX_VALUE,所以可以认为它是无界队列,但是无界队列有个缺点,很容易造成 OOM

成员变量

  1. // 容量,有界队列
  2. private final int capacity;
  3. // 元素数量
  4. private final AtomicInteger count = new AtomicInteger();
  5. // 链表头 本身是不存储任何元素的,初始化时item指向null
  6. transient Node<E> head;
  7. // 链表尾 last.next == null
  8. private transient Node<E> last;
  9. // take锁,锁分离,提高效率
  10. private final ReentrantLock takeLock = new ReentrantLock();
  11. // notEmpty条件
  12. // 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
  13. private final Condition notEmpty = takeLock.newCondition();
  14. // put锁,锁分离,提高效率
  15. private final ReentrantLock putLock = new ReentrantLock();
  16. // notFull条件
  17. // 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
  18. private final Condition notFull = putLock.newCondition();
  19. static class Node<E> {
  20. E item;//存储元素
  21. Node<E> next;//后继元素 单链表结构
  22. Node(E x) { item = x; }
  23. }

构造器

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);// 一定是有界的
  3. }
  4. public LinkedBlockingQueue(int capacity) {
  5. if (capacity <= 0) throw new IllegalArgumentException();
  6. this.capacity = capacity;
  7. last = head = new Node<E>(null);// 初始化head和last指针为空值节点,head和last指向同一个对象
  8. }
  9. public LinkedBlockingQueue(Collection<? extends E> c) {
  10. this(Integer.MAX_VALUE);
  11. final ReentrantLock putLock = this.putLock;
  12. putLock.lock();
  13. try {
  14. int n = 0;
  15. for (E e : c) {
  16. if (e == null)
  17. throw new NullPointerException();
  18. if (n == capacity)
  19. throw new IllegalStateException("Queue full");
  20. enqueue(new Node<E>(e));
  21. ++n;
  22. }
  23. count.set(n);
  24. } finally {
  25. putLock.unlock();
  26. }
  27. }

入队

  1. public void put(E e) throws InterruptedException {
  2. if (e == null) throw new NullPointerException();// 不允许null元素
  3. int c = -1;
  4. Node<E> node = new Node<E>(e);// 新建一个节点
  5. final ReentrantLock putLock = this.putLock;
  6. final AtomicInteger count = this.count;
  7. putLock.lockInterruptibly();// 使用put锁加锁
  8. try {
  9. // 如果队列满了,就阻塞在notFull条件上等待被其它线程唤醒
  10. while (count.get() == capacity) {
  11. notFull.await();
  12. }
  13. enqueue(node);// 队列不满了,就入队
  14. c = count.getAndIncrement();// 队列长度加1,返回原值
  15. // 如果现队列长度如果小于容量,就唤醒一个阻塞在notFull条件上的线程(可以继续入队)
  16. // 这里为啥要唤醒一下呢?
  17. // 因为可能有很多线程阻塞在notFull这个条件上的,而取元素时只有取之前队列是满的才会唤醒notFull,不用等到取元素时才唤醒
  18. // 为什么队列满的才唤醒notFull呢?
  19. // 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一下,未满就唤醒其它notFull上的线程,说白了,这也是锁分离带来的代价
  20. if (c + 1 < capacity)
  21. notFull.signal();
  22. } finally {
  23. putLock.unlock();
  24. }
  25. if (c == 0)// 如果原队列长度为0,现在加了一个元素后立即唤醒notEmpty条件
  26. signalNotEmpty();
  27. }
  28. private void enqueue(Node<E> node) {
  29. last = last.next = node;// 直接加到last后面,last指向入队元素,如果是第一次入队head.next也会指向入队元素
  30. }
  31. private void signalNotEmpty() {
  32. final ReentrantLock takeLock = this.takeLock;
  33. takeLock.lock();// 加take锁
  34. try {
  35. notEmpty.signal();// 唤醒notEmpty条件
  36. } finally {
  37. takeLock.unlock();
  38. }
  39. }
  • 使用putLock加锁;
  • 如果队列满了就阻塞在notFull条件上,等待被唤醒,继续执行;
  • 否则就入队;
  • 如果入队后元素数量小于容量,唤醒其它阻塞在notFull条件上的线程;
  • 释放锁;
  • 如果放元素之前队列长度为0,就唤醒notEmpty条件;

    出队

    1. public E take() throws InterruptedException {
    2. E x;
    3. int c = -1;
    4. final AtomicInteger count = this.count;
    5. final ReentrantLock takeLock = this.takeLock;
    6. takeLock.lockInterruptibly();// 使用takeLock加锁
    7. try {
    8. while (count.get() == 0) {// 如果队列无元素,则阻塞在notEmpty条件上
    9. notEmpty.await();
    10. }
    11. x = dequeue();// 否则,出队
    12. c = count.getAndDecrement();//长度-1,返回原值
    13. if (c > 1)// 如果取之前队列长度大于1,则唤醒notEmpty 原因与入队同理
    14. notEmpty.signal();
    15. } finally {
    16. takeLock.unlock();
    17. }
    18. // 如果取之前队列长度等于容量(已满),则唤醒notFull
    19. if (c == capacity)
    20. signalNotFull();
    21. return x;
    22. }
    23. private E dequeue() {
    24. // 这里把head删除,并把head.next作为出队元素
    25. // 并把其值置空,返回原来的值
    26. Node<E> h = head;
    27. Node<E> first = h.next;
    28. h.next = h; // 方便GC
    29. head = first;
    30. E x = first.item;
    31. first.item = null;//head.item还是指向null,head.next指向了新的出队元素
    32. return x;
    33. }
    34. private void signalNotFull() {
    35. final ReentrantLock putLock = this.putLock;
    36. putLock.lock();
    37. try {
    38. notFull.signal();
    39. } finally {
    40. putLock.unlock();
    41. }
    42. }
  • 使用takeLock加锁;

  • 如果队列空了就阻塞在notEmpty条件上;
  • 否则就出队;
  • 如果出队前元素数量大于1,唤醒其它阻塞在notEmpty条件上的线程;
  • 释放锁;
  • 如果取元素之前队列长度等于容量,就唤醒notFull条件;

删除元素

  1. public boolean remove(Object o) {
  2. if (o == null) return false;
  3. fullyLock(); //此时将入队锁和出队锁全部锁住来保证线程安全
  4. try {
  5. for (Node<E> trail = head, p = trail.next;
  6. p != null;
  7. trail = p, p = p.next) {// 循环遍历查找值相等的元素
  8. if (o.equals(p.item)) {
  9. unlink(p, trail);//调用unlink删除此节点
  10. return true;//操作成功返回true
  11. }
  12. }
  13. return false;
  14. } finally {
  15. fullyUnlock();
  16. }
  17. }
  18. void unlink(Node<E> p, Node<E> trail) {//p为要删除节点,trail为删除节点的前一个节点
  19. p.item = null;
  20. trail.next = p.next; // 改变指针将前一节点的后继节点指向删除节点的后一个节点
  21. if (last == p)
  22. last = trail;
  23. if (count.getAndDecrement() == capacity)
  24. notFull.signal();
  25. }