概述
在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:
常用方法概述
BlockingQueue的主要功能就是能够根据场景对于阻塞的需求来对外显示阻塞/非阻塞功能。其常用方法包括存、取功能。
放入数据
- offer(anObject): 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
- offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
获取数据
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
- poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
- take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
- drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
实现方法
LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。
LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。
其底层数据结构是一个链表。原理
BlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。代码分析
LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。对象的定义
```java //容量,如果没有指定,该值为Integer.MAX_VALUE; private final int capacity;
//当前队列中的元素 private final AtomicInteger count =new AtomicInteger();
//队列头节点,始终满足head.item==null transient Node head;
//队列的尾节点,始终满足last.next==null private transient Node last;
//用于出队的锁 private final ReentrantLock takeLock =new ReentrantLock();
//当队列为空时,保存执行出队的线程 private final Condition notEmpty = takeLock.newCondition();
//用于入队的锁 private final ReentrantLock putLock =new ReentrantLock();
//当队列满时,保存执行入队的线程 private final Condition notFull = putLock.newCondition();
<a name="e7UUF"></a>
### put(E e)方法
```java
public void put(E e) throws InterruptedException {
//不允许元素为null
if (e ==null) throw new NullPointerException();
int c = -1;
//以当前元素新建一个节点
Node node =new Node(e);
final ReentrantLock putLock =this.putLock;
final AtomicInteger count =this.count;
//获得入队的锁
putLock.lockInterruptibly();
try {
//如果队列已满,那么将该线程加入到Condition的等待队列中
while (count.get() == capacity) {
notFull.await();
}
//将节点入队
enqueue(node);
//得到插入之前队列的元素个数
c = count.getAndIncrement();
//如果还可以插入元素,那么释放等待的入队线程
if (c + 1 < capacity){
notFull.signal();
}
}finally {
//解锁
putLock.unlock();
}
//通知出队线程队列非空
if (c ==0) signalNotEmpty();
}
E take()方法
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//获取takeLock锁
takeLock.lockInterruptibly();
try {
//如果队列为空,那么加入到notEmpty条件的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//得到队头元素
x = dequeue();
//得到取走一个元素之前队列的元素个数
c = count.getAndDecrement();
//如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果队列中的元素从满到非满,通知put线程
if (c == capacity)
signalNotFull();
return x;
}
remove()方法
remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回false;有的话则删除并返回true。入队和出队都是只获取一个锁,而remove()方法需要同时获得两把锁,其实现如下:
public boolean remove(Object o) {
//因为队列不包含null元素,返回false
if (o == null) return false;
//获取两把锁
fullyLock();
try {
//从头的下一个节点开始遍历
for (Node trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//如果匹配,那么将节点从队列中移除,trail表示前驱节点
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//释放两把锁
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
LinkedBlockingQueue与ArrayBlockingQueue比较
LinkedBlockingQueue
LinkedBlockingQueue是允许两个线程同时在两端进行入队或出队的操作的,但一端同时只能有一个线程进行操作,这是通过两把锁来区分的;为了维持底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
一个单向链表+两把锁+两个条件 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
采用了链表,最大容量为整数最大值,可看做容量无限
ArrayBlockingQueue
ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。
一个对象数组+一把锁+两个条件
入队与出队都用同一把锁
在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
采用了数组,必须指定大小,即容量有限