ArrayBlockingQueue 由数组实现的有界队列,是线程安全的,容量大小在创建对象时已定义好

  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2. implements BlockingQueue<E>, java.io.Serializable {

实现了BlockingQueue接口,该接口定义了队列的入队出队的方法
继承AbstractQueue父类,实现入队出队的基本操作

构造方法

创建ArrayBlockingQueue一定要指定容量

  1. //必须传入容量,可以控制重入锁是公平还是非公平
  2. public ArrayBlockingQueue(int capacity) {
  3. this(capacity, false);
  4. }

还可以设置锁是公平还是非公平,默认是非公平

  1. public ArrayBlockingQueue(int capacity, boolean fair) {
  2. if (capacity <= 0)
  3. throw new IllegalArgumentException();
  4. //创建数组
  5. this.items = new Object[capacity];
  6. lock = new ReentrantLock(fair);
  7. notEmpty = lock.newCondition();
  8. notFull = lock.newCondition();
  9. }

还可以将已有的集合转化为ArrayBlockingQueue

  1. public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
  2. //创建ArrayBlockingQueue
  3. this(capacity, fair);
  4. final ReentrantLock lock = this.lock;
  5. lock.lock(); //创建ArrayBlockingQueue没有并发问题,加锁是为了保证可见性,防止指令重排
  6. try {
  7. int i = 0;
  8. try {
  9. for (E e : c) {
  10. checkNotNull(e);
  11. items[i++] = e;
  12. }
  13. //如果超出创建的ArrayBlockingQueue长度会抛异常
  14. } catch (ArrayIndexOutOfBoundsException ex) {
  15. throw new IllegalArgumentException();
  16. }
  17. count = i;
  18. putIndex = (i == capacity) ? 0 : i;
  19. } finally {
  20. lock.unlock();
  21. }
  22. }

核心成员变量

由于ArrayBlockingQueue是基于数组实现的,为了保证Queue先进先出的特点,以及避免数组由于元素的出队和入队频繁地进行移位操作,ArrayBlockingQueue通过 count 属性记录当前数组元素个数,通过 putIndex takeIndex 来往数组添加、取出元素,每次入队出队操作不用再移动元素位置。

  1. //存放元素的数组
  2. final Object[] items;
  3. //取指针,记录下一次取操作的位置
  4. int takeIndex;
  5. //放指针,记录下一次放操作的未知
  6. int putIndex;
  7. //数组元素个数
  8. int count;
  9. //锁
  10. final ReentrantLock lock;
  11. //等待出队的条件(队列不为空)
  12. private final Condition notEmpty;
  13. //等待入队的条件(队列未满)
  14. private final Condition notFull;

每当插入一个元素,putIndex自增1
微信截图_20210801164615.png
每当取出一个元素,takeIndex自增1
微信截图_20210801164702.png
如果 takeIndex 和 putIndex 重叠,说明队列为空

生产者-消费者模型

我们创建一个阻塞队列,生产者调用 put() 方法负责往队列中添加元素,消费者调用 take() 方法负责往队列中取出元素。阻塞队列是线程安全的,只有抢到锁的线程才能操作队列。所以同一时间内,生产者和消费者只能有一个可以往队列添加元素或删除元素。
微信截图_20210731175852.png
如果出现这种情况,当生产者(消费者)获取到了锁,往队列中放(取)元素时,而此时队列放满了(为空),此时生产者(消费者)应该释放锁,让消费者(生产者)抢锁并往队列里面取(放)元素,从而使队列不为满(空),后续可以进行对应操作。基于AQS的特性,我们很容易可以想到,可以直接将生产者(消费者)线程放入CLH队列中并阻塞,等待下一次抢锁。但是如果后续一直是生产者(消费者)来抢锁,抢到锁后情况和之前一样,又重新释放锁。
微信截图_20210731221408.png

为了解决上面这中情况,利用到了 Array BlockingQueue 中的两个 Condition 属性(notEmpty 和 notFull)和 AQS 中的条件等待队列

加入条件等待队列

通过调用 Condition.await() 方法加入条件等待队列,主要干三件事

  1. 将当前线程加入条件等待队列
  2. 释放当前线程获得的锁,并唤醒CLH队头节点
  3. 阻塞当前线程

    1. public final void await() throws InterruptedException {
    2. if (Thread.interrupted())
    3. throw new InterruptedException();
    4. //创建节点并加入条件等待队列
    5. Node node = addConditionWaiter();
    6. //释放锁
    7. int savedState = fullyRelease(node);
    8. int interruptMode = 0;
    9. //如果当前节点不再CLH队列中
    10. while (!isOnSyncQueue(node)) {
    11. //将当前线程阻塞
    12. LockSupport.park(this);
    13. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    14. break;
    15. }
    16. //被唤醒后调用acquireQueued()继续尝试获取锁
    17. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    18. interruptMode = REINTERRUPT;
    19. if (node.nextWaiter != null) // clean up if cancelled
    20. unlinkCancelledWaiters();
    21. if (interruptMode != 0)
    22. reportInterruptAfterWait(interruptMode);
    23. }

创建节点并加入条件等待队列

  1. private Node addConditionWaiter() {
  2. //获取尾节点
  3. Node t = lastWaiter;
  4. //如果尾节点的状态不为CONDITION
  5. if (t != null && t.waitStatus != Node.CONDITION) {
  6. //清除条件队列中所有条件不为CONDITION的节点
  7. unlinkCancelledWaiters();
  8. t = lastWaiter;
  9. }
  10. //创建Node节点,并将节点状态设置为CONDITION
  11. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  12. if (t == null)
  13. firstWaiter = node;
  14. else
  15. t.nextWaiter = node;
  16. //尾节点指向新加入的节点
  17. lastWaiter = node;
  18. return node;
  19. }

微信截图_20210731221711.png

释放锁

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. //获取当前state的值
  5. int savedState = getState();
  6. //释放锁,就算是重入锁也是直接释放(state - state = 0)
  7. if (release(savedState)) {
  8. failed = false;
  9. return savedState;
  10. } else {
  11. throw new IllegalMonitorStateException();
  12. }
  13. } finally {
  14. if (failed)
  15. node.waitStatus = Node.CANCELLED;
  16. }
  17. }
  18. -------------------------------------------
  19. public final boolean release(int arg) {
  20. //释放锁
  21. if (tryRelease(arg)) {
  22. Node h = head;
  23. if (h != null && h.waitStatus != 0)
  24. //唤醒CLH队头节点
  25. unparkSuccessor(h);
  26. return true;
  27. }
  28. return false;
  29. }
  30. -----------------------------------------------
  31. protected final boolean tryRelease(int releases) {
  32. //重新将state设置为0
  33. int c = getState() - releases;
  34. if (Thread.currentThread() != getExclusiveOwnerThread())
  35. throw new IllegalMonitorStateException();
  36. boolean free = false;
  37. if (c == 0) {
  38. free = true;
  39. //将当前获取锁线程设置为null
  40. setExclusiveOwnerThread(null);
  41. }
  42. setState(c);
  43. return free;
  44. }

从条件等待队列出队

将条件等待队列中的头节点移到CLH队列中,后续可以争夺锁

  1. public final void signal() {
  2. if (!isHeldExclusively())
  3. throw new IllegalMonitorStateException();
  4. //获取条件等待队列头节点
  5. Node first = firstWaiter;
  6. if (first != null)
  7. doSignal(first);
  8. }
  9. -----------------------------------------
  10. private void doSignal(Node first) {
  11. do {
  12. //firstWaiter指针指向头结点的下一个节点
  13. if ( (firstWaiter = first.nextWaiter) == null)
  14. lastWaiter = null;
  15. //头节点的nextWaiter指针设置为null
  16. first.nextWaiter = null;
  17. //通过while循环,将条件等待队列的头节点移到CLH队列
  18. } while (!transferForSignal(first) &&
  19. (first = firstWaiter) != null);
  20. }
  21. -------------------------------------------------

微信截图_20210801105309.png

  1. final boolean transferForSignal(Node node) {
  2. //将节点的状态由CONDITION设置为0
  3. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  4. //设置失败直接return false
  5. return false;
  6. //将节点加入CLH队列,返回旧的tail节点
  7. Node p = enq(node);
  8. int ws = p.waitStatus;
  9. //如果节点时CANCELLED状态或者旧的tail节点waitStatus设置失败,则直接唤醒线程(做异常处理)
  10. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  11. LockSupport.unpark(node.thread);
  12. return true;
  13. }

入队

入队通过 putIndex 获取当前插入的位置

  1. private void enqueue(E x) {
  2. //获取存放元素的数组
  3. final Object[] items = this.items;
  4. //通过putIndex添加元素
  5. items[putIndex] = x;
  6. //如果putIndex等于数组最后一个元素下标,putIndex重置为0
  7. if (++putIndex == items.length)
  8. putIndex = 0;
  9. count++;
  10. //将notEmpty条件等待队列中的队头元素放入CLH队列
  11. notEmpty.signal();
  12. }

入队主要有四种方式,每种入队方式表现都不一样
微信截图_20210801105816.png

add()

如果队列满了,调用add()方法插入元素会抛异常(调用父类AbstractQueue的add方法)

  1. public boolean add(E e) {
  2. //调用了父类
  3. return super.add(e);
  4. }
  5. -------------------------------
  6. //最终调用ArrayBlockingQueue的offer()方法,如果失败则抛异常
  7. public boolean add(E e) {
  8. if (offer(e))
  9. return true;
  10. else
  11. throw new IllegalStateException("Queue full");
  12. }

offer()

入队成功返回true,入队失败返回false

  1. public boolean offer(E e) {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. //加锁
  5. lock.lock();
  6. try {
  7. if (count == items.length)
  8. return false;
  9. else {
  10. enqueue(e);
  11. return true;
  12. }
  13. } finally {
  14. lock.unlock();
  15. }
  16. }

offer(e, time, unit)

如果队列满了,则阻塞 timeout 时长后再次尝试,如果队列还是已满,则返回false

  1. public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  2. checkNotNull(e);
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock lock = this.lock;
  5. lock.lockInterruptibly();
  6. try {
  7. while (count == items.length) {
  8. if (nanos <= 0)
  9. return false;
  10. //如果队列已满,则阻塞timeout时长,该方法返回 timeout-实际阻塞时间
  11. //阻塞timeout时长后唤醒,while循环重新判断队列是否已满,是则返回false,入队失败
  12. nanos = notFull.awaitNanos(nanos);
  13. }
  14. enqueue(e);
  15. return true;
  16. } finally {
  17. lock.unlock();
  18. }
  19. }

put()

如果队列已满,则加入条件等待队列,直到出队操作调用 signal() 方法将节点加入CLH队列,后续争夺锁完成入队操作。

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length)
  7. //如果队列已满,则加入条件等待队列中
  8. notFull.await();
  9. enqueue(e);
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

出队

通过 takeIndex 获取当前出队元素的位置

  1. private E dequeue() {
  2. final Object[] items = this.items;
  3. E x = (E) items[takeIndex];
  4. items[takeIndex] = null;
  5. //如果takeIndex等于数组最后一个元素下标,putIndex重置为0
  6. if (++takeIndex == items.length)
  7. takeIndex = 0;
  8. count--;
  9. if (itrs != null)
  10. itrs.elementDequeued();
  11. notFull.signal();
  12. return x;
  13. }

出队也有四种方法,每种方式表现都不一样
微信截图_20210801152151.png

take()

如果队列为空,则加入条件等待队列,直到入队操作调用 signal() 方法将节点加入CLH队列,后续争夺锁完成入队操作。

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. while (count == 0)
  6. //如果队列为空,则加入条件等待队列阻塞
  7. notEmpty.await();
  8. return dequeue();
  9. } finally {
  10. lock.unlock();
  11. }
  12. }

poll()

如果队列为空,则返回null

  1. public E poll() {
  2. final ReentrantLock lock = this.lock;
  3. lock.lock();
  4. try {
  5. //如果队列为空,则返回null
  6. return (count == 0) ? null : dequeue();
  7. } finally {
  8. lock.unlock();
  9. }
  10. }

poll(time, unit)

如果队列为空,阻塞timeout时长,再次判断是否为空,是则返回null

  1. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  2. long nanos = unit.toNanos(timeout);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == 0) {
  7. if (nanos <= 0)
  8. return null;
  9. //如果队列为空,则阻塞timeout时长
  10. //阻塞timeout时长后唤醒,while循环重新判断队列是否为空,是则返回null
  11. nanos = notEmpty.awaitNanos(nanos);
  12. }
  13. return dequeue();
  14. } finally {
  15. lock.unlock();
  16. }
  17. }

remove()

移除指定元素,通过 takeIndex 开始遍历,直到找到指定元素

  1. public boolean remove(Object o) {
  2. if (o == null) return false;
  3. final Object[] items = this.items;
  4. final ReentrantLock lock = this.lock;
  5. lock.lock();
  6. try {
  7. if (count > 0) {
  8. //获取putIndex
  9. final int putIndex = this.putIndex;
  10. //获取takeIndex
  11. int i = takeIndex;
  12. do {
  13. //找到指定元素
  14. if (o.equals(items[i])) {
  15. //移除该元素
  16. removeAt(i);
  17. return true;
  18. }
  19. if (++i == items.length)
  20. i = 0;
  21. } while (i != putIndex);
  22. }
  23. return false;
  24. } finally {
  25. lock.unlock();
  26. }
  27. }

删除前还要重新调整 putIndex 位置

  1. 如果删除元素的下标等于 takeIndex,只需将takeIndex自增1
  2. 如果删除元素的下标不等于 takeIndex,则将 removeIndex 和 putIndex之间的元素相继往后移动一个单位,最后将 item[putIndex-1]的元素置为null,putIndex也向后移动一个单位

    1. void removeAt(final int removeIndex) {
    2. final Object[] items = this.items;
    3. //如果要删除元素的下标等于 takeIndex
    4. if (removeIndex == takeIndex) {
    5. items[takeIndex] = null;
    6. //takeIndex自增1,如果自增后等于数组长度,重置为0
    7. if (++takeIndex == items.length)
    8. takeIndex = 0;
    9. count--;
    10. if (itrs != null)
    11. itrs.elementDequeued();
    12. } else {
    13. //获取putIndex
    14. final int putIndex = this.putIndex;
    15. //将 removeIndex 和 putIndex-1之间的元素往后移一位
    16. for (int i = removeIndex;;) {
    17. int next = i + 1;
    18. if (next == items.length)
    19. next = 0;
    20. if (next != putIndex) {
    21. items[i] = items[next];
    22. i = next;
    23. } else {
    24. //将对应数组下标元素置为null
    25. items[i] = null;
    26. //putIndex后移一位
    27. this.putIndex = i;
    28. break;
    29. }
    30. }
    31. count--;
    32. if (itrs != null)
    33. itrs.removedAt(removeIndex);
    34. }
    35. notFull.signal();
    36. }

    微信截图_20210801170503.png