阻塞队列介绍
Queue接口
public interface Queue<E> extends Collection<E> {
//添加一个元素,添加成功返回true, 如果队列满了,就会抛出异常
boolean add(E e);
//添加一个元素,添加成功返回true, 如果队列满了,返回false
boolean offer(E e);
//返回并删除队首元素,队列为空则抛出异常
E remove();
//返回并删除队首元素,队列为空则返回null
E poll();
//返回队首元素,但不移除,队列为空则抛出异常
E element();
//获取队首元素,但不移除,队列为空则返回null
E peek();
}
BlockingQueue接口
BlockingQueue继承了Queue接口,是队列的一种。Queue和BlockingQueue都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列。
常用于:
解耦:生产者消费者模式
多线程同步
两个附加操作:
支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满
支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空
BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。
入队:
(1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)
(2)offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false
(3)put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置
出队:
(1)poll():如果有数据,出队,如果没有数据,返回null(不阻塞)
(2)poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null
(3)take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据
常用方法示例
当队列满了无法添加元素,或者是队列空了无法移除元素时:
1. 抛出异常:add、remove、element
2. 返回结果但不抛出异常:offer、poll、peek
3. 阻塞:put、take
public class BlockingQueueTest {
public static void main(String[] args) {
takeTest();
}
/**
* add 方法是往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满。
*/
private static void addTest() {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
System.out.println(blockingQueue.add(1));
System.out.println(blockingQueue.add(2));
System.out.println(blockingQueue.add(3));
}
/**
* remove 方法的作用是删除元素并返回队列的头节点,如果删除的队列是空的, remove 方法就会抛出异常。
*/
private static void removeTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
blockingQueue.add(1);
blockingQueue.add(2);
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
/**
* element 方法是返回队列的头部节点,但是并不删除。如果队列为空,抛出异常
*/
private static void elementTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
blockingQueue.element();
}
/**
* offer 方法用来插入一个元素。如果添加成功会返回 true,而如果队列已经满了,返回false
*/
private static void offerTest(){
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(2));
System.out.println(blockingQueue.offer(3));
}
/**
* poll 方法作用也是移除并返回队列的头节点。 如果队列为空,返回null
*/
private static void pollTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);
blockingQueue.offer(1);
blockingQueue.offer(2);
blockingQueue.offer(3);
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
/**
* peek 方法返回队列的头元素但并不删除。 如果队列为空,返回null
*/
private static void peekTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
System.out.println(blockingQueue.peek());
}
/**
* put 方法的作用是插入元素。如果队列已满就无法继续插入,阻塞插入线程,直至队列空出位置
*/
private static void putTest(){
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
try {
blockingQueue.put(1);
blockingQueue.put(2);
blockingQueue.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* take 方法的作用是获取并移除队列的头结点。如果执队列里无数据,则阻塞,直到队列里有数据
*/
private static void takeTest(){
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
try {
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
阻塞队列特性
阻塞
阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,所以下面重点介绍阻塞功能:阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。
take()方法
获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。过程如图所示:
put()方法
插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到 队列中。过程如图所示:
是否有界
阻塞队列还有一个非常重要的属性,那就是容量的大小,分为有界和无界两种。无界队列意味着里面可以容纳非常多的元素,例如LinkedBlockingQueue的上限是 Integer.MAX_VALUE,是非常大的一个数,可以近似认为是无限容量,因为我们几乎无法把这个容量装满。但是有的阻塞队列是有界的,例如 ArrayBlockingQueue 如果容量满了,也不会扩容,所以一旦满了就无法再往里放数据了。
应用场景
生产消费模式
隔离、解耦
Nacos 异步持久化
MQ挂了,本地兜底
常见的阻塞队列
BlockingQueue接口的实现类都被放在了juc包中,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
ArrayBlockingQueue
有界阻塞队列,先进先出,存取互斥
1.数据结构:静态数组,使用连续内存,容量固定必须指定长度,没有扩容机制
2.锁:使用ReentrantLock实现线程安全,存取使用了同一把锁,操作的是同一个数组对象,互斥,所以性能稍微低一些,高并发场景优先选择LinkedBlockingQueue
3.阻塞对象:两个条件队列notEmpty,notFull
a.入队 从头部开始添加元素,记录putIndex,到尾部设置为0,唤醒notEmpty
b.出队 从头部开始取出元素,记录takeIndex,到尾部设置为0,唤醒notFull
使用
BlockingQueue queue = new ArrayBlockingQueue(1024);
queue.put("1"); /向队列中添加元素
Object object = queue.take(); /从队列中取出元素
原理
利用了Lock锁的Condition通知机制进行阻塞控制。
核心:一把锁,两个条件
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者
private final Condition notEmpty;
//生产者
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
...
lock = new ReentrantLock(fair); //公平,非公平
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
入队put()方法
public void put(E e) throws InterruptedException {
//检查是否为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//阻塞队列已满,则将生产者挂起,等待消费者唤醒
//设计注意点: 用while不用if是为了防止虚假唤醒
while (count == items.length)
notFull.await(); //队列满了,使用notFull等待(生产者阻塞)
// 入队
enqueue(e);
} finally {
lock.unlock(); // 唤醒消费者线程
}
}
private void enqueue(E x) {
final Object[] items = this.items;
//入队 使用的putIndex
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0; //设计的精髓: 环形数组,putIndex指针到数组尽头了,返回头部
count++;
//notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
notEmpty.signal();
}
出队take()方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加锁,如果线程中断抛出异常
lock.lockInterruptibly();
try {
//如果队列为空,则消费者挂起
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
lock.unlock();// 唤醒生产者线程
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex]; //取出takeIndex位置的元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0; //设计的精髓: 环形数组,takeIndex 指针到数组尽头了,返回头部
count‐‐;
if (itrs != null)
itrs.elementDequeued();
//notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
notFull.signal();
return x;
}
为什么对数组操作要设计成双指针(环装数组)?
1.不需要维护数组的扩容,降低开发难度
2.数组的插入和删除操作,时间复杂度从O(n)变为O(1)
LinkedBlockingQueue
无界阻塞队列,可以指定容量,默认为Integer.MAX_VALUE,先进先出,存取互不干扰
1.数据结构:链表,内部类Node存储元素
2.锁分离:存取互不干扰,存取操作的是不同的Node对象
a.takeLock 取Node节点保证前驱后继不会乱
b.putLock 存Node节点,保证前去后继不会乱
删除元素时,两个锁一起加
3.阻塞对象
a.notEmpty 出队:队列count=0,无元素可取时,阻塞在该对象上
b.notFull 入队:队列count=capacity,存不进元素时,阻塞在该对象上
4.入队 队尾入队,由last指针记录
5.出队 队首出队,由head指针记录
先进先出
线程池中为什么使用LinkedBlockingQueue而不用ArrayBlockingQueue? 锁分离,性能高一些
使用
//指定队列的大小创建有界队列
BlockingQueue<Integer> boundedQueue = new LinkedBlockingQueue<>(100);
//无界队列
BlockingQueue<Integer> unboundedQueue = new LinkedBlockingQueue<>();
原理
基本属性
// 容量,指定容量就是有界队列
private final int capacity;
// 元素数量
private final AtomicInteger count = new AtomicInteger();
// 链表头 本身是不存储任何元素的,初始化时item指向null
transient Node<E> head;
// 链表尾
private transient Node<E> last;
// take锁 锁分离,提高效率
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty条件
// 当队列无元素时,take锁会阻塞在notEmpty条件上,等待其它线程唤醒
private final Condition notEmpty = takeLock.newCondition();
// put锁
private final ReentrantLock putLock = new ReentrantLock();
// notFull条件
// 当队列满了时,put锁会会阻塞在notFull上,等待其它线程唤醒
private final Condition notFull = putLock.newCondition();
//典型的单链表结构
static class Node<E> {
E item; /存储元素
Node<E> next; /后继节点 单链表结构
Node(E x) { item = x; }
}
构造器
public LinkedBlockingQueue() {
// 如果没传容量,就使用最大int值初始化其容量
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity = 0) throw new IllegalArgumentException();
// 初始化容量
this.capacity = capacity;
// 初始化head和last指针为空值节点
last = head = new Node<E>(null);
}
入队put()方法
public void put(E e) throws InterruptedException {
// 不允许null元素
if (e = null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 新建一个节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 使用put锁加锁
putLock.lockInterruptibly();
try {
// 如果队列满了,就阻塞在notFull上等待被其它线程唤醒(阻塞生产者线程)
while (count.get() = capacity) {
notFull.await();
}
// 队列不满,就入队
enqueue(node);
// 队列长度+1,返回原值
c = count.getAndIncrement();
// 如果现队列长度小于容量,notFull条件队列转同步队列,准备唤醒一个阻塞在
notFull条件上的线程(可以继续入队)
// 这里为啥要唤醒一下呢?
// 因为可能有很多线程阻塞在notFull这个条件上,而取元素时只有取之前队列是满的才
会唤醒notFull,此处不用等到取元素时才唤醒
if (c + 1 < capacity)
notFull.signal();
} finally {
// 真正唤醒生产者线程
putLock.unlock();
}
// 如果原队列长度为0,现在加了一个元素后立即唤醒阻塞在notEmpty上的线程
if (c = 0)
signalNotEmpty();
}
// 入队,尾插
private void enqueue(Node<E> node) {
// 直接加到last后面,last指向入队元素
last = last.next = node;
}
// 唤醒也要加takeLock
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 加take锁
takeLock.lock();
try {
// notEmpty条件队列转同步队列,准备唤醒阻塞在notEmpty上的线程
notEmpty.signal();
} finally {
// 真正唤醒消费者线程
takeLock.unlock();
}
}
入队,加锁,如果满了,生产者阻塞
如果元素数量小于capacity,唤醒生产线程,不用等到取元素时候才唤醒,提高吞吐量
出队take()方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 使用takeLock加锁
takeLock.lockInterruptibly();
try {
// 如果队列无元素,则阻塞在notEmpty条件上(消费者线程阻塞)
while (count.get() = 0) {
notEmpty.await();
}
// 否则出队
x = dequeue();
// 长度-1,返回原值
c = count.getAndDecrement();
// 如果取之前队列长度大于1,notEmpty条件队列转同步队列,准备唤醒阻塞在
notEmpty上的线程,原因与入队同理
if (c > 1)
notEmpty.signal();
} finally {
// 真正唤醒消费者线程
takeLock.unlock();
}
// 为什么队列是满的才唤醒阻塞在notFull上的线程呢?
// 因为唤醒是需要加putLock的,这是为了减少锁的次数,所以,这里索性在放完元素就检测一
下,未满就唤醒其它notFull上的线程,
// 这也是锁分离带来的代价
// 如果取之前队列长度等于容量(已满),则唤醒阻塞在notFull的线程
if (c = capacity)
signalNotFull();
return x;
}
private E dequeue() {
// head节点本身是不存储任何元素的
// 把head删除,并把head下一个节点作为新的值
// 把其值置空,返回原来的值
Node<E> h = head;
Node<E> first = h.next;
h.next = h; / help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// notFull条件队列转同步队列,准备唤醒阻塞在notFull上的线程
notFull.signal();
} finally {
// 解锁,这才会真正的唤醒生产者线程
putLock.unlock();
}
}
LinkedBlockingQueue与ArrayBlockingQueue对比
1.ArrayBlockingQueue有界,LinkedBlockingQueue无界,默认为Integer.MAX_VALUE大小,也可设置为有界
2.数据结构 ArrayBlockingQueue是数组,LinkedBlockingQueue是链表
3.锁分离 ArrayBlockingQueue没有锁分离,存取都用ReentrantLock,LinkedBlockingQueue有 putLock和takeLock锁分离设计,提高并发性能
SynchronousQueue
同步阻塞队列,是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。 SynchronousQueue的容量为0,所以没有地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
SynchronousQueue 的容量不是1而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,它不会为队列中元素维护存储空间,只是多个线程之间数据交换的媒介。
1.数据结构:链表(Node),在其内部类中维护了数据,先消费(take),后生产(put)
a.第一个线程Thread0是消费者访问,此时队列为空,则入队(创建Node结点并赋值)
b.第二个线程Thread1也是消费者访问,与队尾模式相同,继续入队
c.第三个线程Thread2是生产者,携带了数据e,与队尾模式不同,不进行入队操作直接将该线程携带的数据e返回给队首的消费者,并唤醒队首线程Thread1(默认非公平策略是栈结构),出队
2.锁:CAS+自旋(无锁)
3.阻塞:自旋了一定次数后调用LockSupport.park()
4.存取调用同一个方法:transfer()
a. put、offer 为生产者,携带了数据 e,为 Data 模式,设置到 SNode或QNode 属性中
b. take、poll 为消费者,不携帯数据,为 Request 模式,设置到 SNode或QNode属性中
5.过程
线程访问阻塞队列,先判断队尾节点或者栈顶节点的 Node 与当前入队模式是否相同,相同则构造节点 Node 入队,并阻塞当前线程,元素 e 和线程赋值给 Node 属性,不同则将元素 e(不为 null) 返回给取数据线程,队首或栈顶线程被唤醒,出队
6.公平模式
TransferQueue 队尾匹配(判断模式),队头出队,先进先出
7.非公平模式(默认策略)
TransferStack 栈顶匹配,栈顶出栈,后进先出
场景
- 生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
2. 线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,配合 SynchronousQueue为每个生产者请求分配一个消费线程。Executors.newCachedThreadPool()就使 用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重 复使用,线程空闲了60秒后会被回收。使用
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue >();
原理
take出队、put入队都调用了transferpublic E take() throws InterruptedException { // transfer有两种实现方式,队列和栈 E e = transferer.transfer(null, false, 0); if (e = null) return e; Thread.interrupted(); throw new InterruptedException(); } public void put(E e) throws InterruptedException { if (e = null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) = null) { Thread.interrupted(); throw new InterruptedException(); } }
PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列,优先级高的先出队,优先级低的后出队。不能保证同优先级元素的顺序。
1.数据结构:数组+二叉堆
默认容量11,可指定初始容量,会自动扩容,直到资源消耗尽为止,最大容量是 Integer.MAX_VALUE
2.锁:ReentrantLock 存取是同一把锁
3.阻塞对象:NotEmpty 出队,队列为空时阻塞
4.入队
a.不阻塞,永远返回成功,无界
b.根据比较器进行堆化(排序)自下而上
传入比较器对象就按照比较器的顺序排序
如果比较器为 null,则按照自然顺序排序(从小到大)
5.出队
优先级最高的元素在堆顶(弹出堆顶元素)
弹出前比较两个子节点再进行堆化(自上而下)
6.优先级
默认情况下元素采用自然顺序升序排序,也可以通过构造函数来指定Comparator来对元素进行排序。场景
电商抢购活动,会员级别高的用户优先抢购到商品
银行办理业务,vip客户插队使用
//创建优先级阻塞队列 Comparator为null,自然排序 PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue >(5); // 自定义Comparator PriorityBlockingQueue<Integer> queue2 = new PriorityBlockingQueue >(5, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { return o2 - o1; } });
原理
二叉堆
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型:
大顶堆和小顶堆。
大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值;
小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
DelayQueue
一个使用优先级队列实现的无界阻塞队列,内部采用优先队列PriorityQueue存储元素,同时元素必须实现Delayed接口,无界队列。 在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。 延迟队列的特点是:不是先进先出,而是会按照延迟时间的长短来排序,下一个即将执行的任务会排到队列的最前面。
1.数据结构:优先队列PriorityQueue 与PriorityBlockingQueue类似,不过没有阻塞功能
2.锁:ReentrantLock
3.阻塞对象:Condition available
4.入队:不阻塞,无界队列,与优先级队列入队相同,available
5.出队
为空时阻塞
检查堆顶元素过期时间
a.小于等于0则出队
b.大于0,说明没过期,则阻塞
判断leader线程是否为空(为了保证优先级)
a.不为空(已有线程阻塞),直接阻塞
b.为空,则将当前线程置为leader,并按照过期时间进行阻塞场景
商城订单超时关闭 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单
异步短信通知功能 饿了么订餐通知:下单成功后60s之后给用户发送短信通知。
关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭。
缓存过期清除。缓存中的对象,超过了存活时间,需要从缓存中移出。
任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。
public interface Delayed extends Comparable<Delayed> { //getDelay 方法返回的是"还剩下多长的延迟时间才会被执行", //如果返回 0 或者负数则代表任务已过期。 //元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。 long getDelay(TimeUnit unit); }
使用
DelayQueue<OrderInfo> queue = new DelayQueue<OrderInfo>();
原理
入队put() ```java public void put(E e) { offer(e); }// 用于保证队列操作的线程安全 private final transient ReentrantLock lock = new ReentrantLock(); // 优先级队列,存储元素,用于保证延迟低的优先执行 private final PriorityQueue<E> q = new PriorityQueue<E>(); // 用于标记当前是否有线程在排队(仅用于取元素时) leader 指向的是第一个从队列获取元素阻 塞的线程 private Thread leader = null; // 条件,用于表示现在是否有可取的元素 当新元素到达,或新线程可能需要成为leader时被通 知 private final Condition available = lock.newCondition(); public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入队 q.offer(e); // 若入队的元素位于队列头部,说明当前元素延迟最小 if (q.peek() = e) { // 将 leader 置空 leader = null; // available条件队列转同步队列,准备唤醒阻塞在available上的线程 available.signal(); } return true; } finally { // 解锁,真正唤醒阻塞的线程 lock.unlock(); } }
public boolean offer(E e) { if (e = null) throw new NullPointerException(); modCount +; int i = size; if (i = queue.length) grow(i + 1); size = i + 1; if (i = 0) queue[0] = e; else siftUp(i, e); return true; }
**出队take()**
```java
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for ( ;) {
// 取出堆顶元素(最早过期的元素,但是不弹出对象)
E first = q.peek();
// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
if (first = null)
// 当前线程无限期等待,直到被唤醒,并且释放锁。
available.await();
else {
// 堆顶元素的到期时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
if (delay <= 0)
return q.poll();
// 如果delay大于0 ,则下面要阻塞了,将first置为空方便gc
first = null; // don't retain ref while waiting
// 如果有线程争抢的Leader线程,则进行无限期等待。
if (leader != null)
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待剩余等待时间
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader = thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader = null & q.peek() = null)
// available条件队列转同步队列,准备唤醒阻塞在available上的线程
available.signal();
// 解锁,真正唤醒阻塞的线程
lock.unlock();
}
}
出队总结
1. 当获取元素时,先获取到锁对象。
2. 获取最早过期的元素,但是并不从队列中弹出元素。
3. 判断最早过期元素是否为空,如果为空则直接让当前线程无限期等待状态,并且让出当前锁对象。
4. 如果最早过期的元素不为空,获取最早过期元素的剩余过期时间
a.如果已经过期则直接返回当前元素
b.如果没有过期,也就是说剩余时间还存在,则先获取Leader对象
-如果Leader已经有线程在处理,则当前线程进行无限期等待
-如果Leader为空,则首先将Leader设置为当前线程,并且让当前线程等待剩余时间。
5. 最后将Leader线程设置为空
6. 如果Leader已经为空,并且队列有内容则唤醒一个等待的队列。
如何选择适合的阻塞队列
通常可以从以下5个角度考虑,来选择合适的阻塞队列
功能
比如是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,必须选择类似于 PriorityBlockingQueue之类的有排序能力的阻塞队列。
容量
是否有存储的要求,还是只需要“直接传递”。有容量固定ArrayBlockingQueue;有默认容量无限的 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数 量来推算出合适的容量,从而去选取合适的 BlockingQueue。
能否扩容
有时并不能在初始的时候很好的准确估计队列的大小,业务可能有高峰期、低谷期。如果一开始就固定一个容 量,可能无法应对所有的情况,也是不合适的,有可能需要动态扩容。如果需要动态扩容的话,就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反,PriorityBlockingQueue即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以可以根据是否需要扩容来选取合适的队列。
内存结构
ArrayBlockingQueue 的内部结构是“数组”的形式。LinkedBlockingQueue 的内部是用链表实现的, ArrayBlockingQueue没有链表所需要的节点,空间利用率更高。如果对性能有要求可以从内存的结构角度去考虑这个问题。
性能
LinkedBlockingQueue 拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要”直接传递”,而不需要存储的过程。如果场景需要直接传递的话,可以优先考虑 SynchronousQueue。
问题一:聊聊对阻塞队列的理解
BlockingQueue继承了Queue接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。
两个附加操作:
支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空
BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。
问题二:请聊下对象的内存布局
在HotSpot虚拟机中,对象在内存中存储的布局可以分为3块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。
对象头包括两部分信息,第一部分用于存储对象自身的运行时数据,如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等。
对象头的另外一部分是类型指针,即对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。
如果对象是一个java数组,那么在对象头中还有一块用于记录数组长度的数据。
第三部分对齐填充并不是必然存在的,也没有特别的含义,它仅仅起着占位符的作用。由于HotSpot VM的自动内存管理系统要求对对象的大小必须是8字节的整数倍。当对象其他数据部分没有对齐时,就需要通过对齐填充来补全。
问题三:SynchronousQueue公平和非公平实现原理是什么?
公平模式:底层使用TransferQueue, 队尾匹配(判断模式),队头出队,先进先出
非公平模式(默认策略):底层使用TransferStack,栈顶匹配,栈顶出栈,后进先出