ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全(出队入队使用同一把锁,所以互斥)。
选择分析
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
原理解析
put方法
public void put(E e) throws InterruptedException {
//元素判空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可中断锁。(等待锁期间被中断,则抛出异常)
lock.lockInterruptibly();
try {
//阻塞队列放满情况下,生产者挂起,等待消费者唤醒
//设计注意点: 用while不用if是为了防止【虚假唤醒】
while (count == items.length)
notFull.await();//生产者线程,进入条件队列,并释放锁
//具体你入队操作
enqueue(e);
} finally {
lock.unlock();// 唤醒消费者线程
}
}
enqueue入队方法
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
//精髓设计思想: 环形数组,putIndex指针到数组尾部,返回头部
putIndex = 0;
count++;
//有新元素入队,则队列不为空;唤醒消费者线程。(条件队列转同步队列,生产者线程释放锁后,同步队列竞争锁)
notEmpty.signal();
}
take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断锁。(等待锁期间被中断,则抛出异常)
lock.lockInterruptibly();
try {
//while,防止虚假 唤醒(唤醒后条件再次不满足唤醒条件,则会再次阻塞)
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();// 唤醒生产者线程
}
}
dequeue出队方法
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//精髓设计思想: 环形数组,putIndex指针到数组尾部,返回头部
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//有新元素出队,则队列不满;唤醒生产者线程。(条件队列转同步队列,消费者线程释放锁后,同步队列竞争锁)
notFull.signal();
return x;
}