1. 阻塞队列的产生来自于典型的生产者-消费者模式。

记住2个阻塞的方法

  • take:从队列中取
  • put:放入队列

两者个方法在队列满或空时都会阻塞

不可放Null值

阻塞队列中不可put一个null

各种阻塞队列

阻塞队列 - 图1

  • 其中有两个阻塞队列说是无界的。我大致看了一下源码好像并不对。

阻塞队列原理

这里我们就以ArrayBlockingQueue为例,阅读其put()和take()的方法

构造方法

  1. /**
  2. * Serialization ID. This class relies on default serialization
  3. * even for the items array, which is default-serialized, even if
  4. * it is empty. Otherwise it could not be declared final, which is
  5. * necessary here.
  6. */
  7. private static final long serialVersionUID = -817911632652898426L;
  8. /** 存放元素的数组 */
  9. final Object[] items;
  10. /** 下一个取数位置 */
  11. int takeIndex;
  12. /** 下一个入队列位置 */
  13. int putIndex;
  14. /** 元素个数 */
  15. int count;
  16. /*
  17. * Concurrency control uses the classic two-condition algorithm
  18. * found in any textbook.
  19. */
  20. /** 可重入锁 */
  21. final ReentrantLock lock;
  22. /** 等待有元素入队列条件 */
  23. private final Condition notEmpty;
  24. /** 等待队列不满条件 */
  25. private final Condition notFull;
  26. /**
  27. * Creates an {@code ArrayBlockingQueue} with the given (fixed)
  28. * capacity and default access policy.
  29. *
  30. * @param capacity the capacity of this queue
  31. * @throws IllegalArgumentException if {@code capacity < 1}
  32. */
  33. public ArrayBlockingQueue(int capacity) {
  34. this(capacity, false);
  35. }
  36. /**
  37. * Creates an {@code ArrayBlockingQueue} with the given (fixed)
  38. * capacity and the specified access policy.
  39. *
  40. * @param capacity the capacity of this queue
  41. * @param fair if {@code true} then queue accesses for threads blocked
  42. * on insertion or removal, are processed in FIFO order;
  43. * if {@code false} the access order is unspecified.
  44. * @throws IllegalArgumentException if {@code capacity < 1}
  45. */
  46. public ArrayBlockingQueue(int capacity, boolean fair) {
  47. if (capacity <= 0)
  48. throw new IllegalArgumentException();
  49. this.items = new Object[capacity];
  50. lock = new ReentrantLock(fair);
  51. notEmpty = lock.newCondition();
  52. notFull = lock.newCondition();
  53. }
  • 创建对象时,需要初始化可重入锁,以及2创建两个条件

    put操作

    ```java public void put(E e) throws InterruptedException { //检查不为null checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try {

    1. //当元素个数==队列(数组长度) 循环等待 notFull 条件
    2. while (count == items.length)
    3. notFull.await();
    4. //入队列
    5. enqueue(e);

    } finally {

    1. lock.unlock();

    } }

    private static void checkNotNull(Object v) { if (v == null)

    1. throw new NullPointerException();

    }

    /**

    • Inserts element at current put position, advances, and signals.
    • Call only when holding lock.
    • 入队列时还通知 不为空 */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length)
      1. putIndex = 0;
      count++; notEmpty.signal(); }
  1. - 入队列时,判断元素不为空
  2. - 若队列已满,通过 NotFull阻塞
  3. - 入队列后,唤醒NotEmpty
  4. <a name="HttPy"></a>
  5. ## take操作
  6. ```java
  7. public E take() throws InterruptedException {
  8. final ReentrantLock lock = this.lock;
  9. lock.lockInterruptibly();
  10. try {
  11. while (count == 0)
  12. notEmpty.await();
  13. return dequeue();
  14. } finally {
  15. lock.unlock();
  16. }
  17. }
  18. /**
  19. * Extracts element at current take position, advances, and signals.
  20. * Call only when holding lock.
  21. */
  22. private E dequeue() {
  23. // assert lock.getHoldCount() == 1;
  24. // assert items[takeIndex] != null;
  25. final Object[] items = this.items;
  26. @SuppressWarnings("unchecked")
  27. E x = (E) items[takeIndex];
  28. items[takeIndex] = null;
  29. if (++takeIndex == items.length)
  30. takeIndex = 0;
  31. count--;
  32. if (itrs != null)
  33. itrs.elementDequeued();
  34. notFull.signal();
  35. return x;
  36. }
  • 若队列元素为0则,等待notEmpty条件
  • 出队列后,唤醒等待notFull条件的线程

    参考文章