阻塞队列

BlockingQueue接口

BlockingQueue 继承了 Queue 接口,是队列的一种。阻塞队列(BlockingQueue)是一个在队列基础上又支持了两个附加操作的队列,常用解耦。两个附加操作:

  • 支持阻塞的插入方法put: 队列满时,队列会阻塞插入元素的线程,直到队列不满
  • 支持阻塞的移除方法take: 队列空时,获取元素的线程会等待队列变为非空(等到队列有元素)

BlockingQueue和JDK集合包中的Queue接口兼容,同时在其基础上增加了阻塞功能。

入队:

  • offer(E e):如果队列没满,返回true,如果队列已满,返回false(不阻塞)
  • offer(E e, long timeout, TimeUnit unit):可以设置阻塞时间,如果队列已满,则进行阻塞。超过阻塞时间,则返回false
  • put(E e):队列没满的时候是正常的插入,如果队列已满,则阻塞,直至队列空出位置


出队:

  • poll():如果有数据,出队,如果没有数据,返回null (不阻塞)
  • poll(long timeout, TimeUnit unit):可以设置阻塞时间,如果没有数据,则阻塞,超过阻塞时间,则返回null
  • take():队列里有数据会正常取出数据并删除;但是如果队列里无数据,则阻塞,直到队列里有数据

截屏2022-03-29 23.37.45.png

阻塞队列特性

阻塞

阻塞队列区别于其他类型的队列的最主要的特点就是“阻塞”这两个字,阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put 方法。

take 方法

take 方法的功能是获取并移除队列的头结点,通常在队列里有数据的时候是可以正常移除的。可是一旦执行 take 方法的时候,队列里无数据,则阻塞,直到队列里有数据。一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。

put 方法

put 方法插入元素时,如果队列没有满,那就和普通的插入一样是正常的插入,但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间。如果后续队列有了空闲空间,比如消费者消费了一个元素,那么此时队列就会解除阻塞状态,并把需要添加的数据添加到队列中。

应用场景

BlockingQueue 是线程安全的,我们在很多场景下都可以利用线程安全的队列来优雅地解决我们业务自身的线程安全问题。比如说,使用生产者/消费者模式的时候,我们生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可以了。
截屏2022-03-29 23.50.48.png

阻塞队列优点

  1. 降低了我们开发的难度和工作量,因为阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的,不会发生线程安全问题。生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题
  2. 队列它还能起到隔离的作用,生产者只需要往队列里添加元素,而消费者只需要从队列里取出它们就可。任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们具体实现操作的对象的,实现了隔离,提高了安全性

常见阻塞队列

截屏2022-03-30 00.13.55.png

ArrayBlockingQueue

ArrayBlockingQueue是最典型的有界阻塞队列,其内部是用数组存储元素的,初始化时需要指定容量大小,利用 ReentrantLock 实现线程安全。
在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。
使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。
截屏2022-03-30 00.46.05.png

ArrayBlockingQueue的原理

利用了Lock锁的Condition通知机制进行阻塞控制。

入队put方法

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. //加锁,可以被中断,如果线程中断抛出异常
  5. lock.lockInterruptibly();
  6. try {
  7. //如果阻塞队列已经满,则将生产者挂起,等待消费者唤醒
  8. while (count == items.length)
  9. notFull.await();//队列满了,使用notFull等待(生产者阻塞)
  10. //入队
  11. enqueue(e);
  12. } finally {
  13. lock.unlock();// 唤醒消费者线程
  14. }
  15. }
  16. private void enqueue(E x) {
  17. // assert lock.getHoldCount() == 1;
  18. // assert items[putIndex] == null;
  19. final Object[] items = this.items;
  20. //入队 使用的putIndex
  21. items[putIndex] = x;
  22. //这里会先把putIndex+1然后判断是否等于数组的长度,等于则把putIndex=0
  23. if (++putIndex == items.length)
  24. putIndex = 0;//环形数组,putIndex指针到数组尽头了,返回头部
  25. count++;
  26. //notEmpty条件队列转同步队列,准备唤醒消费者线程,因为入队了一个元素,肯定不为空了
  27. notEmpty.signal();
  28. }

出队take方法

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. //加锁,如果线程中断抛出异常
  4. //这个loc和put的是同一把锁,所以可以实现线程安全,独占
  5. lock.lockInterruptibly();
  6. try {
  7. //如果队列为空,则消费者挂起
  8. while (count == 0)
  9. notEmpty.await();
  10. //出队
  11. return dequeue();
  12. } finally {
  13. lock.unlock();// 唤醒生产者线程
  14. }
  15. }
  16. private E dequeue() {
  17. // assert lock.getHoldCount() == 1;
  18. // assert items[takeIndex] != null;
  19. final Object[] items = this.items;
  20. @SuppressWarnings("unchecked")
  21. E x = (E) items[takeIndex];
  22. items[takeIndex] = null;
  23. if (++takeIndex == items.length)
  24. takeIndex = 0;//环形数组,takeIndex 指针到数组尽头了,返回头部
  25. count--;
  26. if (itrs != null)
  27. itrs.elementDequeued();
  28. //notFull条件队列转同步队列,准备唤醒生产者线程,此时队列有空位
  29. notFull.signal();
  30. return x;
  31. }