SynchronousQueue
特性:
SynchronousQueue没有一个地方来暂存元素,即它的容量为 0。
每次取数据都要先阻塞,直到有数据被放入;
同理,每次放数据的时候也会阻塞,直到有消费者来取。
SynchronousQueue的队列中封装的节点更多针对的不是数据,而是要执行的操作,
应用场景:
Executors.newCachedThreadPool()就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则 会重复使用,线程空闲了60秒后会被回收。
数据结构:
链表
先消费后生产的场景:
1:第一个线程Thread0是消费者访问,此次队列为空,则入队(创建Node结点并赋值)
2:第二个线程Thread1也是消费者访问,继续入队
3:第三个线程Thread2是生产者,携带了数据,直接将该线程携带的数据返回给队首的消费者,并唤醒队首线程Thread1出队
先生产后消费的场景,原理一样
锁:
CAS+自旋 ,自旋一定次数后调用LockSupport.park
公平与非公平实现:
非公平模式(默认策略):底层使用TransferStack,栈顶匹配,栈顶出栈,后进先出
volatile SNode next; // 指向下一个节点的指针
volatile SNode match; // 存放和它进行匹配的节点
volatile Thread waiter; // 保存阻塞的线程
Object item;
int mode;
SNode(Object item) {
this.item = item;
}
公平模式:底层使用TransferQueue, 队尾匹配(判断模式),队头出队,先进先出
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
PriorityBlockingQueue
/**
* 数据结构:数组+二叉树
*/
private transient Object[] queue;
//大小
private transient int size;
//比较器
private transient Comparator<? super E> comparator;
//一把锁
private final ReentrantLock lock;
//等待条件
private final Condition notEmpty;
1)一个支持优先级排序的无界堵塞队列:
入队:根据比较器进行排序
出队:优先级高的先出队、优先级低的后出队
数据结构:
物理上是用数组去存储,会自动扩容
逻辑上用二叉堆去排序
关于二叉堆知识的补充:
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始 排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不 同,二叉堆又可以分为两个类型: 大顶堆和小顶堆。
- 大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
- 小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
入队的时候小顶堆相关代码:
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
出队
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
*
* @param k the position to fill
* @param x the item to insert
* @param array the heap array
* @param n heap size
*/
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
DelayQueue
1)DelayQueue 是一个支持延时获取元素的阻塞队列,
入队:元素会根据延迟时间的长短被放到队列中,延迟时间较短的放队列前面
出队:堆顶元素过期时间小于等于0就出队,否则堵塞
2)应用场景:处理延迟任务,具体case
- 清理设置了过期时间的缓存
订单未支付,超时自动关闭 ```java //存储的数据结构 private final PriorityQueue
q = new PriorityQueue (); //一把锁 private final transient ReentrantLock lock = new ReentrantLock();
//等待条件 private final Condition available = lock.newCondition();
如何选择适合的阻塞队列
1)考虑功能能否实现,如优先级排序、延迟 执行等。
2)容量
根据任务数量来推算出 合适的容量,从而去选取合适的 BlockingQueue。
固定容量:ArrayBlockingQueue
无限容量:LinkedBlockingQueue,DelayQueue
无容量:SynchronousQueue
自动扩容:PriorityBlockingQueue
3)性能
从支持高并发角度去看,LinkedBlockingQueue比ArrayBlockingQueue高
fox老师对于堵塞队列总结的脑图
https://www.processon.com/view/link/618ce3941e0853689b0818e2#map