
ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全(出队入队使用同一把锁,所以互斥)。
选择分析
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
原理解析
put方法
public void put(E e) throws InterruptedException {//元素判空checkNotNull(e);final ReentrantLock lock = this.lock;//可中断锁。(等待锁期间被中断,则抛出异常)lock.lockInterruptibly();try {//阻塞队列放满情况下,生产者挂起,等待消费者唤醒//设计注意点: 用while不用if是为了防止【虚假唤醒】while (count == items.length)notFull.await();//生产者线程,进入条件队列,并释放锁//具体你入队操作enqueue(e);} finally {lock.unlock();// 唤醒消费者线程}}
enqueue入队方法
private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)//精髓设计思想: 环形数组,putIndex指针到数组尾部,返回头部putIndex = 0;count++;//有新元素入队,则队列不为空;唤醒消费者线程。(条件队列转同步队列,生产者线程释放锁后,同步队列竞争锁)notEmpty.signal();}
take方法
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;//可中断锁。(等待锁期间被中断,则抛出异常)lock.lockInterruptibly();try {//while,防止虚假 唤醒(唤醒后条件再次不满足唤醒条件,则会再次阻塞)while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();// 唤醒生产者线程}}
dequeue出队方法
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;//精髓设计思想: 环形数组,putIndex指针到数组尾部,返回头部if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//有新元素出队,则队列不满;唤醒生产者线程。(条件队列转同步队列,消费者线程释放锁后,同步队列竞争锁)notFull.signal();return x;}
