一、概述

queue:一个队列就是一个先入先出(FIFO)的数据结构,queue接口与list、set同一级别,都是集成Collection接口。
image.png

二、queue类图

image.png
常用的队列分为两类:阻塞队列和非阻塞队列

  • 常用阻塞队列

ArrayBlockingQueue - 基于数组实现的有界阻塞队列
LinkedBlockingQueue - 基于链表实现的有界阻塞队列
PriorityBlockingQueue - 一个具有优先级的无限阻塞队列,默认情况下元素采用自然顺序升序排列。基于最 小 二叉堆实现,使用CAS实现的自旋锁来控制队列的动态扩容
SynchronousQueue - 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除 操作,否则插 入操作处于阻塞状态

  • 常用非阻塞队列

LinkedList - 除了实现了List接口,也实现了Deque接口,可以当做双端队列来使用
PriorityQueue - 无界有序队列,按照自然顺序排列。Object数组实现
ConcurrentLinkedQueue - 基于连接节点(链表)、线程安全的队列(CAS)。

三、常用方法

  1. -- 尾部添加
  2. add() - 增加一个元素,如果队列已满,则抛出异常
  3. 队列满 notFull.await
  4. notEmpty.signal()
  5. offer() - 添加一个元素并返回true,如果队列已满,则返回false
  6. -- 删除并返回头部元素
  7. remove() - 移除并返回队列头部的元素 如果队列为空,则抛出异常
  8. notFull.single() 取元素调用
  9. notEmpty.await() 当队列长度为 0
  10. poll() - 移除并返回队列头部的元素,如果队列为空,则返回null
  11. -- 获取头部元素但不删除
  12. element() - 返回队列的头部元素,如果队列为空,则抛出异常
  13. peek() - 返回队列头部的元素,如果队列为空,则返回null
  14. -- 阻塞添加删除
  15. 队列满 notFull.await
  16. notEmpty.signal()
  17. put() - 添加一个元素,如果队列满,则阻塞
  18. notFull.single()
  19. notEmpty.await() 当队列长度为 0
  20. take() - 移除并返回队列头部的元素,如果队列为空,则阻塞

四、ArrayBlockingQueue

  • 通过数组实现的有界阻塞队列,此队列按照先进先出的原则对元素进行排序
  • 锁没有分离(只有一把锁),生产和消费用的是同一把锁
  • 数组队列在生产和消费(出队入队)时直接将对象从数组中插入或移除,效率较高
  • 因为是数组队列,所以初始化时必须指定队列大小。 ```java final Object[] items; / Main lock guarding all access */ final ReentrantLock lock; / Condition for waiting takes / private final Condition notEmpty; /** Condition for waiting puts / private final Condition notFull; / items index for next take, poll, peek or remove */ int takeIndex; / items index for next put, offer, or add / int putIndex; /** Number of elements in the queue / int count;

public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //非空条件等待队列 notEmpty = lock.newCondition(); //非满条件等待队列 notFull = lock.newCondition(); }

public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列已满,发挥false if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }

private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列元素个数等于0时,阻塞等待 while (count == 0) notEmpty.await(); //不等于0,添加元素 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings(“unchecked”) E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count—; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

  1. <a name="QLfN8"></a>
  2. # 五、LinkedBlockingQueue
  3. - 通过链表实现可选容量阻塞队列
  4. - 有两把锁,一把用于put(生产),一把用户take(消费)
  5. - 队列在生产和消费的时候,需要把数据对象封装到Node节点中,相对效率没有那么高
  6. - 队列容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量
  7. ```java
  8. /** 执行take, poll等操作时候需要获取该锁 */
  9. private final ReentrantLock takeLock = new ReentrantLock();
  10. /** 当队列为空时候执行出队操作(比如take)的线程会被放入这个条件队列进行等待 */
  11. private final Condition notEmpty = takeLock.newCondition();
  12. /** 执行put, offer等操作时候需要获取该锁*/
  13. private final ReentrantLock putLock = new ReentrantLock();
  14. /**当队列满时候执行进队操作(比如put)的线程会被放入这个条件队列进行等待 */
  15. private final Condition notFull = putLock.newCondition();
  16. /** 当前队列元素个数 ArrayBlockingQueue用的是int*/
  17. private final AtomicInteger count = new AtomicInteger(0);
  1. public LinkedBlockingQueue(int capacity) {
  2. if (capacity <= 0) throw new IllegalArgumentException();
  3. this.capacity = capacity;
  4. last = head = new Node<E>(null);
  5. }
  6. public boolean offer(E e, long timeout, TimeUnit unit)
  7. throws InterruptedException {
  8. if (e == null) throw new NullPointerException();
  9. long nanos = unit.toNanos(timeout);
  10. int c = -1;
  11. final ReentrantLock putLock = this.putLock;
  12. final AtomicInteger count = this.count;
  13. putLock.lockInterruptibly();
  14. try {
  15. while (count.get() == capacity) {
  16. if (nanos <= 0)
  17. return false;
  18. nanos = notFull.awaitNanos(nanos);
  19. }
  20. enqueue(new Node<E>(e));
  21. c = count.getAndIncrement();
  22. if (c + 1 < capacity)
  23. notFull.signal();
  24. } finally {
  25. putLock.unlock();
  26. }
  27. if (c == 0)
  28. signalNotEmpty();
  29. return true;
  30. }
  31. private void enqueue(Node<E> node) {
  32. // assert putLock.isHeldByCurrentThread();
  33. // assert last.next == null;
  34. last = last.next = node;
  35. }
  36. public E take() throws InterruptedException {
  37. E x;
  38. int c = -1;
  39. final AtomicInteger count = this.count;
  40. final ReentrantLock takeLock = this.takeLock;
  41. takeLock.lockInterruptibly();
  42. try {
  43. while (count.get() == 0) {
  44. notEmpty.await();
  45. }
  46. x = dequeue();
  47. c = count.getAndDecrement();
  48. if (c > 1)
  49. notEmpty.signal();
  50. } finally {
  51. takeLock.unlock();
  52. }
  53. if (c == capacity)
  54. signalNotFull();
  55. return x;
  56. }
  57. private E dequeue() {
  58. // assert takeLock.isHeldByCurrentThread();
  59. // assert head.item == null;
  60. Node<E> h = head;
  61. Node<E> first = h.next;
  62. h.next = h; // help GC
  63. head = first;
  64. E x = first.item;
  65. first.item = null;
  66. return x;
  67. }