一 特性

  • 数组支持的有界队列
  • 按照FIFO(先进先出) 对元素进行排序
  • 一旦创建则不能增加其容量
  • 队列满了之后再添加会导致操作阻塞,向空队列提取元素也会被阻塞
  • 支持对等待的生产者线程和使用者线程进行排序的可选公平策略
  • 采用ReentrantLock + Condition实现线程安全和阻塞
  • 生产者和消费者公用一把锁
  • 支持阻塞方法和 非阻塞方法
  • 不支持null元素

    二 属性

    1. /** The queued items 存放元素的数组 */
    2. final Object[] items;
    3. /** items index for next take, poll, peek or remove 出队索引*/
    4. int takeIndex;
    5. /** items index for next put, offer, or add 入队索引*/
    6. int putIndex;
    7. /** Number of elements in the queue */
    8. int count;
    9. /*
    10. * Concurrency control uses the classic two-condition algorithm
    11. * found in any textbook.
    12. */
    13. /** Main lock guarding all access */
    14. final ReentrantLock lock;
    15. /** Condition for waiting takes */
    16. private final Condition notEmpty;
    17. /** Condition for waiting puts */
    18. private final Condition notFull;
    19. /**
    20. * Shared state for currently active iterators, or null if there
    21. * are known not to be any. Allows queue operations to update
    22. * iterator state.
    23. */
    24. transient Itrs itrs = null;

    三 构造方法

    1. public ArrayBlockingQueue(int capacity, boolean fair) {
    2. if (capacity <= 0)
    3. throw new IllegalArgumentException();
    4. this.items = new Object[capacity];
    5. // 这里是是否按照FIFO的策略进行访问 底层ReentrantLock 的实现则为 如果是true 则当前线程加入到等待队列中 否则直接CAS设置
    6. lock = new ReentrantLock(fair);
    7. notEmpty = lock.newCondition();
    8. notFull = lock.newCondition();
    9. }

    四 添加

    ```java //调用offer
    public boolean add(E e) {

    1. return super.add(e);

    }

    public boolean offer(E e) {

    1. checkNotNull(e);
    2. final ReentrantLock lock = this.lock;
    3. lock.lock();
    4. try {
    5. //队列满了则插入失败
    6. if (count == items.length)
    7. return false;
    8. else {
    9. enqueue(e);
    10. return true;
    11. }
    12. } finally {
    13. lock.unlock();
    14. }

    }

    //真正的插入元素 private void enqueue(E x) {

    1. // assert lock.getHoldCount() == 1;
    2. // assert items[putIndex] == null;
    3. final Object[] items = this.items;
    4. items[putIndex] = x;
    5. //这里到队尾之后再从数组头开始 外边进来已经进行了大小判断 超过数组大小不会再进来了
    6. if (++putIndex == items.length)
    7. putIndex = 0;
    8. count++;
    9. notEmpty.signal();

    }

//这个当队列满的时候会进行等待 而不是直接返回 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }

//插入的时候添加等待时间 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {

  1. checkNotNull(e);
  2. long nanos = unit.toNanos(timeout);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. while (count == items.length) {
  7. if (nanos <= 0)
  8. return false;
  9. nanos = notFull.awaitNanos(nanos);
  10. }
  11. enqueue(e);
  12. return true;
  13. } finally {
  14. lock.unlock();
  15. }
  16. }
  1. <a name="kNREb"></a>
  2. # 五 获取
  3. ```java
  4. //这个区别于下边的是会直接返回的
  5. public E poll() {
  6. final ReentrantLock lock = this.lock;
  7. lock.lock();
  8. try {
  9. return (count == 0) ? null : dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. //这个是当队列为空的时候会进入等待阶段
  15. public E take() throws InterruptedException {
  16. final ReentrantLock lock = this.lock;
  17. lock.lockInterruptibly();
  18. try {
  19. while (count == 0)
  20. notEmpty.await();
  21. return dequeue();
  22. } finally {
  23. lock.unlock();
  24. }
  25. }
  26. //一直等待下去直到有元素位置
  27. public E take() throws InterruptedException {
  28. final ReentrantLock lock = this.lock;
  29. lock.lockInterruptibly();
  30. try {
  31. while (count == 0)
  32. notEmpty.await();
  33. return dequeue();
  34. } finally {
  35. lock.unlock();
  36. }
  37. }
  38. // 这个会等待指定的时间
  39. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  40. long nanos = unit.toNanos(timeout);
  41. final ReentrantLock lock = this.lock;
  42. lock.lockInterruptibly();
  43. try {
  44. while (count == 0) {
  45. if (nanos <= 0)
  46. return null;
  47. nanos = notEmpty.awaitNanos(nanos);
  48. }
  49. return dequeue();
  50. } finally {
  51. lock.unlock();
  52. }
  53. }
  54. //真正的出队操作
  55. private E dequeue() {
  56. // assert lock.getHoldCount() == 1;
  57. // assert items[takeIndex] != null;
  58. final Object[] items = this.items;
  59. @SuppressWarnings("unchecked")
  60. E x = (E) items[takeIndex];
  61. items[takeIndex] = null;
  62. if (++takeIndex == items.length)
  63. takeIndex = 0;
  64. count--;
  65. if (itrs != null)
  66. itrs.elementDequeued();
  67. notFull.signal();
  68. return x;
  69. }

六 删除

  1. //删除的话会循环数组 找到相等的元素然后删除
  2. public boolean remove(Object o) {
  3. if (o == null) return false;
  4. final Object[] items = this.items;
  5. final ReentrantLock lock = this.lock;
  6. lock.lock();
  7. try {
  8. if (count > 0) {
  9. final int putIndex = this.putIndex;
  10. int i = takeIndex;
  11. do {
  12. if (o.equals(items[i])) {
  13. removeAt(i);
  14. return true;
  15. }
  16. if (++i == items.length)
  17. i = 0;
  18. } while (i != putIndex);
  19. }
  20. return false;
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. void removeAt(final int removeIndex) {
  26. // assert lock.getHoldCount() == 1;
  27. // assert items[removeIndex] != null;
  28. // assert removeIndex >= 0 && removeIndex < items.length;
  29. final Object[] items = this.items;
  30. //如果删除的是头结点则直接删除
  31. if (removeIndex == takeIndex) {
  32. // removing front item; just advance
  33. items[takeIndex] = null;
  34. if (++takeIndex == items.length)
  35. takeIndex = 0;
  36. count--;
  37. if (itrs != null)
  38. itrs.elementDequeued();
  39. } else {
  40. // an "interior" remove
  41. // slide over all others up through putIndex.
  42. final int putIndex = this.putIndex;
  43. //元素从删除的位置开始 向putIndex 挨个前移
  44. for (int i = removeIndex;;) {
  45. int next = i + 1;
  46. if (next == items.length)
  47. next = 0;
  48. if (next != putIndex) {
  49. items[i] = items[next];
  50. i = next;
  51. } else {
  52. items[i] = null;
  53. this.putIndex = i;
  54. break;
  55. }
  56. }
  57. count--;
  58. if (itrs != null)
  59. itrs.removedAt(removeIndex);
  60. }
  61. notFull.signal();
  62. }