PriorityQueue
优先级队列,是0个或多个元素的集合,集合中的每个元素都有一个权重值,每次出队都弹出优先级最大或最小的元素。一般来说,优先级队列使用堆来实现。
属性
// 默认容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 存储元素的地方
transient Object[] queue; // non-private to simplify nested class access
// 元素个数
private int size = 0;
// 比较器
private final Comparator<? super E> comparator;
// 修改次数
transient int modCount = 0; // non-private to simplify nested class access
入队
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 不支持null元素
if (e == null)
throw new NullPointerException();
modCount++;
// 取size
int i = size;
// 元素个数达到最大容量了,扩容
if (i >= queue.length)
grow(i + 1);
// 元素个数加1
size = i + 1;
// 如果还没有元素
// 直接插入到数组第一个位置
// 这里跟我们之前讲堆不一样了
// java里面是从0开始的
// 我们说的堆是从1开始的
if (i == 0)
queue[0] = e;
else
// 否则,插入元素到数组size的位置,也就是最后一个元素的下一位
// 注意这里的size不是数组大小,而是元素个数
// 然后,再做自下而上的堆化
siftUp(i, e);
return true;
}
private void siftUp(int k, E x) {
// 根据是否有比较器,使用不同的方法
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
// 找到父节点的位置
// 因为元素是从0开始的,所以减1之后再除以2
int parent = (k - 1) >>> 1;
// 父节点的值
Object e = queue[parent];
// 比较插入的元素与父节点的值
// 如果比父节点大,则跳出循环
// 否则交换位置
if (key.compareTo((E) e) >= 0)
break;
// 与父节点交换位置
queue[k] = e;
// 现在插入的元素位置移到了父节点的位置
// 继续与父节点再比较
k = parent;
}
// 最后找到应该插入的位置,放入元素
queue[k] = key;
}
(1)入队不允许null元素;
(2)如果数组不够用了,先扩容;
(3)如果还没有元素,就插入下标0的位置;
(4)如果有元素了,就插入到最后一个元素往后的一个位置;
(5)自下而上堆化,一直往上跟父节点比较;
(6)如果比父节点小,就与父节点交换位置,直到出现比父节点大为止;
(7)由此可见,PriorityQueue是一个小顶堆。
扩容
private void grow(int minCapacity) {
// 旧容量
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
// 旧容量小于64时,容量翻倍
// 旧容量大于等于64,容量只增加旧容量的一半
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
// 检查是否溢出
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// 创建出一个新容量大小的新数组并把旧数组元素拷贝过去
queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
出队
public E remove() {
// 调用poll弹出队首元素
E x = poll();
if (x != null)
// 有元素就返回弹出的元素
return x;
else
// 没有元素就抛出异常
throw new NoSuchElementException();
}
@SuppressWarnings("unchecked")
public E poll() {
// 如果size为0,说明没有元素
if (size == 0)
return null;
// 弹出元素,元素个数减1
int s = --size;
modCount++;
// 队列首元素
E result = (E) queue[0];
// 队列末元素
E x = (E) queue[s];
// 将队列末元素删除
queue[s] = null;
// 如果弹出元素后还有元素
if (s != 0)
// 将队列末元素移到队列首
// 再做自上而下的堆化
siftDown(0, x);
// 返回弹出的元素
return result;
}
private void siftDown(int k, E x) {
// 根据是否有比较器,选择不同的方法
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
// 只需要比较一半就行了,因为叶子节点占了一半的元素
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
// 寻找子节点的位置,这里加1是因为元素从0号位置开始
int child = (k << 1) + 1; // assume left child is least
// 左子节点的值
Object c = queue[child];
// 右子节点的位置
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
// 左右节点取其小者
c = queue[child = right];
// 如果比子节点都小,则结束
if (key.compareTo((E) c) <= 0)
break;
// 如果比最小的子节点大,则交换位置
queue[k] = c;
// 指针移到最小子节点的位置继续往下比较
k = child;
}
// 找到正确的位置,放入元素
queue[k] = key;
}
(1)PriorityQueue是一个小顶堆;
(2)PriorityQueue是非线程安全的;
(3)PriorityQueue不是有序的,只有堆顶存储着最小的元素;
(4)入队就是堆的插入元素的实现;
(5)出队就是堆的删除元素的实现;
ArrayBlockingQueue
blockingqueue
public interface BlockingQueue<E> extends Queue<E> {
/**
* 插入数据到队列尾部(如果立即可行且不会超过该队列的容量)
* 在成功时返回 true,如果此队列已满,则抛IllegalStateException。(与offer方法的区别)
*/
boolean add(E e);
/**
* 插入数据到队列尾部,如果没有空间,直接返回false;
* 有空间直接插入,返回true。
*/
boolean offer(E e);
/**
* 插入数据到队列尾部,如果队列没有空间,一直阻塞;
* 有空间直接插入。
*/
void put(E e) throws InterruptedException;
/**
* 插入数据到队列尾部,如果没有额外的空间,等待一定的时间,有空间即插入,返回true,
* 到时间了,还是没有额外空间,返回false。
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 取出和删除队列中的头元素,如果没有数据,会一直阻塞到有数据
*/
E take() throws InterruptedException;
/**
* 取出和删除队列中的头元素,如果没有数据,需要会阻塞一定的时间,过期了还没有数据,返回null
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//除了上述方法还有继承自Queue接口的方法
/**
* 取出和删除队列头元素,如果是空队列直接返回null。
*/
E poll();
/**
* 取出但不删除头元素,该方法与peek方法的区别是当队列为空时会抛出NoSuchElementException异常
*/
E element();
/**
* 取出但不删除头元素,空队列直接返回null
*/
E peek();
/**
* 返回队列总额外的空间
*/
int remainingCapacity();
/**
* 删除队列中存在的元素
*/
boolean remove(Object o);
/**
* 判断队列中是否存在当前元素
*/
boolean contains(Object o);
}
arrayblockingqueue
ArrayBlockingQueue() 是一个用数组实现的有界阻塞队列,内部按先进先出的原则对元素进行排序; 其中 put
方法和 take
方法为添加和删除元素的阻塞方法。
ArrayBlockingQueue 内部的阻塞队列是通过 ReentrantLock 和 Condition 条件队列实现的, 所以 ArrayBlockingQueue 中的元素存在公平和非公平访问的区别,这是因为 ReentrantLock 里面存在公平锁和非公平锁的原因, ReentrantLock 的具体分析会在 Lock 章节进行具体分析的; 对于 Lock 是公平锁的时候, 则被阻塞的队列可以按照阻塞的先后顺序访问队列,Lock 是非公平锁的时候, 阻塞的线程将进入争夺锁资源的过程中,谁先抢到锁就可以先执行,没有固定的先后顺序。
构造方法
/**
* 创建一个具体容量的队列,默认是非公平队列
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 创建一个具体容量、是否公平的队列
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//返回队列剩余容量
public int remainingCapacity()
// 判断队列中是否存在当前元素o
public boolean contains(Object o)
// 返回一个按正确顺序,包含队列中所有元素的数组
public Object[] toArray()
// 返回一个按正确顺序,包含队列中所有元素的数组;数组的运行时类型是指定数组的运行时类型
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a)
// 自动清空队列中的所有元素
public void clear()
// 移除队列中所有可用元素,并将他们加入到给定的 Collection 中
public int drainTo(Collection<? super E> c)
// 从队列中最多移除指定数量的可用元素,并将他们加入到给定的 Collection 中
public int drainTo(Collection<? super E> c, int maxElements)
// 返回此队列中按正确顺序进行迭代的,包含所有元素的迭代器
public Iterator<E> iterator()
成员变量
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/** 获取数据的索引,用于下次 take, poll, peek or remove 等方法 */
int takeIndex;
/** 添加元素的索引, 用于下次 put, offer, or add 方法 */
int putIndex;
/** 队列元素的个数 */
int count;
/*
* 并发控制使用任何教科书中的经典双条件算法
*/
/** 控制并发访问的锁 */
final ReentrantLock lock;
/** 非空条件对象,用于通知 take 方法中在等待获取数据的线程,队列中已有数据,可以执行获取操作 */
private final Condition notEmpty;
/** 未满条件对象,用于通知 put 方法中在等待添加数据的线程,队列未满,可以执行添加操作 */
private final Condition notFull;
/** 迭代器 */
transient Itrs itrs = null;
}
对于 notEmpty
条件对象是用于存放等待调用(此时队列中没有数据) take 方法的线程,这些线程会加入到 notEmpty
条件对象的等待队列(单链表)中,同时当队列中有数据后会通过 notEmpty
条件对象唤醒等待队列(链表)中等待的线程(链表中第一个non-null 且 status 为 Condition的线程)去 take 数据。
对于 notFull
条件对象是用于存放等待调用(此时队列容量已满) put 方法的线程,这些线程会加入到 notFull
条件对象的等待队列(单链表)中,同时当队列中数据被消费后会通过 notFull
条件对象唤醒等待队列(链表)中等待的线程去 put 数据。takeIndex 表示的是下一个(take、poll、peek、remove)方法被调用时获取数组元素的索引,putIndex 表示的是下一个(put、offer、add)被调用时添加元素的索引。
添加方法
/**
* 在当前 put 位置插入数据,put 位置前进一位,
* 同时唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据。
* 当然这一系列动作只有该线程获取锁的时候才能进行,即只有获取锁的线程
* 才能执行 enqueue 操作。
*/
// 元素统一入队操作
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x; // putIndex 位置添加数据
//putIndex 进行自增,当达到数组长度的时候,putIndex 重头再来,即设置为0
if (++putIndex == items.length)
putIndex = 0;
count++; //元素个数自增
notEmpty.signal(); //添加完数据后,说明数组中有数据了,所以可以唤醒 notEmpty 条件对象等待队列(链表)中第一个可用线程去 take 数据
}
// 添加数据,数组中元素已满时,直接返回 false。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取锁,保证线程安全
lock.lock();
try {
// 当数组元素个数已满时,直接返回false
if (count == items.length)
return false;
else {
// 执行入队操作,enqueue 方法在上面分析了
enqueue(e);
return true;
}
} finally {
// 释放锁,保证其他等待锁的线程可以获取到锁
// 为什么放到 finally (避免死锁)
lock.unlock();
}
}
// add 方法其实就是调用了 offer 方法来实现,
// 与 offer 方法的区别就是 offer 方法数组满,抛出 IllegalStateException 异常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
阻塞添加方法put
/**
* 插入数据到队列尾部,如果队列已满,阻塞等待空间
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 获取锁,期间线程可以打断,打断则不会添加
lock.lockInterruptibly();
try {
// 通过上述分析,我们通过 count 来判断数组中元素个数
while (count == items.length)
notFull.await(); // 元素已满,线程挂起,线程加入 notFull 条件对象等待队列(链表)中,等待被唤醒
enqueue(e); // 队列未满,直接执行入队操作
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
构造方法
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//调用第二个构造方法,传入的capacity是Int的最大值,可以说是一个无界队列。
final ReentrantLock putLock = this.putLock;
putLock.lock(); //开启排他锁
try {
int n = 0;//用于记录LinkedBlockingQueue的size
//循环传入的c集合
for (E e : c) {
if (e == null)//如果e==null,则抛出空指针异常
throw new NullPointerException();
if (n == capacity)//如果n==capacity,说明到了最大的容量,则抛出“Queue full”异常
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));//入队操作
++n;//n自增
}
count.set(n);//设置count
} finally {
putLock.unlock();//释放排他锁
}
}
node
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
put
public void put(E e) throws InterruptedException {
// 值不为空
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
//
int c = -1;
// 新生结点
Node<E> node = new Node<E>(e);
// 存元素锁
final ReentrantLock putLock = this.putLock;
// 元素个数
final AtomicInteger count = this.count;
// 如果当前线程未被中断,则获取锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) { // 元素个数到达指定容量
// 在notFull条件上进行等待
notFull.await();
}
// 入队列
enqueue(node);
// 更新元素个数,返回的是以前的元素个数(c从-1开始的)
c = count.getAndIncrement();
if (c + 1 < capacity) // 元素个数是否小于容量
// 唤醒在notFull条件上等待的某个线程
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
if (c == 0)
// 元素个数为0,表示已有take线程在notEmpty条件上进入了等待,
//则需要唤醒在notEmpty条件上等待的线程
signalNotEmpty();
}
private void signalNotEmpty() {
// 取元素锁
final ReentrantLock takeLock = this.takeLock;
// 获取锁
takeLock.lock();
try {
// 唤醒在notEmpty条件上等待的某个线程
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
}
SynchronousQueue
SynchronousQueue是一个比较特别的队列,此队列源码中充斥着大量的CAS语句,在线程池方面有所应用。
SynchronousQueue 的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。
SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,SynchronousQueue的put、take操作都是委托这两个类来实现的。
public void put(E e) throws InterruptedException {
// 若插入的数据是null,则直接抛出NullPointerException异常
if (e == null) throw new NullPointerException();
// 调用transfer方法
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
// 调用transfer方法
E e = transferer.transfer(null, false, 0);
// 若值不为null,则直接返回
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
TransferQueue
static final class TransferQueue<E> extends Transferer<E>{
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
}
//它使用队列作为交易媒介,来实现公平交易,TransferQueue使用QNode类来作为队列节点:
static final class QNode {
// 指向下一个节点
volatile QNode next; // next node in queue
// item数据项
volatile Object item; // CAS'ed to or from null
// 等待线程
volatile Thread waiter; // to control park/unpark
// 是否为数据的标识
final boolean isData;
...
}
TransferStack
static final class TransferStack<E> extends Transfer
static final class SNode {
// 指向栈中的下一个节点
volatile SNode next; // next node in stack
// 匹配节点
volatile SNode match; // the node matched to this
// 等待线程
volatile Thread waiter; // to control park/unpark
// item数据线
Object item; // data; or null for REQUESTs
// 节点状态
int mode;
...
}
/** Node represents an unfulfilled consumer */
static final int REQUEST = 0;
/** Node represents an unfulfilled producer */
static final int DATA = 1;
/** Node is fulfilling another unfulfilled DATA or REQUEST */
static final int FULFILLING = 2;
//REQUEST:表示了一个请求交易但是没有得到匹配的消费者
//DATA:表示一个请求交易但是没有交付数据的生产者
//FULFILLING:表示正在进行交易的生产者或者消费者
从源码中可以看到,这两个方法都会调用transfer方法,其中,put方法传递的是e参数,所以模式为数据(公平isData = true,非公平mode= DATA),而take方法传递的是null,所以模式为请求(公平isData = false,非公平mode = REQUEST)。
公平模式(queue)
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// 获取当前节点的模式
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 队列没有初始化,自旋
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 头尾节点相等(队列为null),或者当前节点和队列尾节点具有相同的交易类型
// 将节点添加到队列尾部,并且等待匹配
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// t != tail表明已有其他线程修改了tail,当前线程需要重新再来
if (t != tail) // inconsistent read
continue;
// 若尾节点的后继节点不为null,则表明已经有其他线程添加了节点,更新尾节点
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
// 超时
if (timed && nanos <= 0) // can't wait
return null;
// s == null,则创建一个新节点
if (s == null)
s = new QNode(e, isData);
// 将新节点加入到队列中,如果不成功,继续处理
if (!t.casNext(null, s)) // failed to link in
continue;
// 更新尾节点
advanceTail(t, s); // swing tail and wait
// 调用awaitFulfill方法,若节点是head.next,则进行自旋
// 否则,直接阻塞,直到有其他线程与之匹配,或它自己进行线程的中断
Object x = awaitFulfill(s, e, timed, nanos);
// 若返回的x == s表示,当前线程已经超时或者中断,不然的话s == null或者是匹配的节点
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 若s节点还没有从队列删除
if (!s.isOffList()) { // not already unlinked
// 尝试将s节点设置为head,移出t
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
// 这里是从head.next开始,因为TransferQueue总是会存在一个dummy节点
else { // complementary-mode
QNode m = h.next; // node to fulfill
// 不一致读,表明有其他线程修改了队列
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// isData == (x != null):判断isData与x的模式是否相同,相同表示已经匹配了
// x == m :m节点被取消了
// !m.casItem(x, e):如果尝试将数据e设置到m上失败
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
// 将m设置为头结点,h出列,然后重试
advanceHead(h, m); // dequeue and retry
continue;
}
// 成功匹配了,m设置为头结点h出列,向前推进
advanceHead(h, m); // successfully fulfilled
// 唤醒m的等待线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
https://blog.csdn.net/qq_38293564/article/details/80604194
…….
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,直到系统资源耗尽。默认情况下元素采用自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。但需要注意的是不能保证同优先级元素的顺序。PriorityBlockingQueue也是基于最小二叉堆实现,使用基于CAS实现的自旋锁来控制队列的动态扩容,保证了扩容操作不会阻塞take操作的执行。
PriorityBlockingQueue有四个构造方法:
// 默认的构造方法,该方法会调用this(DEFAULT_INITIAL_CAPACITY, null),即默认的容量是11
public PriorityBlockingQueue()
// 根据initialCapacity来设置队列的初始容量
public PriorityBlockingQueue(int initialCapacity)
// 根据initialCapacity来设置队列的初始容量,并根据comparator对象来对数据进行排序
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
// 根据集合来创建队列
public PriorityBlockingQueue(Collection<? extends E> c)
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 5595510919245408276L;
private static final int DEFAULT_INITIAL_CAPACITY = 11;
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private transient Object[] queue;
private transient int size;
private transient Comparator<? super E> comparator;
private final ReentrantLock lock;
private final Condition notEmpty;
private transient volatile int allocationSpinLock;//扩容时候用到,自旋锁
private PriorityQueue<E> q;//数组实现的最小堆,writeObject和readObject用到。
//为了兼容之前的版本,只有在序列化和反序列化才非空
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
//构造函数没有初始化allocationSpinLock,q
}
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {// 如果传入集合是有序集,则无须进行堆有序化
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;//不需要重建堆
}// 如果传入集合是PriorityBlockingQueue类型,则不进行堆有序化
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;//不需要重建堆
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();//重建堆
}
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // removed last element
array[i] = null;
else {
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {//元素x放到k的位置
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)//比子节点小就不动,小堆
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {//元素x放到k的位置
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)//比父亲大就不动,小堆
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
public boolean offer(E e) {
if (e == null)// 若插入的元素为null,则直接抛出NullPointerException异常
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);//准备放在最后size位置处
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();// 唤醒等待在空上的线程
} finally {
lock.unlock();
}
return true;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -1;
}
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);
if (i == -1)
return false;
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
public boolean contains(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return indexOf(o) != -1;
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private void heapify() {
Object[] array = queue;
int n = size;
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);//数组重建为堆
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] array = queue;
int n = size;
size = 0;
for (int i = 0; i < n; i++)
array[i] = null;
} finally {
lock.unlock();
}
public int drainTo(Collection<? super E> c, int maxElements) {//批量获取元素
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(size, maxElements);
for (int i = 0; i < n; i++) {// 循环遍历,不断弹出队首元素;
c.add((E) queue[0]); // In this order, in case add() throws.
dequeue();
}
return n;
} finally {
lock.unlock();
}
}
}
扩容
private void tryGrow(Object[] array, int oldCap) {//旧数组和容量
lock.unlock(); // 释放锁,防止阻塞出队操作
Object[] newArray = null;
//释放了锁,多个线程可以进来这里,但是只有一个线程可以执行if里面的代码,也就是只有一个线程可以扩容,
if (allocationSpinLock == 0 && // 使用CAS操作来修改allocationSpinLock
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {// 容量越小增长得越快,若容量小于64,则新容量是oldCap * 2 + 2,否则是oldCap * 1.5
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // 扩容后超过最大容量处理
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//整数溢出
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}//queue是公共变量,
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {// 解锁,因为只有一个线程到此,因而不需要CAS操作;
allocationSpinLock = 0;
}
}//失败扩容的线程newArray == null,调用Thread.yield()让出cpu, 让扩容线程扩容后优先调用lock.lock重新获取锁,
//但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁。
if (newArray == null)
Thread.yield();
lock.lock();//有可能扩容的线程先走到这里,也有可能没有扩容的线程先走到这里。
//准备赋值给共有变量queue,要加锁,
//扩容的线程newArray != null ,没有扩容的线程newArray = null
if (newArray != null && queue == array) {//再次进入while循环去扩容。
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = PriorityBlockingQueue.class;
allocationSpinLockOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("allocationSpinLock")); //allocationSpinLock这个字段
} catch (Exception e) {
throw new Error(e);
}
}
LinkedTransferQueue
LinkedTransferQueue使用了一个叫做dual data structure
的数据结构,或者叫做dual queue
,译为双重数据结构或者双重队列。
放取元素使用同一个队列,队列中的节点具有两种模式,一种是数据节点,一种是非数据节点。
放元素时先跟队列头节点对比,如果头节点是非数据节点,就让他们匹配,如果头节点是数据节点,就生成一个数据节点放在队列尾端(入队)。
取元素时也是先跟队列头节点对比,如果头节点是数据节点,就让他们匹配,如果头节点是非数据节点,就生成一个非数据节点放在队列尾端(入队)。
属性
// 头节点
transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 放取元素的几种方式:
// 立即返回,用于非超时的poll()和tryTransfer()方法中
private static final int NOW = 0; // for untimed poll, tryTransfer
// 异步,不会阻塞,用于放元素时,因为内部使用无界单链表存储元素,不会阻塞放元素的过程
private static final int ASYNC = 1; // for offer, put, add
// 同步,调用的时候如果没有匹配到会阻塞直到匹配到为止
private static final int SYNC = 2; // for transfer, take
// 超时,用于有超时的poll()和tryTransfer()方法中
private static final int TIMED = 3; // for timed poll, tryTransfer
内部类
static final class Node {
// 是否是数据节点(也就标识了是生产者还是消费者)
final boolean isData; // false if this is a request node
// 元素的值
volatile Object item; // initially non-null if isData; CASed to match
// 下一个节点
volatile Node next;
// 持有元素的线程
volatile Thread waiter; // null until waiting
}
//构造方法
public LinkedTransferQueue() {
}
public LinkedTransferQueue(Collection<? extends E> c) {
this();
addAll(c);
}
只有这两个构造方法,且没有初始容量,所以是无界的一个阻塞队列。
xfer方法
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允许放入空元素
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
// 外层循环,自旋,失败就重试
retry:
for (;;) { // restart on append race
// 下面这个for循环用于控制匹配的过程
// 同一时刻队列中只会存储一种类型的节点
// 从头节点开始尝试匹配,如果头节点被其它线程先一步匹配了
// 就再尝试其下一个,直到匹配到为止,或者到队列中没有元素为止
for (Node h = head, p = h; p != null;) { // find & match first node
// p节点的模式
boolean isData = p.isData;
// p节点的值
Object item = p.item;
// p没有被匹配到
if (item != p && (item != null) == isData) { // unmatched
// 如果两者模式一样,则不能匹配,跳出循环后尝试入队
if (isData == haveData) // can't match
break;
// 如果两者模式不一样,则尝试匹配
// 把p的值设置为e(如果是取元素则e是null,如果是放元素则e是元素值)
if (p.casItem(item, e)) { // match
// 匹配成功
// for里面的逻辑比较复杂,用于控制多线程同时放取元素时出现竞争的情况的
// 看不懂可以直接跳过
for (Node q = p; q != h;) {
// 进入到这里可能是头节点已经被匹配,然后p会变成h的下一个节点
Node n = q.next; // update by 2 unless singleton
// 如果head还没变,就把它更新成新的节点
// 并把它删除(forgetNext()会把它的next设为自己,也就是从单链表中删除了)
// 这时为什么要把head设为n呢?因为到这里了,肯定head本身已经被匹配掉了
// 而上面的p.casItem()又成功了,说明p也被当前这个元素给匹配掉了
// 所以需要把它们俩都出队列,让其它线程可以从真正的头开始,不用重复检查了
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
// 如果新的头节点为空,或者其next为空,或者其next未匹配,就重试
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒p中等待的线程
LockSupport.unpark(p.waiter);
// 并返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
}
// p已经被匹配了或者尝试匹配的时候失败了
// 也就是其它线程先一步匹配了p
// 这时候又分两种情况,p的next还没来得及修改,p的next指向了自己
// 如果p的next已经指向了自己,就重新取head重试,否则就取其next重试
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 到这里肯定是队列中存储的节点类型和自己一样
// 或者队列中没有元素了
// 就入队(不管放元素还是取元素都得入队)
// 入队又分成四种情况:
// NOW,立即返回,没有匹配到立即返回,不做入队操作
// ASYNC,异步,元素入队但当前线程不会阻塞(相当于无界LinkedBlockingQueue的元素入队)
// SYNC,同步,元素入队后当前线程阻塞,等待被匹配到
// TIMED,有超时,元素入队后等待一段时间被匹配,时间到了还没匹配到就返回元素本身
// 如果不是立即返回
if (how != NOW) { // No matches available
// 新建s节点
if (s == null)
s = new Node(e, haveData);
// 尝试入队
Node pred = tryAppend(s, haveData);
// 入队失败,重试
if (pred == null)
continue retry; // lost race vs opposite mode
// 如果不是异步(同步或者有超时)
// 就等待被匹配
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
private Node tryAppend(Node s, boolean haveData) {
// 从tail开始遍历,把s放到链表尾端
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 如果首尾都是null,说明链表中还没有元素
if (p == null && (p = head) == null) {
// 就让首节点指向s
// 注意,这里插入第一个元素的时候tail指针并没有指向s
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
// 如果p无法处理,则返回null
// 这里无法处理的意思是,p和s节点的类型不一样,不允许s入队
// 比如,其它线程先入队了一个数据节点,这时候要入队一个非数据节点,就不允许,
// 队列中所有的元素都要保证是同一种类型的节点
// 返回null后外面的方法会重新尝试匹配重新入队等
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
// 如果p的next不为空,说明不是最后一个节点
// 则让p重新指向最后一个节点
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
// 如果CAS更新s为p的next失败
// 则说明有其它线程先一步更新到p的next了
// 就让p指向p的next,重新尝试让s入队
p = p.next; // re-read on CAS failure
else {
// 到这里说明s成功入队了
// 如果p不等于t,就更新tail指针
// 还记得上面插入第一个元素时tail指针并没有指向新元素吗?
// 这里就是用来更新tail指针的
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
// 返回p,即s的前一个元素
return p;
}
}
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 如果是有超时的,计算其超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = -1; // initialized after first item and cancel checks
// 随机数,随机让一些自旋的线程让出CPU
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 如果s元素的值不等于e,说明它被匹配到了
if (item != e) { // matched
// assert item != s;
// 把s的item更新为s本身
// 并把s中的waiter置为空
s.forgetContents(); // avoid garbage
// 返回匹配到的元素
return LinkedTransferQueue.<E>cast(item);
}
// 如果当前线程中断了,或者有超时的到期了
// 就更新s的元素值指向s本身
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 尝试解除s与其前一个节点的关系
// 也就是删除s节点
unsplice(pred, s);
// 返回元素的值本身,说明没匹配到
return e;
}
// 如果自旋次数小于0,就计算自旋次数
if (spins < 0) { // establish spins at/near front
// spinsFor()计算自旋次数
// 如果前面有节点未被匹配就返回0
// 如果前面有节点且正在匹配中就返回一定的次数,等待
if ((spins = spinsFor(pred, s.isData)) > 0)
// 初始化随机数
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
// 还有自旋次数就减1
--spins;
// 并随机让出CPU
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
// 更新s的waiter为当前线程
s.waiter = w; // request unpark then recheck
}
else if (timed) {
// 如果有超时,计算超时时间,并阻塞一定时间
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超时的,直接阻塞,等待被唤醒
// 唤醒后进入下一次循环,走第一个if的逻辑就返回匹配的元素了
LockSupport.park(this);
}
}
}
DelayQueue
https://www.cnblogs.com/tong-yuan/p/DelayQueue.html