概述

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

BlockingQueue详解与实现 - 图1

常用方法概述

BlockingQueue的主要功能就是能够根据场景对于阻塞的需求来对外显示阻塞/非阻塞功能。其常用方法包括功能。

放入数据

  • offer(anObject): 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程);
  • offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
  • put(anObject): 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

    获取数据

  • poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;

  • poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
  • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    实现方法

    LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。
    LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。
    其底层数据结构是一个链表。

    原理

    BlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。

    代码分析

    LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。

    对象的定义

    ```java //容量,如果没有指定,该值为Integer.MAX_VALUE; private final int capacity;

//当前队列中的元素 private final AtomicInteger count =new AtomicInteger();

//队列头节点,始终满足head.item==null transient Node head;

//队列的尾节点,始终满足last.next==null private transient Node last;

//用于出队的锁 private final ReentrantLock takeLock =new ReentrantLock();

//当队列为空时,保存执行出队的线程 private final Condition notEmpty = takeLock.newCondition();

//用于入队的锁 private final ReentrantLock putLock =new ReentrantLock();

//当队列满时,保存执行入队的线程 private final Condition notFull = putLock.newCondition();

  1. <a name="e7UUF"></a>
  2. ### put(E e)方法
  3. ```java
  4. public void put(E e) throws InterruptedException {
  5. //不允许元素为null
  6. if (e ==null) throw new NullPointerException();
  7. int c = -1;
  8. //以当前元素新建一个节点
  9. Node node =new Node(e);
  10. final ReentrantLock putLock =this.putLock;
  11. final AtomicInteger count =this.count;
  12. //获得入队的锁
  13. putLock.lockInterruptibly();
  14. try {
  15. //如果队列已满,那么将该线程加入到Condition的等待队列中
  16. while (count.get() == capacity) {
  17. notFull.await();
  18. }
  19. //将节点入队
  20. enqueue(node);
  21. //得到插入之前队列的元素个数
  22. c = count.getAndIncrement();
  23. //如果还可以插入元素,那么释放等待的入队线程
  24. if (c + 1 < capacity){
  25. notFull.signal();
  26. }
  27. }finally {
  28. //解锁
  29. putLock.unlock();
  30. }
  31. //通知出队线程队列非空
  32. if (c ==0) signalNotEmpty();
  33. }

E take()方法

  1. public E take() throws InterruptedException {
  2. E x;
  3. int c = -1;
  4. final AtomicInteger count = this.count;
  5. final ReentrantLock takeLock = this.takeLock;
  6. //获取takeLock锁
  7. takeLock.lockInterruptibly();
  8. try {
  9. //如果队列为空,那么加入到notEmpty条件的等待队列中
  10. while (count.get() == 0) {
  11. notEmpty.await();
  12. }
  13. //得到队头元素
  14. x = dequeue();
  15. //得到取走一个元素之前队列的元素个数
  16. c = count.getAndDecrement();
  17. //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
  18. if (c > 1)
  19. notEmpty.signal();
  20. } finally {
  21. takeLock.unlock();
  22. }
  23. //如果队列中的元素从满到非满,通知put线程
  24. if (c == capacity)
  25. signalNotFull();
  26. return x;
  27. }

remove()方法

remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回false;有的话则删除并返回true。入队和出队都是只获取一个锁,而remove()方法需要同时获得两把锁,其实现如下:

  1. public boolean remove(Object o) {
  2. //因为队列不包含null元素,返回false
  3. if (o == null) return false;
  4. //获取两把锁
  5. fullyLock();
  6. try {
  7. //从头的下一个节点开始遍历
  8. for (Node trail = head, p = trail.next;
  9. p != null;
  10. trail = p, p = p.next) {
  11. //如果匹配,那么将节点从队列中移除,trail表示前驱节点
  12. if (o.equals(p.item)) {
  13. unlink(p, trail);
  14. return true;
  15. }
  16. }
  17. return false;
  18. } finally {
  19. //释放两把锁
  20. fullyUnlock();
  21. }
  22. }
  23. void fullyLock() {
  24. putLock.lock();
  25. takeLock.lock();
  26. }

LinkedBlockingQueue与ArrayBlockingQueue比较

LinkedBlockingQueue

LinkedBlockingQueue是允许两个线程同时在两端进行入队或出队的操作的,但一端同时只能有一个线程进行操作,这是通过两把锁来区分的;为了维持底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。

一个单向链表+两把锁+两个条件 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多

采用了链表,最大容量为整数最大值,可看做容量无限

ArrayBlockingQueue

ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。

一个对象数组+一把锁+两个条件

入队与出队都用同一把锁

在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高

采用了数组,必须指定大小,即容量有限