PriorityQueue

优先级队列,是0个或多个元素的集合,集合中的每个元素都有一个权重值,每次出队都弹出优先级最大或最小的元素。一般来说,优先级队列使用堆来实现。

属性

  1. // 默认容量
  2. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  3. // 存储元素的地方
  4. transient Object[] queue; // non-private to simplify nested class access
  5. // 元素个数
  6. private int size = 0;
  7. // 比较器
  8. private final Comparator<? super E> comparator;
  9. // 修改次数
  10. transient int modCount = 0; // non-private to simplify nested class access

入队

  1. public boolean add(E e) {
  2. return offer(e);
  3. }
  4. public boolean offer(E e) {
  5. // 不支持null元素
  6. if (e == null)
  7. throw new NullPointerException();
  8. modCount++;
  9. // 取size
  10. int i = size;
  11. // 元素个数达到最大容量了,扩容
  12. if (i >= queue.length)
  13. grow(i + 1);
  14. // 元素个数加1
  15. size = i + 1;
  16. // 如果还没有元素
  17. // 直接插入到数组第一个位置
  18. // 这里跟我们之前讲堆不一样了
  19. // java里面是从0开始的
  20. // 我们说的堆是从1开始的
  21. if (i == 0)
  22. queue[0] = e;
  23. else
  24. // 否则,插入元素到数组size的位置,也就是最后一个元素的下一位
  25. // 注意这里的size不是数组大小,而是元素个数
  26. // 然后,再做自下而上的堆化
  27. siftUp(i, e);
  28. return true;
  29. }
  30. private void siftUp(int k, E x) {
  31. // 根据是否有比较器,使用不同的方法
  32. if (comparator != null)
  33. siftUpUsingComparator(k, x);
  34. else
  35. siftUpComparable(k, x);
  36. }
  37. @SuppressWarnings("unchecked")
  38. private void siftUpComparable(int k, E x) {
  39. Comparable<? super E> key = (Comparable<? super E>) x;
  40. while (k > 0) {
  41. // 找到父节点的位置
  42. // 因为元素是从0开始的,所以减1之后再除以2
  43. int parent = (k - 1) >>> 1;
  44. // 父节点的值
  45. Object e = queue[parent];
  46. // 比较插入的元素与父节点的值
  47. // 如果比父节点大,则跳出循环
  48. // 否则交换位置
  49. if (key.compareTo((E) e) >= 0)
  50. break;
  51. // 与父节点交换位置
  52. queue[k] = e;
  53. // 现在插入的元素位置移到了父节点的位置
  54. // 继续与父节点再比较
  55. k = parent;
  56. }
  57. // 最后找到应该插入的位置,放入元素
  58. queue[k] = key;
  59. }

(1)入队不允许null元素;
(2)如果数组不够用了,先扩容;
(3)如果还没有元素,就插入下标0的位置;
(4)如果有元素了,就插入到最后一个元素往后的一个位置;
(5)自下而上堆化,一直往上跟父节点比较;
(6)如果比父节点小,就与父节点交换位置,直到出现比父节点大为止;
(7)由此可见,PriorityQueue是一个小顶堆。

扩容

  1. private void grow(int minCapacity) {
  2. // 旧容量
  3. int oldCapacity = queue.length;
  4. // Double size if small; else grow by 50%
  5. // 旧容量小于64时,容量翻倍
  6. // 旧容量大于等于64,容量只增加旧容量的一半
  7. int newCapacity = oldCapacity + ((oldCapacity < 64) ?
  8. (oldCapacity + 2) :
  9. (oldCapacity >> 1));
  10. // overflow-conscious code
  11. // 检查是否溢出
  12. if (newCapacity - MAX_ARRAY_SIZE > 0)
  13. newCapacity = hugeCapacity(minCapacity);
  14. // 创建出一个新容量大小的新数组并把旧数组元素拷贝过去
  15. queue = Arrays.copyOf(queue, newCapacity);
  16. }
  17. private static int hugeCapacity(int minCapacity) {
  18. if (minCapacity < 0) // overflow
  19. throw new OutOfMemoryError();
  20. return (minCapacity > MAX_ARRAY_SIZE) ?
  21. Integer.MAX_VALUE :
  22. MAX_ARRAY_SIZE;
  23. }

出队

  1. public E remove() {
  2. // 调用poll弹出队首元素
  3. E x = poll();
  4. if (x != null)
  5. // 有元素就返回弹出的元素
  6. return x;
  7. else
  8. // 没有元素就抛出异常
  9. throw new NoSuchElementException();
  10. }
  11. @SuppressWarnings("unchecked")
  12. public E poll() {
  13. // 如果size为0,说明没有元素
  14. if (size == 0)
  15. return null;
  16. // 弹出元素,元素个数减1
  17. int s = --size;
  18. modCount++;
  19. // 队列首元素
  20. E result = (E) queue[0];
  21. // 队列末元素
  22. E x = (E) queue[s];
  23. // 将队列末元素删除
  24. queue[s] = null;
  25. // 如果弹出元素后还有元素
  26. if (s != 0)
  27. // 将队列末元素移到队列首
  28. // 再做自上而下的堆化
  29. siftDown(0, x);
  30. // 返回弹出的元素
  31. return result;
  32. }
  33. private void siftDown(int k, E x) {
  34. // 根据是否有比较器,选择不同的方法
  35. if (comparator != null)
  36. siftDownUsingComparator(k, x);
  37. else
  38. siftDownComparable(k, x);
  39. }
  40. @SuppressWarnings("unchecked")
  41. private void siftDownComparable(int k, E x) {
  42. Comparable<? super E> key = (Comparable<? super E>)x;
  43. // 只需要比较一半就行了,因为叶子节点占了一半的元素
  44. int half = size >>> 1; // loop while a non-leaf
  45. while (k < half) {
  46. // 寻找子节点的位置,这里加1是因为元素从0号位置开始
  47. int child = (k << 1) + 1; // assume left child is least
  48. // 左子节点的值
  49. Object c = queue[child];
  50. // 右子节点的位置
  51. int right = child + 1;
  52. if (right < size &&
  53. ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
  54. // 左右节点取其小者
  55. c = queue[child = right];
  56. // 如果比子节点都小,则结束
  57. if (key.compareTo((E) c) <= 0)
  58. break;
  59. // 如果比最小的子节点大,则交换位置
  60. queue[k] = c;
  61. // 指针移到最小子节点的位置继续往下比较
  62. k = child;
  63. }
  64. // 找到正确的位置,放入元素
  65. queue[k] = key;
  66. }

(1)PriorityQueue是一个小顶堆;
(2)PriorityQueue是非线程安全的;
(3)PriorityQueue不是有序的,只有堆顶存储着最小的元素;
(4)入队就是堆的插入元素的实现;
(5)出队就是堆的删除元素的实现;

ArrayBlockingQueue

blockingqueue

  1. public interface BlockingQueue<E> extends Queue<E> {
  2. /**
  3. * 插入数据到队列尾部(如果立即可行且不会超过该队列的容量)
  4. * 在成功时返回 true,如果此队列已满,则抛IllegalStateException。(与offer方法的区别)
  5. */
  6. boolean add(E e);
  7. /**
  8. * 插入数据到队列尾部,如果没有空间,直接返回false;
  9. * 有空间直接插入,返回true。
  10. */
  11. boolean offer(E e);
  12. /**
  13. * 插入数据到队列尾部,如果队列没有空间,一直阻塞;
  14. * 有空间直接插入。
  15. */
  16. void put(E e) throws InterruptedException;
  17. /**
  18. * 插入数据到队列尾部,如果没有额外的空间,等待一定的时间,有空间即插入,返回true,
  19. * 到时间了,还是没有额外空间,返回false。
  20. */
  21. boolean offer(E e, long timeout, TimeUnit unit)
  22. throws InterruptedException;
  23. /**
  24. * 取出和删除队列中的头元素,如果没有数据,会一直阻塞到有数据
  25. */
  26. E take() throws InterruptedException;
  27. /**
  28. * 取出和删除队列中的头元素,如果没有数据,需要会阻塞一定的时间,过期了还没有数据,返回null
  29. */
  30. E poll(long timeout, TimeUnit unit)
  31. throws InterruptedException;
  32. //除了上述方法还有继承自Queue接口的方法
  33. /**
  34. * 取出和删除队列头元素,如果是空队列直接返回null。
  35. */
  36. E poll();
  37. /**
  38. * 取出但不删除头元素,该方法与peek方法的区别是当队列为空时会抛出NoSuchElementException异常
  39. */
  40. E element();
  41. /**
  42. * 取出但不删除头元素,空队列直接返回null
  43. */
  44. E peek();
  45. /**
  46. * 返回队列总额外的空间
  47. */
  48. int remainingCapacity();
  49. /**
  50. * 删除队列中存在的元素
  51. */
  52. boolean remove(Object o);
  53. /**
  54. * 判断队列中是否存在当前元素
  55. */
  56. boolean contains(Object o);
  57. }

arrayblockingqueue

ArrayBlockingQueue() 是一个用数组实现的有界阻塞队列,内部按先进先出的原则对元素进行排序; 其中 put 方法和 take 方法为添加和删除元素的阻塞方法。

ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别,这是因为 ReentrantLock 里面存在公平锁和非公平锁的原因, ReentrantLock 的具体分析会在 Lock 章节进行具体分析的; 对于 Lock 是公平锁的时候, 则被阻塞的队列可以按照阻塞的先后顺序访问队列,Lock 是非公平锁的时候, 阻塞的线程将进入争夺锁资源的过程中,谁先抢到锁就可以先执行,没有固定的先后顺序。

构造方法

  1. /**
  2. * 创建一个具体容量的队列,默认是非公平队列
  3. */
  4. public ArrayBlockingQueue(int capacity) {
  5. this(capacity, false);
  6. }
  7. /**
  8. * 创建一个具体容量、是否公平的队列
  9. */
  10. public ArrayBlockingQueue(int capacity, boolean fair) {
  11. if (capacity <= 0)
  12. throw new IllegalArgumentException();
  13. this.items = new Object[capacity];
  14. lock = new ReentrantLock(fair);
  15. notEmpty = lock.newCondition();
  16. notFull = lock.newCondition();
  17. }
  1. //返回队列剩余容量
  2. public int remainingCapacity()
  3. // 判断队列中是否存在当前元素o
  4. public boolean contains(Object o)
  5. // 返回一个按正确顺序,包含队列中所有元素的数组
  6. public Object[] toArray()
  7. // 返回一个按正确顺序,包含队列中所有元素的数组;数组的运行时类型是指定数组的运行时类型
  8. @SuppressWarnings("unchecked")
  9. public <T> T[] toArray(T[] a)
  10. // 自动清空队列中的所有元素
  11. public void clear()
  12. // 移除队列中所有可用元素,并将他们加入到给定的 Collection 中
  13. public int drainTo(Collection<? super E> c)
  14. // 从队列中最多移除指定数量的可用元素,并将他们加入到给定的 Collection 中
  15. public int drainTo(Collection<? super E> c, int maxElements)
  16. // 返回此队列中按正确顺序进行迭代的,包含所有元素的迭代器
  17. public Iterator<E> iterator()

成员变量

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. /** 存储数据的数组 */
  4. final Object[] items;
  5. /** 获取数据的索引,用于下次 take, poll, peek or remove 等方法 */
  6. int takeIndex;
  7. /** 添加元素的索引, 用于下次 put, offer, or add 方法 */
  8. int putIndex;
  9. /** 队列元素的个数 */
  10. int count;
  11. /*
  12. * 并发控制使用任何教科书中的经典双条件算法
  13. */
  14. /** 控制并发访问的锁 */
  15. final ReentrantLock lock;
  16. /** 非空条件对象,用于通知 take 方法中在等待获取数据的线程,队列中已有数据,可以执行获取操作 */
  17. private final Condition notEmpty;
  18. /** 未满条件对象,用于通知 put 方法中在等待添加数据的线程,队列未满,可以执行添加操作 */
  19. private final Condition notFull;
  20. /** 迭代器 */
  21. transient Itrs itrs = null;
  22. }

对于 notEmpty 条件对象是用于存放等待调用(此时队列中没有数据) take 方法的线程,这些线程会加入到 notEmpty 条件对象的等待队列(单链表)中,同时当队列中有数据后会通过 notEmpty 条件对象唤醒等待队列(链表)中等待的线程(链表中第一个non-null 且 status 为 Condition的线程)去 take 数据。

对于 notFull 条件对象是用于存放等待调用(此时队列容量已满) put 方法的线程,这些线程会加入到 notFull 条件对象的等待队列(单链表)中,同时当队列中数据被消费后会通过 notFull 条件对象唤醒等待队列(链表)中等待的线程去 put 数据。takeIndex 表示的是下一个(take、poll、peek、remove)方法被调用时获取数组元素的索引,putIndex 表示的是下一个(put、offer、add)被调用时添加元素的索引。

添加方法

  1. /**
  2. * 在当前 put 位置插入数据,put 位置前进一位,
  3. * 同时唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据。
  4. * 当然这一系列动作只有该线程获取锁的时候才能进行,即只有获取锁的线程
  5. * 才能执行 enqueue 操作。
  6. */
  7. // 元素统一入队操作
  8. private void enqueue(E x) {
  9. // assert lock.getHoldCount() == 1;
  10. // assert items[putIndex] == null;
  11. final Object[] items = this.items;
  12. items[putIndex] = x; // putIndex 位置添加数据
  13. //putIndex 进行自增,当达到数组长度的时候,putIndex 重头再来,即设置为0
  14. if (++putIndex == items.length)
  15. putIndex = 0;
  16. count++; //元素个数自增
  17. notEmpty.signal(); //添加完数据后,说明数组中有数据了,所以可以唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据
  18. }
  19. // 添加数据,数组中元素已满时,直接返回 false。
  20. public boolean offer(E e) {
  21. checkNotNull(e);
  22. final ReentrantLock lock = this.lock;
  23. // 获取锁,保证线程安全
  24. lock.lock();
  25. try {
  26. // 当数组元素个数已满时,直接返回false
  27. if (count == items.length)
  28. return false;
  29. else {
  30. // 执行入队操作,enqueue 方法在上面分析了
  31. enqueue(e);
  32. return true;
  33. }
  34. } finally {
  35. // 释放锁,保证其他等待锁的线程可以获取到锁
  36. // 为什么放到 finally (避免死锁)
  37. lock.unlock();
  38. }
  39. }
  40. // add 方法其实就是调用了 offer 方法来实现,
  41. // 与 offer 方法的区别就是 offer 方法数组满,抛出 IllegalStateException 异常。
  42. public boolean add(E e) {
  43. if (offer(e))
  44. return true;
  45. else
  46. throw new IllegalStateException("Queue full");
  47. }

阻塞添加方法put

  1. /**
  2. * 插入数据到队列尾部,如果队列已满,阻塞等待空间
  3. */
  4. public void put(E e) throws InterruptedException {
  5. checkNotNull(e);
  6. final ReentrantLock lock = this.lock;
  7. // 获取锁,期间线程可以打断,打断则不会添加
  8. lock.lockInterruptibly();
  9. try {
  10. // 通过上述分析,我们通过 count 来判断数组中元素个数
  11. while (count == items.length)
  12. notFull.await(); // 元素已满,线程挂起,线程加入 notFull 条件对象等待队列(链表)中,等待被唤醒
  13. enqueue(e); // 队列未满,直接执行入队操作
  14. } finally {
  15. lock.unlock();
  16. }
  17. }

LinkedBlockingQueue

构造方法

  1. public LinkedBlockingQueue() {
  2. this(Integer.MAX_VALUE);
  3. }
  4. public LinkedBlockingQueue(int capacity) {
  5. if (capacity <= 0) throw new IllegalArgumentException();
  6. this.capacity = capacity;
  7. last = head = new Node<E>(null);
  8. }
  9. public LinkedBlockingQueue(Collection<? extends E> c) {
  10. this(Integer.MAX_VALUE);
  11. //调用第二个构造方法,传入的capacity是Int的最大值,可以说是一个无界队列。
  12. final ReentrantLock putLock = this.putLock;
  13. putLock.lock(); //开启排他锁
  14. try {
  15. int n = 0;//用于记录LinkedBlockingQueue的size
  16. //循环传入的c集合
  17. for (E e : c) {
  18. if (e == null)//如果e==null,则抛出空指针异常
  19. throw new NullPointerException();
  20. if (n == capacity)//如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常
  21. throw new IllegalStateException("Queue full");
  22. enqueue(new Node<E>(e));//入队操作
  23. ++n;//n自增
  24. }
  25. count.set(n);//设置count
  26. } finally {
  27. putLock.unlock();//释放排他锁
  28. }
  29. }

node

  1. static class Node<E> {
  2. E item;
  3. Node<E> next;
  4. Node(E x) { item = x; }
  5. }

put

  1. public void put(E e) throws InterruptedException {
  2. // 值不为空
  3. if (e == null) throw new NullPointerException();
  4. // Note: convention in all put/take/etc is to preset local var
  5. // holding count negative to indicate failure unless set.
  6. //
  7. int c = -1;
  8. // 新生结点
  9. Node<E> node = new Node<E>(e);
  10. // 存元素锁
  11. final ReentrantLock putLock = this.putLock;
  12. // 元素个数
  13. final AtomicInteger count = this.count;
  14. // 如果当前线程未被中断,则获取锁
  15. putLock.lockInterruptibly();
  16. try {
  17. /*
  18. * Note that count is used in wait guard even though it is
  19. * not protected by lock. This works because count can
  20. * only decrease at this point (all other puts are shut
  21. * out by lock), and we (or some other waiting put) are
  22. * signalled if it ever changes from capacity. Similarly
  23. * for all other uses of count in other wait guards.
  24. */
  25. while (count.get() == capacity) { // 元素个数到达指定容量
  26. // 在notFull条件上进行等待
  27. notFull.await();
  28. }
  29. // 入队列
  30. enqueue(node);
  31. // 更新元素个数,返回的是以前的元素个数(c从-1开始的)
  32. c = count.getAndIncrement();
  33. if (c + 1 < capacity) // 元素个数是否小于容量
  34. // 唤醒在notFull条件上等待的某个线程
  35. notFull.signal();
  36. } finally {
  37. // 释放锁
  38. putLock.unlock();
  39. }
  40. if (c == 0)
  41. // 元素个数为0,表示已有take线程在notEmpty条件上进入了等待,
  42. //则需要唤醒在notEmpty条件上等待的线程
  43. signalNotEmpty();
  44. }
  45. private void signalNotEmpty() {
  46. // 取元素锁
  47. final ReentrantLock takeLock = this.takeLock;
  48. // 获取锁
  49. takeLock.lock();
  50. try {
  51. // 唤醒在notEmpty条件上等待的某个线程
  52. notEmpty.signal();
  53. } finally {
  54. // 释放锁
  55. takeLock.unlock();
  56. }
  57. }

SynchronousQueue

SynchronousQueue是一个比较特别的队列,此队列源码中充斥着大量的CAS语句,在线程池方面有所应用。
SynchronousQueue 的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。

SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,SynchronousQueue的put、take操作都是委托这两个类来实现的。

  1. public void put(E e) throws InterruptedException {
  2. // 若插入的数据是null,则直接抛出NullPointerException异常
  3. if (e == null) throw new NullPointerException();
  4. // 调用transfer方法
  5. if (transferer.transfer(e, false, 0) == null) {
  6. Thread.interrupted();
  7. throw new InterruptedException();
  8. }
  9. }
  10. public E take() throws InterruptedException {
  11. // 调用transfer方法
  12. E e = transferer.transfer(null, false, 0);
  13. // 若值不为null,则直接返回
  14. if (e != null)
  15. return e;
  16. Thread.interrupted();
  17. throw new InterruptedException();
  18. }


TransferQueue

  1. static final class TransferQueue<E> extends Transferer<E>{
  2. /** Head of queue */
  3. transient volatile QNode head;
  4. /** Tail of queue */
  5. transient volatile QNode tail;
  6. /**
  7. * Reference to a cancelled node that might not yet have been
  8. * unlinked from queue because it was the last inserted node
  9. * when it was cancelled.
  10. */
  11. transient volatile QNode cleanMe;
  12. TransferQueue() {
  13. QNode h = new QNode(null, false); // initialize to dummy node.
  14. head = h;
  15. tail = h;
  16. }
  17. }
  18. //它使用队列作为交易媒介,来实现公平交易,TransferQueue使用QNode类来作为队列节点:
  19. static final class QNode {
  20. // 指向下一个节点
  21. volatile QNode next; // next node in queue
  22. // item数据项
  23. volatile Object item; // CAS'ed to or from null
  24. // 等待线程
  25. volatile Thread waiter; // to control park/unpark
  26. // 是否为数据的标识
  27. final boolean isData;
  28. ...
  29. }

TransferStack

  1. static final class TransferStack<E> extends Transfer
  2. static final class SNode {
  3. // 指向栈中的下一个节点
  4. volatile SNode next; // next node in stack
  5. // 匹配节点
  6. volatile SNode match; // the node matched to this
  7. // 等待线程
  8. volatile Thread waiter; // to control park/unpark
  9. // item数据线
  10. Object item; // data; or null for REQUESTs
  11. // 节点状态
  12. int mode;
  13. ...
  14. }
  15. /** Node represents an unfulfilled consumer */
  16. static final int REQUEST = 0;
  17. /** Node represents an unfulfilled producer */
  18. static final int DATA = 1;
  19. /** Node is fulfilling another unfulfilled DATA or REQUEST */
  20. static final int FULFILLING = 2;
  21. //REQUEST:表示了一个请求交易但是没有得到匹配的消费者
  22. //DATA:表示一个请求交易但是没有交付数据的生产者
  23. //FULFILLING:表示正在进行交易的生产者或者消费者

从源码中可以看到,这两个方法都会调用transfer方法,其中,put方法传递的是e参数,所以模式为数据(公平isData = true,非公平mode= DATA),而take方法传递的是null,所以模式为请求(公平isData = false,非公平mode = REQUEST)。

公平模式(queue)

  1. E transfer(E e, boolean timed, long nanos) {
  2. QNode s = null; // constructed/reused as needed
  3. // 获取当前节点的模式
  4. boolean isData = (e != null);
  5. for (;;) {
  6. QNode t = tail;
  7. QNode h = head;
  8. // 队列没有初始化,自旋
  9. if (t == null || h == null) // saw uninitialized value
  10. continue; // spin
  11. // 头尾节点相等(队列为null),或者当前节点和队列尾节点具有相同的交易类型
  12. // 将节点添加到队列尾部,并且等待匹配
  13. if (h == t || t.isData == isData) { // empty or same-mode
  14. QNode tn = t.next;
  15. // t != tail表明已有其他线程修改了tail,当前线程需要重新再来
  16. if (t != tail) // inconsistent read
  17. continue;
  18. // 若尾节点的后继节点不为null,则表明已经有其他线程添加了节点,更新尾节点
  19. if (tn != null) { // lagging tail
  20. advanceTail(t, tn);
  21. continue;
  22. }
  23. // 超时
  24. if (timed && nanos <= 0) // can't wait
  25. return null;
  26. // s == null,则创建一个新节点
  27. if (s == null)
  28. s = new QNode(e, isData);
  29. // 将新节点加入到队列中,如果不成功,继续处理
  30. if (!t.casNext(null, s)) // failed to link in
  31. continue;
  32. // 更新尾节点
  33. advanceTail(t, s); // swing tail and wait
  34. // 调用awaitFulfill方法,若节点是head.next,则进行自旋
  35. // 否则,直接阻塞,直到有其他线程与之匹配,或它自己进行线程的中断
  36. Object x = awaitFulfill(s, e, timed, nanos);
  37. // 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点
  38. if (x == s) { // wait was cancelled
  39. clean(t, s);
  40. return null;
  41. }
  42. // 若s节点还没有从队列删除
  43. if (!s.isOffList()) { // not already unlinked
  44. // 尝试将s节点设置为head,移出t
  45. advanceHead(t, s); // unlink if head
  46. if (x != null) // and forget fields
  47. s.item = s;
  48. s.waiter = null;
  49. }
  50. return (x != null) ? (E)x : e;
  51. }
  52. // 这里是从head.next开始,因为TransferQueue总是会存在一个dummy节点
  53. else { // complementary-mode
  54. QNode m = h.next; // node to fulfill
  55. // 不一致读,表明有其他线程修改了队列
  56. if (t != tail || m == null || h != head)
  57. continue; // inconsistent read
  58. Object x = m.item;
  59. // isData == (x != null):判断isData与x的模式是否相同,相同表示已经匹配了
  60. // x == m :m节点被取消了
  61. // !m.casItem(x, e):如果尝试将数据e设置到m上失败
  62. if (isData == (x != null) || // m already fulfilled
  63. x == m || // m cancelled
  64. !m.casItem(x, e)) { // lost CAS
  65. // 将m设置为头结点,h出列,然后重试
  66. advanceHead(h, m); // dequeue and retry
  67. continue;
  68. }
  69. // 成功匹配了,m设置为头结点h出列,向前推进
  70. advanceHead(h, m); // successfully fulfilled
  71. // 唤醒m的等待线程
  72. LockSupport.unpark(m.waiter);
  73. return (x != null) ? (E)x : e;
  74. }
  75. }
  76. }

https://blog.csdn.net/qq_38293564/article/details/80604194
…….

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。

PriorityBlockingQueue有四个构造方法:
// 默认的构造方法,该方法会调用this(DEFAULT_INITIAL_CAPACITY, null),即默认的容量是11
public PriorityBlockingQueue()
// 根据initialCapacity来设置队列的初始容量
public PriorityBlockingQueue(int initialCapacity)
// 根据initialCapacity来设置队列的初始容量,并根据comparator对象来对数据进行排序
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
// 根据集合来创建队列
public PriorityBlockingQueue(Collection<? extends E> c)

  1. public class PriorityBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {
  3. private static final long serialVersionUID = 5595510919245408276L;
  4. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  5. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  6. private transient Object[] queue;
  7. private transient int size;
  8. private transient Comparator<? super E> comparator;
  9. private final ReentrantLock lock;
  10. private final Condition notEmpty;
  11. private transient volatile int allocationSpinLock;//扩容时候用到,自旋锁
  12. private PriorityQueue<E> q;//数组实现的最小堆,writeObject和readObject用到。
  13. //为了兼容之前的版本,只有在序列化和反序列化才非空
  14. public PriorityBlockingQueue(int initialCapacity,
  15. Comparator<? super E> comparator) {
  16. if (initialCapacity < 1)
  17. throw new IllegalArgumentException();
  18. this.lock = new ReentrantLock();
  19. this.notEmpty = lock.newCondition();
  20. this.comparator = comparator;
  21. this.queue = new Object[initialCapacity];
  22. //构造函数没有初始化allocationSpinLock,q
  23. }
  24. public PriorityBlockingQueue(Collection<? extends E> c) {
  25. this.lock = new ReentrantLock();
  26. this.notEmpty = lock.newCondition();
  27. boolean heapify = true; // true if not known to be in heap order
  28. boolean screen = true; // true if must screen for nulls
  29. if (c instanceof SortedSet<?>) {// 如果传入集合是有序集,则无须进行堆有序化
  30. SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
  31. this.comparator = (Comparator<? super E>) ss.comparator();
  32. heapify = false;//不需要重建堆
  33. }// 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化
  34. else if (c instanceof PriorityBlockingQueue<?>) {
  35. PriorityBlockingQueue<? extends E> pq =
  36. (PriorityBlockingQueue<? extends E>) c;
  37. this.comparator = (Comparator<? super E>) pq.comparator();
  38. screen = false;
  39. if (pq.getClass() == PriorityBlockingQueue.class) // exact match
  40. heapify = false;//不需要重建堆
  41. }
  42. Object[] a = c.toArray();
  43. int n = a.length;
  44. // If c.toArray incorrectly doesn't return Object[], copy it.
  45. if (a.getClass() != Object[].class)
  46. a = Arrays.copyOf(a, n, Object[].class);
  47. if (screen && (n == 1 || this.comparator != null)) {
  48. for (int i = 0; i < n; ++i)
  49. if (a[i] == null)
  50. throw new NullPointerException();
  51. }
  52. this.queue = a;
  53. this.size = n;
  54. if (heapify)
  55. heapify();//重建堆
  56. }
  57. private void removeAt(int i) {
  58. Object[] array = queue;
  59. int n = size - 1;
  60. if (n == i) // removed last element
  61. array[i] = null;
  62. else {
  63. E moved = (E) array[n];
  64. array[n] = null;
  65. Comparator<? super E> cmp = comparator;
  66. if (cmp == null)
  67. siftDownComparable(i, moved, array, n);
  68. else
  69. siftDownUsingComparator(i, moved, array, n, cmp);
  70. if (array[i] == moved) {
  71. if (cmp == null)
  72. siftUpComparable(i, moved, array);
  73. else
  74. siftUpUsingComparator(i, moved, array, cmp);
  75. }
  76. }
  77. size = n;
  78. }
  79. private static <T> void siftDownComparable(int k, T x, Object[] array,
  80. int n) {//元素x放到k的位置
  81. if (n > 0) {
  82. Comparable<? super T> key = (Comparable<? super T>)x;
  83. int half = n >>> 1; // loop while a non-leaf
  84. while (k < half) {
  85. int child = (k << 1) + 1; // assume left child is least
  86. Object c = array[child];
  87. int right = child + 1;
  88. if (right < n &&
  89. ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
  90. c = array[child = right];
  91. if (key.compareTo((T) c) <= 0)//比子节点小就不动,小堆
  92. break;
  93. array[k] = c;
  94. k = child;
  95. }
  96. array[k] = key;
  97. }
  98. }
  99. private static <T> void siftUpComparable(int k, T x, Object[] array) {//元素x放到k的位置
  100. Comparable<? super T> key = (Comparable<? super T>) x;
  101. while (k > 0) {
  102. int parent = (k - 1) >>> 1;
  103. Object e = array[parent];
  104. if (key.compareTo((T) e) >= 0)//比父亲大就不动,小堆
  105. break;
  106. array[k] = e;
  107. k = parent;
  108. }
  109. array[k] = key;
  110. }
  111. public boolean offer(E e) {
  112. if (e == null)// 若插入的元素为null,则直接抛出NullPointerException异常
  113. throw new NullPointerException();
  114. final ReentrantLock lock = this.lock;
  115. lock.lock();
  116. int n, cap;
  117. Object[] array;
  118. while ((n = size) >= (cap = (array = queue).length))
  119. tryGrow(array, cap);
  120. try {
  121. Comparator<? super E> cmp = comparator;
  122. if (cmp == null)
  123. siftUpComparable(n, e, array);//准备放在最后size位置处
  124. else
  125. siftUpUsingComparator(n, e, array, cmp);
  126. size = n + 1;
  127. notEmpty.signal();// 唤醒等待在空上的线程
  128. } finally {
  129. lock.unlock();
  130. }
  131. return true;
  132. }
  133. public E take() throws InterruptedException {
  134. final ReentrantLock lock = this.lock;
  135. lock.lockInterruptibly();
  136. E result;
  137. try {
  138. while ( (result = dequeue()) == null)
  139. notEmpty.await();
  140. } finally {
  141. lock.unlock();
  142. }
  143. return result;
  144. }
  145. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  146. long nanos = unit.toNanos(timeout);
  147. final ReentrantLock lock = this.lock;
  148. lock.lockInterruptibly();
  149. E result;
  150. try {
  151. while ( (result = dequeue()) == null && nanos > 0)
  152. nanos = notEmpty.awaitNanos(nanos);
  153. } finally {
  154. lock.unlock();
  155. }
  156. return result;
  157. }
  158. public E peek() {
  159. final ReentrantLock lock = this.lock;
  160. lock.lock();
  161. try {
  162. return (size == 0) ? null : (E) queue[0];
  163. } finally {
  164. lock.unlock();
  165. }
  166. }
  167. public int size() {
  168. final ReentrantLock lock = this.lock;
  169. lock.lock();
  170. try {
  171. return size;
  172. } finally {
  173. lock.unlock();
  174. }
  175. }
  176. private int indexOf(Object o) {
  177. if (o != null) {
  178. Object[] array = queue;
  179. int n = size;
  180. for (int i = 0; i < n; i++)
  181. if (o.equals(array[i]))
  182. return i;
  183. }
  184. return -1;
  185. }
  186. public boolean remove(Object o) {
  187. final ReentrantLock lock = this.lock;
  188. lock.lock();
  189. try {
  190. int i = indexOf(o);
  191. if (i == -1)
  192. return false;
  193. removeAt(i);
  194. return true;
  195. } finally {
  196. lock.unlock();
  197. }
  198. }
  199. public boolean contains(Object o) {
  200. final ReentrantLock lock = this.lock;
  201. lock.lock();
  202. try {
  203. return indexOf(o) != -1;
  204. } finally {
  205. lock.unlock();
  206. }
  207. }
  208. private E dequeue() {
  209. int n = size - 1;
  210. if (n < 0)
  211. return null;
  212. else {
  213. Object[] array = queue;
  214. E result = (E) array[0];
  215. E x = (E) array[n];
  216. array[n] = null;
  217. Comparator<? super E> cmp = comparator;
  218. if (cmp == null)
  219. siftDownComparable(0, x, array, n);
  220. else
  221. siftDownUsingComparator(0, x, array, n, cmp);
  222. size = n;
  223. return result;
  224. }
  225. }
  226. private void heapify() {
  227. Object[] array = queue;
  228. int n = size;
  229. int half = (n >>> 1) - 1;
  230. Comparator<? super E> cmp = comparator;
  231. if (cmp == null) {
  232. for (int i = half; i >= 0; i--)
  233. siftDownComparable(i, (E) array[i], array, n);//数组重建为堆
  234. }
  235. else {
  236. for (int i = half; i >= 0; i--)
  237. siftDownUsingComparator(i, (E) array[i], array, n, cmp);
  238. }
  239. }
  240. public void clear() {
  241. final ReentrantLock lock = this.lock;
  242. lock.lock();
  243. try {
  244. Object[] array = queue;
  245. int n = size;
  246. size = 0;
  247. for (int i = 0; i < n; i++)
  248. array[i] = null;
  249. } finally {
  250. lock.unlock();
  251. }
  252. public int drainTo(Collection<? super E> c, int maxElements) {//批量获取元素
  253. if (c == null)
  254. throw new NullPointerException();
  255. if (c == this)
  256. throw new IllegalArgumentException();
  257. if (maxElements <= 0)
  258. return 0;
  259. final ReentrantLock lock = this.lock;
  260. lock.lock();
  261. try {
  262. int n = Math.min(size, maxElements);
  263. for (int i = 0; i < n; i++) {// 循环遍历,不断弹出队首元素;
  264. c.add((E) queue[0]); // In this order, in case add() throws.
  265. dequeue();
  266. }
  267. return n;
  268. } finally {
  269. lock.unlock();
  270. }
  271. }
  272. }

扩容

  1. private void tryGrow(Object[] array, int oldCap) {//旧数组和容量
  2. lock.unlock(); // 释放锁,防止阻塞出队操作
  3. Object[] newArray = null;
  4. //释放了锁,多个线程可以进来这里,但是只有一个线程可以执行if里面的代码,也就是只有一个线程可以扩容,
  5. if (allocationSpinLock == 0 && // 使用CAS操作来修改allocationSpinLock
  6. UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
  7. 0, 1)) {
  8. try {// 容量越小增长得越快,若容量小于64,则新容量是oldCap * 2 + 2,否则是oldCap * 1.5
  9. int newCap = oldCap + ((oldCap < 64) ?
  10. (oldCap + 2) : // grow faster if small
  11. (oldCap >> 1));
  12. if (newCap - MAX_ARRAY_SIZE > 0) { // 扩容后超过最大容量处理
  13. int minCap = oldCap + 1;
  14. if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//整数溢出
  15. throw new OutOfMemoryError();
  16. newCap = MAX_ARRAY_SIZE;
  17. }//queue是公共变量,
  18. if (newCap > oldCap && queue == array)
  19. newArray = new Object[newCap];
  20. } finally {// 解锁,因为只有一个线程到此,因而不需要CAS操作;
  21. allocationSpinLock = 0;
  22. }
  23. }//失败扩容的线程newArray == null,调用Thread.yield()让出cpu, 让扩容线程扩容后优先调用lock.lock重新获取锁,
  24. //但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁。
  25. if (newArray == null)
  26. Thread.yield();
  27. lock.lock();//有可能扩容的线程先走到这里,也有可能没有扩容的线程先走到这里。
  28. //准备赋值给共有变量queue,要加锁,
  29. //扩容的线程newArray != null ,没有扩容的线程newArray = null
  30. if (newArray != null && queue == array) {//再次进入while循环去扩容。
  31. queue = newArray;
  32. System.arraycopy(array, 0, newArray, 0, oldCap);
  33. }
  34. }
  35. private static final sun.misc.Unsafe UNSAFE;
  36. private static final long allocationSpinLockOffset;
  37. static {
  38. try {
  39. UNSAFE = sun.misc.Unsafe.getUnsafe();
  40. Class<?> k = PriorityBlockingQueue.class;
  41. allocationSpinLockOffset = UNSAFE.objectFieldOffset
  42. (k.getDeclaredField("allocationSpinLock")); //allocationSpinLock这个字段
  43. } catch (Exception e) {
  44. throw new Error(e);
  45. }
  46. }

LinkedTransferQueue

LinkedTransferQueue使用了一个叫做dual data structure的数据结构,或者叫做dual queue,译为双重数据结构或者双重队列。

放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。

取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。

属性

  1. // 头节点
  2. transient volatile Node head;
  3. // 尾节点
  4. private transient volatile Node tail;
  5. // 放取元素的几种方式:
  6. // 立即返回,用于非超时的poll()和tryTransfer()方法中
  7. private static final int NOW = 0; // for untimed poll, tryTransfer
  8. // 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
  9. private static final int ASYNC = 1; // for offer, put, add
  10. // 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
  11. private static final int SYNC = 2; // for transfer, take
  12. // 超时,用于有超时的poll()和tryTransfer()方法中
  13. private static final int TIMED = 3; // for timed poll, tryTransfer

内部类

  1. static final class Node {
  2. // 是否是数据节点(也就标识了是生产者还是消费者)
  3. final boolean isData; // false if this is a request node
  4. // 元素的值
  5. volatile Object item; // initially non-null if isData; CASed to match
  6. // 下一个节点
  7. volatile Node next;
  8. // 持有元素的线程
  9. volatile Thread waiter; // null until waiting
  10. }
  11. //构造方法
  12. public LinkedTransferQueue() {
  13. }
  14. public LinkedTransferQueue(Collection<? extends E> c) {
  15. this();
  16. addAll(c);
  17. }

只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。

xfer方法

  1. private E xfer(E e, boolean haveData, int how, long nanos) {
  2. // 不允许放入空元素
  3. if (haveData && (e == null))
  4. throw new NullPointerException();
  5. Node s = null; // the node to append, if needed
  6. // 外层循环,自旋,失败就重试
  7. retry:
  8. for (;;) { // restart on append race
  9. // 下面这个for循环用于控制匹配的过程
  10. // 同一时刻队列中只会存储一种类型的节点
  11. // 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
  12. // 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
  13. for (Node h = head, p = h; p != null;) { // find & match first node
  14. // p节点的模式
  15. boolean isData = p.isData;
  16. // p节点的值
  17. Object item = p.item;
  18. // p没有被匹配到
  19. if (item != p && (item != null) == isData) { // unmatched
  20. // 如果两者模式一样,则不能匹配,跳出循环后尝试入队
  21. if (isData == haveData) // can't match
  22. break;
  23. // 如果两者模式不一样,则尝试匹配
  24. // 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
  25. if (p.casItem(item, e)) { // match
  26. // 匹配成功
  27. // for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
  28. // 看不懂可以直接跳过
  29. for (Node q = p; q != h;) {
  30. // 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
  31. Node n = q.next; // update by 2 unless singleton
  32. // 如果head还没变,就把它更新成新的节点
  33. // 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
  34. // 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
  35. // 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
  36. // 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
  37. if (head == h && casHead(h, n == null ? q : n)) {
  38. h.forgetNext();
  39. break;
  40. } // advance and retry
  41. // 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
  42. if ((h = head) == null ||
  43. (q = h.next) == null || !q.isMatched())
  44. break; // unless slack < 2
  45. }
  46. // 唤醒p中等待的线程
  47. LockSupport.unpark(p.waiter);
  48. // 并返回匹配到的元素
  49. return LinkedTransferQueue.<E>cast(item);
  50. }
  51. }
  52. // p已经被匹配了或者尝试匹配的时候失败了
  53. // 也就是其它线程先一步匹配了p
  54. // 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
  55. // 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
  56. Node n = p.next;
  57. p = (p != n) ? n : (h = head); // Use head if p offlist
  58. }
  59. // 到这里肯定是队列中存储的节点类型和自己一样
  60. // 或者队列中没有元素了
  61. // 就入队(不管放元素还是取元素都得入队)
  62. // 入队又分成四种情况:
  63. // NOW,立即返回,没有匹配到立即返回,不做入队操作
  64. // ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
  65. // SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
  66. // TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
  67. // 如果不是立即返回
  68. if (how != NOW) { // No matches available
  69. // 新建s节点
  70. if (s == null)
  71. s = new Node(e, haveData);
  72. // 尝试入队
  73. Node pred = tryAppend(s, haveData);
  74. // 入队失败,重试
  75. if (pred == null)
  76. continue retry; // lost race vs opposite mode
  77. // 如果不是异步(同步或者有超时)
  78. // 就等待被匹配
  79. if (how != ASYNC)
  80. return awaitMatch(s, pred, e, (how == TIMED), nanos);
  81. }
  82. return e; // not waiting
  83. }
  84. }
  85. private Node tryAppend(Node s, boolean haveData) {
  86. // 从tail开始遍历,把s放到链表尾端
  87. for (Node t = tail, p = t;;) { // move p to last node and append
  88. Node n, u; // temps for reads of next & tail
  89. // 如果首尾都是null,说明链表中还没有元素
  90. if (p == null && (p = head) == null) {
  91. // 就让首节点指向s
  92. // 注意,这里插入第一个元素的时候tail指针并没有指向s
  93. if (casHead(null, s))
  94. return s; // initialize
  95. }
  96. else if (p.cannotPrecede(haveData))
  97. // 如果p无法处理,则返回null
  98. // 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
  99. // 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
  100. // 队列中所有的元素都要保证是同一种类型的节点
  101. // 返回null后外面的方法会重新尝试匹配重新入队等
  102. return null; // lost race vs opposite mode
  103. else if ((n = p.next) != null) // not last; keep traversing
  104. // 如果p的next不为空,说明不是最后一个节点
  105. // 则让p重新指向最后一个节点
  106. p = p != t && t != (u = tail) ? (t = u) : // stale tail
  107. (p != n) ? n : null; // restart if off list
  108. else if (!p.casNext(null, s))
  109. // 如果CAS更新s为p的next失败
  110. // 则说明有其它线程先一步更新到p的next了
  111. // 就让p指向p的next,重新尝试让s入队
  112. p = p.next; // re-read on CAS failure
  113. else {
  114. // 到这里说明s成功入队了
  115. // 如果p不等于t,就更新tail指针
  116. // 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
  117. // 这里就是用来更新tail指针的
  118. if (p != t) { // update if slack now >= 2
  119. while ((tail != t || !casTail(t, s)) &&
  120. (t = tail) != null &&
  121. (s = t.next) != null && // advance and retry
  122. (s = s.next) != null && s != t);
  123. }
  124. // 返回p,即s的前一个元素
  125. return p;
  126. }
  127. }
  128. }
  129. private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
  130. // 如果是有超时的,计算其超时时间
  131. final long deadline = timed ? System.nanoTime() + nanos : 0L;
  132. // 当前线程
  133. Thread w = Thread.currentThread();
  134. // 自旋次数
  135. int spins = -1; // initialized after first item and cancel checks
  136. // 随机数,随机让一些自旋的线程让出CPU
  137. ThreadLocalRandom randomYields = null; // bound if needed
  138. for (;;) {
  139. Object item = s.item;
  140. // 如果s元素的值不等于e,说明它被匹配到了
  141. if (item != e) { // matched
  142. // assert item != s;
  143. // 把s的item更新为s本身
  144. // 并把s中的waiter置为空
  145. s.forgetContents(); // avoid garbage
  146. // 返回匹配到的元素
  147. return LinkedTransferQueue.<E>cast(item);
  148. }
  149. // 如果当前线程中断了,或者有超时的到期了
  150. // 就更新s的元素值指向s本身
  151. if ((w.isInterrupted() || (timed && nanos <= 0)) &&
  152. s.casItem(e, s)) { // cancel
  153. // 尝试解除s与其前一个节点的关系
  154. // 也就是删除s节点
  155. unsplice(pred, s);
  156. // 返回元素的值本身,说明没匹配到
  157. return e;
  158. }
  159. // 如果自旋次数小于0,就计算自旋次数
  160. if (spins < 0) { // establish spins at/near front
  161. // spinsFor()计算自旋次数
  162. // 如果前面有节点未被匹配就返回0
  163. // 如果前面有节点且正在匹配中就返回一定的次数,等待
  164. if ((spins = spinsFor(pred, s.isData)) > 0)
  165. // 初始化随机数
  166. randomYields = ThreadLocalRandom.current();
  167. }
  168. else if (spins > 0) { // spin
  169. // 还有自旋次数就减1
  170. --spins;
  171. // 并随机让出CPU
  172. if (randomYields.nextInt(CHAINED_SPINS) == 0)
  173. Thread.yield(); // occasionally yield
  174. }
  175. else if (s.waiter == null) {
  176. // 更新s的waiter为当前线程
  177. s.waiter = w; // request unpark then recheck
  178. }
  179. else if (timed) {
  180. // 如果有超时,计算超时时间,并阻塞一定时间
  181. nanos = deadline - System.nanoTime();
  182. if (nanos > 0L)
  183. LockSupport.parkNanos(this, nanos);
  184. }
  185. else {
  186. // 不是超时的,直接阻塞,等待被唤醒
  187. // 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
  188. LockSupport.park(this);
  189. }
  190. }
  191. }

DelayQueue

https://www.cnblogs.com/tong-yuan/p/DelayQueue.html

ConcurrentLinkedQueue

https://ifeve.com/concurrentlinkedqueue/