BlockingQueue

BlockingQueue 是 Java 并发包中的一个接口,用来表示线程安全队列,可以放入并获取实例。它长应用于生产者消费者模式,例如下图:
image.png

  • 生产线程:持续生产新对象并将它们插入队列,直到队列达到它可以包含的上限。换句话说,这是极限。如果阻塞队列达到其上限,则会在尝试插入新对象时阻塞生产线程。在消耗线程将对象带出队列之前,它一直处于阻塞状态。
  • 消费线程:不断将对象从阻塞队列中取出,并对其进行处理。如果消费线程试图将对象从空队列中取出,则消费线程将被阻塞,直到生成的线程将对象放入队列。

BlockingQueue 方法按照官方的分类可以分为四类

  • Throws exception:如果操作不能立即发生,则抛出一个异常
  • Special value:如果操作不能立即执行,则返回空或者 false
  • Blocks:如果操作不能立即执行,则阻塞
  • Times out:如果操作不能立即执行,则定时阻塞,返回一个特殊值(boolen 空或者队列的值)用来告知操作成功与否
Throws exception Special value Blocks Times out
Insert add(E e)) bool offer(E e)) put(E e) offer(E e,long timeout TimUnit unit)
Remove remove(Object o)) E poll() E take() E poll(long timeout,TimeUnit unit)
Examine element()) E peek()) - -

BlockingQueue 是个接口,在并发包下有多个它的实现类

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockQueue
  • PriorityBlockingQueue
  • SynchronousQueue
  • LinkedBlockingDeque

接下来就是挨个的去看下这些实现

ArrayBlockingQueue

ArrayBlockingQueue 由数组支持的有界阻塞队列,队列基于数组实现,容量大小在初始化对象时已经定义好。此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。默认采用非公平锁,结构如下:
image.png
队列的内部使用 ReentrantLock 锁两个 Condition(条件) 来实现的 take 和 put 方法的阻塞

  1. // ArrayBlockingQueue.java
  2. public ArrayBlockingQueue(int capacity, boolean fair) {
  3. if (capacity <= 0)
  4. throw new IllegalArgumentException();
  5. this.items = new Object[capacity];
  6. lock = new ReentrantLock(fair);
  7. notEmpty = lock.newCondition();
  8. notFull = lock.newCondition();
  9. }

下面是个阻塞的 take 的简单实例:

  1. Executor executor = Executors.newFixedThreadPool(2);
  2. ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(7);
  3. executor.execute(() -> {
  4. try {
  5. System.out.println("等待任务...");
  6. String str = queue.take();
  7. System.out.println("从队列里消费了" + str);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. });
  12. executor.execute(() -> {
  13. try {
  14. Thread.sleep(2000);
  15. System.out.println("发布任务");
  16. queue.offer("任务 1");
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. });

DelayQueue

LinkedBlockQueue

PriorityBlockingQueue

SynchronousQueue


LinkedBlockingDeque