PriorityQueue
优先级队列,是0个或多个元素的集合,集合中的每个元素都有一个权重值,每次出队都弹出优先级最大或最小的元素。一般来说,优先级队列使用堆来实现。
属性
// 默认容量private static final int DEFAULT_INITIAL_CAPACITY = 11;// 存储元素的地方transient Object[] queue; // non-private to simplify nested class access// 元素个数private int size = 0;// 比较器private final Comparator<? super E> comparator;// 修改次数transient int modCount = 0; // non-private to simplify nested class access
入队
public boolean add(E e) {return offer(e);}public boolean offer(E e) {// 不支持null元素if (e == null)throw new NullPointerException();modCount++;// 取sizeint i = size;// 元素个数达到最大容量了,扩容if (i >= queue.length)grow(i + 1);// 元素个数加1size = i + 1;// 如果还没有元素// 直接插入到数组第一个位置// 这里跟我们之前讲堆不一样了// java里面是从0开始的// 我们说的堆是从1开始的if (i == 0)queue[0] = e;else// 否则,插入元素到数组size的位置,也就是最后一个元素的下一位// 注意这里的size不是数组大小,而是元素个数// 然后,再做自下而上的堆化siftUp(i, e);return true;}private void siftUp(int k, E x) {// 根据是否有比较器,使用不同的方法if (comparator != null)siftUpUsingComparator(k, x);elsesiftUpComparable(k, x);}@SuppressWarnings("unchecked")private void siftUpComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>) x;while (k > 0) {// 找到父节点的位置// 因为元素是从0开始的,所以减1之后再除以2int parent = (k - 1) >>> 1;// 父节点的值Object e = queue[parent];// 比较插入的元素与父节点的值// 如果比父节点大,则跳出循环// 否则交换位置if (key.compareTo((E) e) >= 0)break;// 与父节点交换位置queue[k] = e;// 现在插入的元素位置移到了父节点的位置// 继续与父节点再比较k = parent;}// 最后找到应该插入的位置,放入元素queue[k] = key;}
(1)入队不允许null元素;
(2)如果数组不够用了,先扩容;
(3)如果还没有元素,就插入下标0的位置;
(4)如果有元素了,就插入到最后一个元素往后的一个位置;
(5)自下而上堆化,一直往上跟父节点比较;
(6)如果比父节点小,就与父节点交换位置,直到出现比父节点大为止;
(7)由此可见,PriorityQueue是一个小顶堆。
扩容
private void grow(int minCapacity) {// 旧容量int oldCapacity = queue.length;// Double size if small; else grow by 50%// 旧容量小于64时,容量翻倍// 旧容量大于等于64,容量只增加旧容量的一半int newCapacity = oldCapacity + ((oldCapacity < 64) ?(oldCapacity + 2) :(oldCapacity >> 1));// overflow-conscious code// 检查是否溢出if (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);// 创建出一个新容量大小的新数组并把旧数组元素拷贝过去queue = Arrays.copyOf(queue, newCapacity);}private static int hugeCapacity(int minCapacity) {if (minCapacity < 0) // overflowthrow new OutOfMemoryError();return (minCapacity > MAX_ARRAY_SIZE) ?Integer.MAX_VALUE :MAX_ARRAY_SIZE;}
出队
public E remove() {// 调用poll弹出队首元素E x = poll();if (x != null)// 有元素就返回弹出的元素return x;else// 没有元素就抛出异常throw new NoSuchElementException();}@SuppressWarnings("unchecked")public E poll() {// 如果size为0,说明没有元素if (size == 0)return null;// 弹出元素,元素个数减1int s = --size;modCount++;// 队列首元素E result = (E) queue[0];// 队列末元素E x = (E) queue[s];// 将队列末元素删除queue[s] = null;// 如果弹出元素后还有元素if (s != 0)// 将队列末元素移到队列首// 再做自上而下的堆化siftDown(0, x);// 返回弹出的元素return result;}private void siftDown(int k, E x) {// 根据是否有比较器,选择不同的方法if (comparator != null)siftDownUsingComparator(k, x);elsesiftDownComparable(k, x);}@SuppressWarnings("unchecked")private void siftDownComparable(int k, E x) {Comparable<? super E> key = (Comparable<? super E>)x;// 只需要比较一半就行了,因为叶子节点占了一半的元素int half = size >>> 1; // loop while a non-leafwhile (k < half) {// 寻找子节点的位置,这里加1是因为元素从0号位置开始int child = (k << 1) + 1; // assume left child is least// 左子节点的值Object c = queue[child];// 右子节点的位置int right = child + 1;if (right < size &&((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)// 左右节点取其小者c = queue[child = right];// 如果比子节点都小,则结束if (key.compareTo((E) c) <= 0)break;// 如果比最小的子节点大,则交换位置queue[k] = c;// 指针移到最小子节点的位置继续往下比较k = child;}// 找到正确的位置,放入元素queue[k] = key;}
(1)PriorityQueue是一个小顶堆;
(2)PriorityQueue是非线程安全的;
(3)PriorityQueue不是有序的,只有堆顶存储着最小的元素;
(4)入队就是堆的插入元素的实现;
(5)出队就是堆的删除元素的实现;
ArrayBlockingQueue
blockingqueue
public interface BlockingQueue<E> extends Queue<E> {/*** 插入数据到队列尾部(如果立即可行且不会超过该队列的容量)* 在成功时返回 true,如果此队列已满,则抛IllegalStateException。(与offer方法的区别)*/boolean add(E e);/*** 插入数据到队列尾部,如果没有空间,直接返回false;* 有空间直接插入,返回true。*/boolean offer(E e);/*** 插入数据到队列尾部,如果队列没有空间,一直阻塞;* 有空间直接插入。*/void put(E e) throws InterruptedException;/*** 插入数据到队列尾部,如果没有额外的空间,等待一定的时间,有空间即插入,返回true,* 到时间了,还是没有额外空间,返回false。*/boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;/*** 取出和删除队列中的头元素,如果没有数据,会一直阻塞到有数据*/E take() throws InterruptedException;/*** 取出和删除队列中的头元素,如果没有数据,需要会阻塞一定的时间,过期了还没有数据,返回null*/E poll(long timeout, TimeUnit unit)throws InterruptedException;//除了上述方法还有继承自Queue接口的方法/*** 取出和删除队列头元素,如果是空队列直接返回null。*/E poll();/*** 取出但不删除头元素,该方法与peek方法的区别是当队列为空时会抛出NoSuchElementException异常*/E element();/*** 取出但不删除头元素,空队列直接返回null*/E peek();/*** 返回队列总额外的空间*/int remainingCapacity();/*** 删除队列中存在的元素*/boolean remove(Object o);/*** 判断队列中是否存在当前元素*/boolean contains(Object o);}
arrayblockingqueue
ArrayBlockingQueue() 是一个用数组实现的有界阻塞队列,内部按先进先出的原则对元素进行排序; 其中 put 方法和 take 方法为添加和删除元素的阻塞方法。
ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别,这是因为 ReentrantLock 里面存在公平锁和非公平锁的原因, ReentrantLock 的具体分析会在 Lock 章节进行具体分析的; 对于 Lock 是公平锁的时候, 则被阻塞的队列可以按照阻塞的先后顺序访问队列,Lock 是非公平锁的时候, 阻塞的线程将进入争夺锁资源的过程中,谁先抢到锁就可以先执行,没有固定的先后顺序。
构造方法
/*** 创建一个具体容量的队列,默认是非公平队列*/public ArrayBlockingQueue(int capacity) {this(capacity, false);}/*** 创建一个具体容量、是否公平的队列*/public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}
//返回队列剩余容量public int remainingCapacity()// 判断队列中是否存在当前元素opublic boolean contains(Object o)// 返回一个按正确顺序,包含队列中所有元素的数组public Object[] toArray()// 返回一个按正确顺序,包含队列中所有元素的数组;数组的运行时类型是指定数组的运行时类型@SuppressWarnings("unchecked")public <T> T[] toArray(T[] a)// 自动清空队列中的所有元素public void clear()// 移除队列中所有可用元素,并将他们加入到给定的 Collection 中public int drainTo(Collection<? super E> c)// 从队列中最多移除指定数量的可用元素,并将他们加入到给定的 Collection 中public int drainTo(Collection<? super E> c, int maxElements)// 返回此队列中按正确顺序进行迭代的,包含所有元素的迭代器public Iterator<E> iterator()
成员变量
public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 存储数据的数组 */final Object[] items;/** 获取数据的索引,用于下次 take, poll, peek or remove 等方法 */int takeIndex;/** 添加元素的索引, 用于下次 put, offer, or add 方法 */int putIndex;/** 队列元素的个数 */int count;/** 并发控制使用任何教科书中的经典双条件算法*//** 控制并发访问的锁 */final ReentrantLock lock;/** 非空条件对象,用于通知 take 方法中在等待获取数据的线程,队列中已有数据,可以执行获取操作 */private final Condition notEmpty;/** 未满条件对象,用于通知 put 方法中在等待添加数据的线程,队列未满,可以执行添加操作 */private final Condition notFull;/** 迭代器 */transient Itrs itrs = null;}
对于 notEmpty 条件对象是用于存放等待调用(此时队列中没有数据) take 方法的线程,这些线程会加入到 notEmpty 条件对象的等待队列(单链表)中,同时当队列中有数据后会通过 notEmpty 条件对象唤醒等待队列(链表)中等待的线程(链表中第一个non-null 且 status 为 Condition的线程)去 take 数据。
对于 notFull 条件对象是用于存放等待调用(此时队列容量已满) put 方法的线程,这些线程会加入到 notFull 条件对象的等待队列(单链表)中,同时当队列中数据被消费后会通过 notFull 条件对象唤醒等待队列(链表)中等待的线程去 put 数据。takeIndex 表示的是下一个(take、poll、peek、remove)方法被调用时获取数组元素的索引,putIndex 表示的是下一个(put、offer、add)被调用时添加元素的索引。
添加方法
/*** 在当前 put 位置插入数据,put 位置前进一位,* 同时唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据。* 当然这一系列动作只有该线程获取锁的时候才能进行,即只有获取锁的线程* 才能执行 enqueue 操作。*/// 元素统一入队操作private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x; // putIndex 位置添加数据//putIndex 进行自增,当达到数组长度的时候,putIndex 重头再来,即设置为0if (++putIndex == items.length)putIndex = 0;count++; //元素个数自增notEmpty.signal(); //添加完数据后,说明数组中有数据了,所以可以唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据}// 添加数据,数组中元素已满时,直接返回 false。public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;// 获取锁,保证线程安全lock.lock();try {// 当数组元素个数已满时,直接返回falseif (count == items.length)return false;else {// 执行入队操作,enqueue 方法在上面分析了enqueue(e);return true;}} finally {// 释放锁,保证其他等待锁的线程可以获取到锁// 为什么放到 finally (避免死锁)lock.unlock();}}// add 方法其实就是调用了 offer 方法来实现,// 与 offer 方法的区别就是 offer 方法数组满,抛出 IllegalStateException 异常。public boolean add(E e) {if (offer(e))return true;elsethrow new IllegalStateException("Queue full");}
阻塞添加方法put
/*** 插入数据到队列尾部,如果队列已满,阻塞等待空间*/public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;// 获取锁,期间线程可以打断,打断则不会添加lock.lockInterruptibly();try {// 通过上述分析,我们通过 count 来判断数组中元素个数while (count == items.length)notFull.await(); // 元素已满,线程挂起,线程加入 notFull 条件对象等待队列(链表)中,等待被唤醒enqueue(e); // 队列未满,直接执行入队操作} finally {lock.unlock();}}
LinkedBlockingQueue
构造方法
public LinkedBlockingQueue() {this(Integer.MAX_VALUE);}public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;last = head = new Node<E>(null);}public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);//调用第二个构造方法,传入的capacity是Int的最大值,可以说是一个无界队列。final ReentrantLock putLock = this.putLock;putLock.lock(); //开启排他锁try {int n = 0;//用于记录LinkedBlockingQueue的size//循环传入的c集合for (E e : c) {if (e == null)//如果e==null,则抛出空指针异常throw new NullPointerException();if (n == capacity)//如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常throw new IllegalStateException("Queue full");enqueue(new Node<E>(e));//入队操作++n;//n自增}count.set(n);//设置count} finally {putLock.unlock();//释放排他锁}}
node
static class Node<E> {E item;Node<E> next;Node(E x) { item = x; }}
put
public void put(E e) throws InterruptedException {// 值不为空if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.//int c = -1;// 新生结点Node<E> node = new Node<E>(e);// 存元素锁final ReentrantLock putLock = this.putLock;// 元素个数final AtomicInteger count = this.count;// 如果当前线程未被中断,则获取锁putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) { // 元素个数到达指定容量// 在notFull条件上进行等待notFull.await();}// 入队列enqueue(node);// 更新元素个数,返回的是以前的元素个数(c从-1开始的)c = count.getAndIncrement();if (c + 1 < capacity) // 元素个数是否小于容量// 唤醒在notFull条件上等待的某个线程notFull.signal();} finally {// 释放锁putLock.unlock();}if (c == 0)// 元素个数为0,表示已有take线程在notEmpty条件上进入了等待,//则需要唤醒在notEmpty条件上等待的线程signalNotEmpty();}private void signalNotEmpty() {// 取元素锁final ReentrantLock takeLock = this.takeLock;// 获取锁takeLock.lock();try {// 唤醒在notEmpty条件上等待的某个线程notEmpty.signal();} finally {// 释放锁takeLock.unlock();}}
SynchronousQueue
SynchronousQueue是一个比较特别的队列,此队列源码中充斥着大量的CAS语句,在线程池方面有所应用。
SynchronousQueue 的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。
SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,SynchronousQueue的put、take操作都是委托这两个类来实现的。
public void put(E e) throws InterruptedException {// 若插入的数据是null,则直接抛出NullPointerException异常if (e == null) throw new NullPointerException();// 调用transfer方法if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();}}public E take() throws InterruptedException {// 调用transfer方法E e = transferer.transfer(null, false, 0);// 若值不为null,则直接返回if (e != null)return e;Thread.interrupted();throw new InterruptedException();}
TransferQueue
static final class TransferQueue<E> extends Transferer<E>{/** Head of queue */transient volatile QNode head;/** Tail of queue */transient volatile QNode tail;/*** Reference to a cancelled node that might not yet have been* unlinked from queue because it was the last inserted node* when it was cancelled.*/transient volatile QNode cleanMe;TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node.head = h;tail = h;}}//它使用队列作为交易媒介,来实现公平交易,TransferQueue使用QNode类来作为队列节点:static final class QNode {// 指向下一个节点volatile QNode next; // next node in queue// item数据项volatile Object item; // CAS'ed to or from null// 等待线程volatile Thread waiter; // to control park/unpark// 是否为数据的标识final boolean isData;...}
TransferStack
static final class TransferStack<E> extends Transferstatic final class SNode {// 指向栈中的下一个节点volatile SNode next; // next node in stack// 匹配节点volatile SNode match; // the node matched to this// 等待线程volatile Thread waiter; // to control park/unpark// item数据线Object item; // data; or null for REQUESTs// 节点状态int mode;...}/** Node represents an unfulfilled consumer */static final int REQUEST = 0;/** Node represents an unfulfilled producer */static final int DATA = 1;/** Node is fulfilling another unfulfilled DATA or REQUEST */static final int FULFILLING = 2;//REQUEST:表示了一个请求交易但是没有得到匹配的消费者//DATA:表示一个请求交易但是没有交付数据的生产者//FULFILLING:表示正在进行交易的生产者或者消费者
从源码中可以看到,这两个方法都会调用transfer方法,其中,put方法传递的是e参数,所以模式为数据(公平isData = true,非公平mode= DATA),而take方法传递的是null,所以模式为请求(公平isData = false,非公平mode = REQUEST)。
公平模式(queue)
E transfer(E e, boolean timed, long nanos) {QNode s = null; // constructed/reused as needed// 获取当前节点的模式boolean isData = (e != null);for (;;) {QNode t = tail;QNode h = head;// 队列没有初始化,自旋if (t == null || h == null) // saw uninitialized valuecontinue; // spin// 头尾节点相等(队列为null),或者当前节点和队列尾节点具有相同的交易类型// 将节点添加到队列尾部,并且等待匹配if (h == t || t.isData == isData) { // empty or same-modeQNode tn = t.next;// t != tail表明已有其他线程修改了tail,当前线程需要重新再来if (t != tail) // inconsistent readcontinue;// 若尾节点的后继节点不为null,则表明已经有其他线程添加了节点,更新尾节点if (tn != null) { // lagging tailadvanceTail(t, tn);continue;}// 超时if (timed && nanos <= 0) // can't waitreturn null;// s == null,则创建一个新节点if (s == null)s = new QNode(e, isData);// 将新节点加入到队列中,如果不成功,继续处理if (!t.casNext(null, s)) // failed to link incontinue;// 更新尾节点advanceTail(t, s); // swing tail and wait// 调用awaitFulfill方法,若节点是head.next,则进行自旋// 否则,直接阻塞,直到有其他线程与之匹配,或它自己进行线程的中断Object x = awaitFulfill(s, e, timed, nanos);// 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点if (x == s) { // wait was cancelledclean(t, s);return null;}// 若s节点还没有从队列删除if (!s.isOffList()) { // not already unlinked// 尝试将s节点设置为head,移出tadvanceHead(t, s); // unlink if headif (x != null) // and forget fieldss.item = s;s.waiter = null;}return (x != null) ? (E)x : e;}// 这里是从head.next开始,因为TransferQueue总是会存在一个dummy节点else { // complementary-modeQNode m = h.next; // node to fulfill// 不一致读,表明有其他线程修改了队列if (t != tail || m == null || h != head)continue; // inconsistent readObject x = m.item;// isData == (x != null):判断isData与x的模式是否相同,相同表示已经匹配了// x == m :m节点被取消了// !m.casItem(x, e):如果尝试将数据e设置到m上失败if (isData == (x != null) || // m already fulfilledx == m || // m cancelled!m.casItem(x, e)) { // lost CAS// 将m设置为头结点,h出列,然后重试advanceHead(h, m); // dequeue and retrycontinue;}// 成功匹配了,m设置为头结点h出列,向前推进advanceHead(h, m); // successfully fulfilled// 唤醒m的等待线程LockSupport.unpark(m.waiter);return (x != null) ? (E)x : e;}}}
https://blog.csdn.net/qq_38293564/article/details/80604194
…….
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
PriorityBlockingQueue有四个构造方法:
// 默认的构造方法,该方法会调用this(DEFAULT_INITIAL_CAPACITY, null),即默认的容量是11
public PriorityBlockingQueue()
// 根据initialCapacity来设置队列的初始容量
public PriorityBlockingQueue(int initialCapacity)
// 根据initialCapacity来设置队列的初始容量,并根据comparator对象来对数据进行排序
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
// 根据集合来创建队列
public PriorityBlockingQueue(Collection<? extends E> c)
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {private static final long serialVersionUID = 5595510919245408276L;private static final int DEFAULT_INITIAL_CAPACITY = 11;private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;private transient Object[] queue;private transient int size;private transient Comparator<? super E> comparator;private final ReentrantLock lock;private final Condition notEmpty;private transient volatile int allocationSpinLock;//扩容时候用到,自旋锁private PriorityQueue<E> q;//数组实现的最小堆,writeObject和readObject用到。//为了兼容之前的版本,只有在序列化和反序列化才非空public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) {if (initialCapacity < 1)throw new IllegalArgumentException();this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();this.comparator = comparator;this.queue = new Object[initialCapacity];//构造函数没有初始化allocationSpinLock,q}public PriorityBlockingQueue(Collection<? extends E> c) {this.lock = new ReentrantLock();this.notEmpty = lock.newCondition();boolean heapify = true; // true if not known to be in heap orderboolean screen = true; // true if must screen for nullsif (c instanceof SortedSet<?>) {// 如果传入集合是有序集,则无须进行堆有序化SortedSet<? extends E> ss = (SortedSet<? extends E>) c;this.comparator = (Comparator<? super E>) ss.comparator();heapify = false;//不需要重建堆}// 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化else if (c instanceof PriorityBlockingQueue<?>) {PriorityBlockingQueue<? extends E> pq =(PriorityBlockingQueue<? extends E>) c;this.comparator = (Comparator<? super E>) pq.comparator();screen = false;if (pq.getClass() == PriorityBlockingQueue.class) // exact matchheapify = false;//不需要重建堆}Object[] a = c.toArray();int n = a.length;// If c.toArray incorrectly doesn't return Object[], copy it.if (a.getClass() != Object[].class)a = Arrays.copyOf(a, n, Object[].class);if (screen && (n == 1 || this.comparator != null)) {for (int i = 0; i < n; ++i)if (a[i] == null)throw new NullPointerException();}this.queue = a;this.size = n;if (heapify)heapify();//重建堆}private void removeAt(int i) {Object[] array = queue;int n = size - 1;if (n == i) // removed last elementarray[i] = null;else {E moved = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(i, moved, array, n);elsesiftDownUsingComparator(i, moved, array, n, cmp);if (array[i] == moved) {if (cmp == null)siftUpComparable(i, moved, array);elsesiftUpUsingComparator(i, moved, array, cmp);}}size = n;}private static <T> void siftDownComparable(int k, T x, Object[] array,int n) {//元素x放到k的位置if (n > 0) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // loop while a non-leafwhile (k < half) {int child = (k << 1) + 1; // assume left child is leastObject c = array[child];int right = child + 1;if (right < n &&((Comparable<? super T>) c).compareTo((T) array[right]) > 0)c = array[child = right];if (key.compareTo((T) c) <= 0)//比子节点小就不动,小堆break;array[k] = c;k = child;}array[k] = key;}}private static <T> void siftUpComparable(int k, T x, Object[] array) {//元素x放到k的位置Comparable<? super T> key = (Comparable<? super T>) x;while (k > 0) {int parent = (k - 1) >>> 1;Object e = array[parent];if (key.compareTo((T) e) >= 0)//比父亲大就不动,小堆break;array[k] = e;k = parent;}array[k] = key;}public boolean offer(E e) {if (e == null)// 若插入的元素为null,则直接抛出NullPointerException异常throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);//准备放在最后size位置处elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();// 唤醒等待在空上的线程} finally {lock.unlock();}return true;}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null && nanos > 0)nanos = notEmpty.awaitNanos(nanos);} finally {lock.unlock();}return result;}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return (size == 0) ? null : (E) queue[0];} finally {lock.unlock();}}public int size() {final ReentrantLock lock = this.lock;lock.lock();try {return size;} finally {lock.unlock();}}private int indexOf(Object o) {if (o != null) {Object[] array = queue;int n = size;for (int i = 0; i < n; i++)if (o.equals(array[i]))return i;}return -1;}public boolean remove(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {int i = indexOf(o);if (i == -1)return false;removeAt(i);return true;} finally {lock.unlock();}}public boolean contains(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {return indexOf(o) != -1;} finally {lock.unlock();}}private E dequeue() {int n = size - 1;if (n < 0)return null;else {Object[] array = queue;E result = (E) array[0];E x = (E) array[n];array[n] = null;Comparator<? super E> cmp = comparator;if (cmp == null)siftDownComparable(0, x, array, n);elsesiftDownUsingComparator(0, x, array, n, cmp);size = n;return result;}}private void heapify() {Object[] array = queue;int n = size;int half = (n >>> 1) - 1;Comparator<? super E> cmp = comparator;if (cmp == null) {for (int i = half; i >= 0; i--)siftDownComparable(i, (E) array[i], array, n);//数组重建为堆}else {for (int i = half; i >= 0; i--)siftDownUsingComparator(i, (E) array[i], array, n, cmp);}}public void clear() {final ReentrantLock lock = this.lock;lock.lock();try {Object[] array = queue;int n = size;size = 0;for (int i = 0; i < n; i++)array[i] = null;} finally {lock.unlock();}public int drainTo(Collection<? super E> c, int maxElements) {//批量获取元素if (c == null)throw new NullPointerException();if (c == this)throw new IllegalArgumentException();if (maxElements <= 0)return 0;final ReentrantLock lock = this.lock;lock.lock();try {int n = Math.min(size, maxElements);for (int i = 0; i < n; i++) {// 循环遍历,不断弹出队首元素;c.add((E) queue[0]); // In this order, in case add() throws.dequeue();}return n;} finally {lock.unlock();}}}
扩容
private void tryGrow(Object[] array, int oldCap) {//旧数组和容量lock.unlock(); // 释放锁,防止阻塞出队操作Object[] newArray = null;//释放了锁,多个线程可以进来这里,但是只有一个线程可以执行if里面的代码,也就是只有一个线程可以扩容,if (allocationSpinLock == 0 && // 使用CAS操作来修改allocationSpinLockUNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,0, 1)) {try {// 容量越小增长得越快,若容量小于64,则新容量是oldCap * 2 + 2,否则是oldCap * 1.5int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : // grow faster if small(oldCap >> 1));if (newCap - MAX_ARRAY_SIZE > 0) { // 扩容后超过最大容量处理int minCap = oldCap + 1;if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//整数溢出throw new OutOfMemoryError();newCap = MAX_ARRAY_SIZE;}//queue是公共变量,if (newCap > oldCap && queue == array)newArray = new Object[newCap];} finally {// 解锁,因为只有一个线程到此,因而不需要CAS操作;allocationSpinLock = 0;}}//失败扩容的线程newArray == null,调用Thread.yield()让出cpu, 让扩容线程扩容后优先调用lock.lock重新获取锁,//但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁。if (newArray == null)Thread.yield();lock.lock();//有可能扩容的线程先走到这里,也有可能没有扩容的线程先走到这里。//准备赋值给共有变量queue,要加锁,//扩容的线程newArray != null ,没有扩容的线程newArray = nullif (newArray != null && queue == array) {//再次进入while循环去扩容。queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}}private static final sun.misc.Unsafe UNSAFE;private static final long allocationSpinLockOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> k = PriorityBlockingQueue.class;allocationSpinLockOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("allocationSpinLock")); //allocationSpinLock这个字段} catch (Exception e) {throw new Error(e);}}
LinkedTransferQueue
LinkedTransferQueue使用了一个叫做dual data structure的数据结构,或者叫做dual queue,译为双重数据结构或者双重队列。
放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。
取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。
属性
// 头节点transient volatile Node head;// 尾节点private transient volatile Node tail;// 放取元素的几种方式:// 立即返回,用于非超时的poll()和tryTransfer()方法中private static final int NOW = 0; // for untimed poll, tryTransfer// 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程private static final int ASYNC = 1; // for offer, put, add// 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止private static final int SYNC = 2; // for transfer, take// 超时,用于有超时的poll()和tryTransfer()方法中private static final int TIMED = 3; // for timed poll, tryTransfer
内部类
static final class Node {// 是否是数据节点(也就标识了是生产者还是消费者)final boolean isData; // false if this is a request node// 元素的值volatile Object item; // initially non-null if isData; CASed to match// 下一个节点volatile Node next;// 持有元素的线程volatile Thread waiter; // null until waiting}//构造方法public LinkedTransferQueue() {}public LinkedTransferQueue(Collection<? extends E> c) {this();addAll(c);}
只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。
xfer方法
private E xfer(E e, boolean haveData, int how, long nanos) {// 不允许放入空元素if (haveData && (e == null))throw new NullPointerException();Node s = null; // the node to append, if needed// 外层循环,自旋,失败就重试retry:for (;;) { // restart on append race// 下面这个for循环用于控制匹配的过程// 同一时刻队列中只会存储一种类型的节点// 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了// 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止for (Node h = head, p = h; p != null;) { // find & match first node// p节点的模式boolean isData = p.isData;// p节点的值Object item = p.item;// p没有被匹配到if (item != p && (item != null) == isData) { // unmatched// 如果两者模式一样,则不能匹配,跳出循环后尝试入队if (isData == haveData) // can't matchbreak;// 如果两者模式不一样,则尝试匹配// 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)if (p.casItem(item, e)) { // match// 匹配成功// for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的// 看不懂可以直接跳过for (Node q = p; q != h;) {// 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点Node n = q.next; // update by 2 unless singleton// 如果head还没变,就把它更新成新的节点// 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)// 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了// 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了// 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了if (head == h && casHead(h, n == null ? q : n)) {h.forgetNext();break;} // advance and retry// 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试if ((h = head) == null ||(q = h.next) == null || !q.isMatched())break; // unless slack < 2}// 唤醒p中等待的线程LockSupport.unpark(p.waiter);// 并返回匹配到的元素return LinkedTransferQueue.<E>cast(item);}}// p已经被匹配了或者尝试匹配的时候失败了// 也就是其它线程先一步匹配了p// 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己// 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试Node n = p.next;p = (p != n) ? n : (h = head); // Use head if p offlist}// 到这里肯定是队列中存储的节点类型和自己一样// 或者队列中没有元素了// 就入队(不管放元素还是取元素都得入队)// 入队又分成四种情况:// NOW,立即返回,没有匹配到立即返回,不做入队操作// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身// 如果不是立即返回if (how != NOW) { // No matches available// 新建s节点if (s == null)s = new Node(e, haveData);// 尝试入队Node pred = tryAppend(s, haveData);// 入队失败,重试if (pred == null)continue retry; // lost race vs opposite mode// 如果不是异步(同步或者有超时)// 就等待被匹配if (how != ASYNC)return awaitMatch(s, pred, e, (how == TIMED), nanos);}return e; // not waiting}}private Node tryAppend(Node s, boolean haveData) {// 从tail开始遍历,把s放到链表尾端for (Node t = tail, p = t;;) { // move p to last node and appendNode n, u; // temps for reads of next & tail// 如果首尾都是null,说明链表中还没有元素if (p == null && (p = head) == null) {// 就让首节点指向s// 注意,这里插入第一个元素的时候tail指针并没有指向sif (casHead(null, s))return s; // initialize}else if (p.cannotPrecede(haveData))// 如果p无法处理,则返回null// 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队// 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,// 队列中所有的元素都要保证是同一种类型的节点// 返回null后外面的方法会重新尝试匹配重新入队等return null; // lost race vs opposite modeelse if ((n = p.next) != null) // not last; keep traversing// 如果p的next不为空,说明不是最后一个节点// 则让p重新指向最后一个节点p = p != t && t != (u = tail) ? (t = u) : // stale tail(p != n) ? n : null; // restart if off listelse if (!p.casNext(null, s))// 如果CAS更新s为p的next失败// 则说明有其它线程先一步更新到p的next了// 就让p指向p的next,重新尝试让s入队p = p.next; // re-read on CAS failureelse {// 到这里说明s成功入队了// 如果p不等于t,就更新tail指针// 还记得上面插入第一个元素时tail指针并没有指向新元素吗?// 这里就是用来更新tail指针的if (p != t) { // update if slack now >= 2while ((tail != t || !casTail(t, s)) &&(t = tail) != null &&(s = t.next) != null && // advance and retry(s = s.next) != null && s != t);}// 返回p,即s的前一个元素return p;}}}private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {// 如果是有超时的,计算其超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;// 当前线程Thread w = Thread.currentThread();// 自旋次数int spins = -1; // initialized after first item and cancel checks// 随机数,随机让一些自旋的线程让出CPUThreadLocalRandom randomYields = null; // bound if neededfor (;;) {Object item = s.item;// 如果s元素的值不等于e,说明它被匹配到了if (item != e) { // matched// assert item != s;// 把s的item更新为s本身// 并把s中的waiter置为空s.forgetContents(); // avoid garbage// 返回匹配到的元素return LinkedTransferQueue.<E>cast(item);}// 如果当前线程中断了,或者有超时的到期了// 就更新s的元素值指向s本身if ((w.isInterrupted() || (timed && nanos <= 0)) &&s.casItem(e, s)) { // cancel// 尝试解除s与其前一个节点的关系// 也就是删除s节点unsplice(pred, s);// 返回元素的值本身,说明没匹配到return e;}// 如果自旋次数小于0,就计算自旋次数if (spins < 0) { // establish spins at/near front// spinsFor()计算自旋次数// 如果前面有节点未被匹配就返回0// 如果前面有节点且正在匹配中就返回一定的次数,等待if ((spins = spinsFor(pred, s.isData)) > 0)// 初始化随机数randomYields = ThreadLocalRandom.current();}else if (spins > 0) { // spin// 还有自旋次数就减1--spins;// 并随机让出CPUif (randomYields.nextInt(CHAINED_SPINS) == 0)Thread.yield(); // occasionally yield}else if (s.waiter == null) {// 更新s的waiter为当前线程s.waiter = w; // request unpark then recheck}else if (timed) {// 如果有超时,计算超时时间,并阻塞一定时间nanos = deadline - System.nanoTime();if (nanos > 0L)LockSupport.parkNanos(this, nanos);}else {// 不是超时的,直接阻塞,等待被唤醒// 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了LockSupport.park(this);}}}
DelayQueue
https://www.cnblogs.com/tong-yuan/p/DelayQueue.html
