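Blocking queues grew out of the classic producer-consumer pattern.
Remember the two blocking methods:
- take: removes the element at the head of the queue, blocking while the queue is empty
- put: adds an element to the queue, blocking while the queue is full

Null values are not allowed: `put(null)` throws a NullPointerException.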
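
As a minimal sketch of the two blocking methods in use (the class name `ProducerConsumerDemo` and its helper methods are made up for illustration), one producer thread calls `put` while the calling thread calls `take`. The capacity of 2 is an arbitrary choice so that `put` actually has to wait.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {

    // Hypothetical helper: a producer thread puts 1..n, the caller takes n items.
    static List<Integer> transfer(int n) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < n; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return received;
    }

    // Returns true if put(null) is rejected with a NullPointerException.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));
        System.out.println(rejectsNull());
    }
}
```

With a single producer the elements come out in insertion order, so `main` prints `[1, 2, 3, 4, 5]` followed by `true`.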
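## Kinds of blocking queues
- Two of the blocking queues are commonly described as unbounded. From a quick look at the source code, that description does not seem entirely accurate.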
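
The note does not say which two queues it means, so the following is only a sketch under my own assumption that `LinkedBlockingQueue` and `PriorityBlockingQueue` are the kind of queues in question. It checks what each reports through `remainingCapacity()`. The class name `CapacityCheck` is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class CapacityCheck {

    // A LinkedBlockingQueue built without a capacity argument.
    static int defaultLinkedCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // A LinkedBlockingQueue built with capacity 1 refuses a second element.
    static boolean boundedLinkedRejectsWhenFull() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        return !queue.offer("b"); // offer() returns false instead of blocking
    }

    // PriorityBlockingQueue is not capacity constrained.
    static int priorityRemainingCapacity() {
        return new PriorityBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultLinkedCapacity());
        System.out.println(boundedLinkedRejectsWhenFull());
        System.out.println(priorityRemainingCapacity());
    }
}
```

The default `LinkedBlockingQueue` reports `Integer.MAX_VALUE`, which means it is bounded by a very large capacity rather than truly unbounded, and it becomes an ordinary bounded queue as soon as a capacity is passed to the constructor.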
## How blocking queues work
We take ArrayBlockingQueue as the example and read through its put() and take() methods.
## Constructors
```java
/**
 * Serialization ID. This class relies on default serialization
 * even for the items array, which is default-serialized, even if
 * it is empty. Otherwise it could not be declared final, which is
 * necessary here.
 */
private static final long serialVersionUID = -817911632652898426L;

/** The array holding the queued elements */
final Object[] items;

/** Index of the next element to take */
int takeIndex;

/** Index at which the next element will be put */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Reentrant lock guarding all access */
final ReentrantLock lock;

/** Condition that take() waits on until an element is available */
private final Condition notEmpty;

/** Condition that put() waits on until the queue is not full */
private final Condition notFull;

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
```
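
To see the two constructor rules from the outside, here is a small sketch (the class name `ConstructorDemo` and its helpers are hypothetical): a capacity below 1 is rejected, and a freshly built queue reports exactly the capacity it was given, whether or not the fair policy is requested.

```java
import java.util.concurrent.ArrayBlockingQueue;

class ConstructorDemo {

    // Returns true if a capacity of 0 is rejected with IllegalArgumentException.
    static boolean rejectsZeroCapacity() {
        try {
            new ArrayBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    // Builds a queue with the given capacity and fairness, returns its free slots.
    static int capacityOf(int capacity, boolean fair) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, fair);
        return queue.remainingCapacity(); // empty queue: all slots are free
    }

    public static void main(String[] args) {
        System.out.println(rejectsZeroCapacity());
        System.out.println(capacityOf(3, true));
    }
}
```

The `fair` flag is passed straight to the `ReentrantLock` constructor, so it only affects the order in which blocked threads acquire the lock, not the capacity.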
## put operation
```java
public void put(E e) throws InterruptedException {
    // reject null elements
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // while the element count equals the array length (queue full),
        // keep waiting on the notFull condition
        while (count == items.length)
            notFull.await();
        // enqueue
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 * Enqueueing also signals notEmpty.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0; // wrap around to the start of the array
    count++;
    notEmpty.signal();
}
```
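- Before enqueueing, check that the element is not null.
- If the queue is full, block by waiting on notFull.
- After enqueueing, signal notEmpty to wake a waiting consumer.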
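
The steps above, together with their mirror image for taking, can be condensed into a stripped-down class. This is only a sketch of the two-condition idea, not the JDK implementation: `TinyBlockingQueue` and `TinyBlockingQueueDemo` are hypothetical names, and iterators, fairness and timed operations are left out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyBlockingQueue<E> {
    private final Object[] items;
    private int takeIndex;
    private int putIndex;
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException(); // step 1: null check
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await(); // step 2: wait while full
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;  // circular index
            count++;
            notEmpty.signal();                             // step 3: wake a consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();           // wait while empty
            E x = (E) items[takeIndex];
            items[takeIndex] = null;                       // drop the reference
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            notFull.signal();                              // wake a producer
            return x;
        } finally {
            lock.unlock();
        }
    }
}

class TinyBlockingQueueDemo {
    // Capacity 1 forces the second put to wait until the first take happens.
    static String roundTrip() {
        TinyBlockingQueue<String> queue = new TinyBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                queue.put("a");
                queue.put("b");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String result = queue.take() + queue.take();
            producer.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

The waits sit inside `while` loops rather than `if` statements, as in the JDK code, so a thread re-checks the condition every time it wakes up.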
<a name="HttPy"></a>
## take operation
```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // while the queue is empty, keep waiting on the notEmpty condition
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null; // clear the slot so the element can be collected
    if (++takeIndex == items.length)
        takeIndex = 0; // wrap around to the start of the array
    count--;
    if (itrs != null)
        itrs.elementDequeued(); // keep active iterators consistent
    notFull.signal(); // wake a producer waiting for space
    return x;
}