SynchronousQueue
没有数据缓冲的BlockingQueue
生产者线程对其的插入操作put必须等待消费者的移除操作take
容量为0
每次取数据都会先阻塞,直到数据被放入,同理:每次放输入也会等消费者来取
所做的就是直接传递(direct handoff)
不需要存储,
应用场景
适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
Executors.newCachedThreadPool()就使用了SynchronousQueue,有根据需求创建新的线程,如果有空闲就会重复使用
线程空闲60秒,会被回收
不确定来自生产者请求数量,但这些请求需要很快处理掉,适合为每一个生产者请求分配一个消费线程的处理高效的方法
SynchronousQueue使用
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();
公平非公平,一个是队列结构,一个是栈结构
PriorityBlockingQueue使用
//创建优先级阻塞队列 Comparator为null,自然排序
PriorityBlockingQueue<Integer> queue=new PriorityBlockingQueue<Integer>(5);
//自定义Comparator
PriorityBlockingQueue queue=new PriorityBlockingQueue<Integer>(
5, new Comparator<Integer>() {
// 重写比较方法
@Override
public int compare(Integer o1, Integer o2) {
return o2-o1;
}
}
底层实现:二叉堆
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,
二叉堆又可以分为两个类型:
- 大顶堆:父结点的键值大于或等于任何一个子结点的键值
- 小顶堆:父结点的键值小于或等于任何一个子结点的键值
//Inserts the specified element into this priority queue.
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
//Tries to grow array to accommodate at least one more element
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
//没有定义comparator 就走默认的排序规定
siftUpComparable(n, e, array);
else
//走自定义的比较规则
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
应用场景
根据客户优先级的排队
DelayQueue
支持延时获取元素的阻塞队列, 采用优先队列 PriorityQueue 存储元素,
同时元素必须实现 Delayed 接口。设置延迟时间
延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
无界队列
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();\
//采用priorityQueue存储元素
private final PriorityQueue<E> q = new PriorityQueue<E>();
}
public interface Delayed extends Comparable<Delayed> {
//getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,
//如果返回 0 或者负数则代表任务已过期。
//元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。
long getDelay(TimeUnit unit);
}
DelayQueue使用
DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();
// 对象需要实现delayed 接口
class OrderInfo implements Delayed {
private String name;
private long time; //延时时间
@Override
public long getDelay(TimeUnit unit) {
long diff = time - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed obj) {
if (this.time < ((DelayObject) obj).time) {
return -1;
}
if (this.time > ((DelayObject) obj).time) {
return 1;
}
return 0;
}
}
DelayQueue的原理
数据结构
//用于保证队列操作的线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列,存储元素,用于保证延迟低的优先执行
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻塞的线程
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通知
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
入队put方法
public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 入队
q.offer(e);
if (q.peek() == e) {
// 若入队的元素位于队列头部,说明当前元素延迟最小
// 将 leader 置空
leader = null;
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
}
return true;
} finally {
lock.unlock(); // 解锁,真正唤醒阻塞的线程
}
}入队put方法
出队take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();// 取出堆顶元素( 最早过期的元素,但是不弹出对象)
if (first == null)// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
available.await();//当前线程无限期等待,直到被唤醒,并且释放锁。
else {
long delay = first.getDelay(NANOSECONDS);// 堆顶元素的到期时间
if (delay <= 0)// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
return q.poll();
// 如果delay大于0 ,则下面要阻塞了
// 将first置为空方便gc
first = null;
// 如果有线程争抢的Leader线程,则进行无限期等待。
if (leader != null)
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待剩余等待时间
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
// 解锁,真正唤醒阻塞的线程
lock.unlock();
}
}
- 获取锁
- 获取最早过期的元素,先不弹出元素素。
- 判断最早是否为空,如果是空就无线等吧,
- 如果为不为空,判断剩余的过期时间,
- 如果已经过期则直接返回当前元素
- 如果没有过期,也就是说剩余时间还存在,则先获取Leader对象,如果Leader已经有线程在处理,就是说还轮不到当前的线程执行,当前线程需要进行无限期等待,如果Leader为空,就可以只需等待剩余的时间就ok了
- 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。
总结就是:如果获取的对象以及过期就直接出队,如果哦没有就等下剩余的时间
如何选择适合的阻塞队列
线程池对于阻塞队列的选择
- FixedThreadPool———> LinkedBlockingQueue
- SingleThreadExecutor ———> LinkedBlockingQueue
- CachedThreadPool ———> SynchronousQueue
- ScheduledThreadPool ——->DelayQueue
- SingleThreadScheduledExecutor ——->DelayQueue
选择策略
通常我们可以从以下 5 个角度考虑,来选择合适的阻塞队列:
- 功能:
需要排序?延迟? - 容量:
考虑并发量,和业务需求是否有大量的任务
固定容量的:ArrayBlockingQueue
无界队列:LinkedBlockingQueue,DelayQueue - 是否扩容
看业务是否稳定类型,还是高并发类型
不能自动扩容:ArrayBlockingQueue
会自动扩容:PriorityBlockingQueue - 内存结构:
考虑内存利用率的性能
ArrayBlockingQueue:数据实现,没有所谓的结点,利用率高
LinkedBlockingQueue :链表实现,多了一层结点 - 性能:
ArrayBlockingQueue :一把锁,读写都会
LinkedBlockingQueue :两把锁,读写分离
SynchronousQueue :数据直传,没有存储的过程