什么是阻塞队列? 当阻塞队列是空的时候,从队列中获取元素的操作将会被阻塞。 当阻塞队列是满的时候,往队列里添加元素的操作将会被阻塞。

image.png
在多线程领域,所谓阻塞,在某些情况下会被挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。
为什么需要blockingqueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切blockingqueue都给包办了。
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
blockingqueue核心方法
image.png
add:超过队列长度会报错,成功会返回true
Element() 返回队首元素
remove:移除 元素,没有元素可以移除则报错

offset:插入成功true,失败false,不报错
poll:取不到就返回null
peek,取出队首元素,不报错

put:只管插入没有返回值,当队列容量满时,则会等待(阻塞)
take:取出元素并返回,当无元素可取时,则会等待

Offer(e,time,unit):插入后遇到队列已满,则会等待两秒,2秒之后还是队列满的状态,则返回false

  1. blockingQueue.offer("a", 2L, TimeUnit.SECONDS)

一、队列分类

ArrayBlockingQueue:由数组结构组成的有界阻塞队列
LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列
SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。即只存储单个元素。
。。。。。。。

ArrayBlockingQueue

  1. public class BlockingQueueDemo {
  2. public static void main(String[] args) {
  3. BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
  4. System.out.println(blockingQueue.add("a"));
  5. System.out.println(blockingQueue.add("b"));
  6. System.out.println(blockingQueue.add("c"));
  7. System.out.println(blockingQueue.add("x"));
  8. }
  9. }

输出,第四个会报错,队列已满

  1. true
  2. true
  3. true
  4. Exception in thread "main" java.lang.IllegalStateException: Queue full
  5. at java.util.AbstractQueue.add(AbstractQueue.java:98)
  6. at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
  7. at com.supkingx.base.j_collection.Queue.BlockingQueueDemo.main(BlockingQueueDemo.java:20)

LinkedBlockingQueue

https://blog.csdn.net/Evankaka/article/details/51706109

从LinkedBlockingQueue的源码中,我们可以看出他和ArrayBlockingQueue主要有以下两点区别: 1、ArrayBlockingQueue数据是放在一个数组中。LinkedBlockingQueue是放在一个Node节点中,构成一个链接。 2、ArrayBlockingQueue取元素和放元素都是同一个锁,而LinkedBlockingQueue有两个锁,一个放入锁,一个取得锁。分别对应放入元素和取得元素时的操作。这是由链表的结构所确定的。但是删除一个元素时,要同时获得放入锁和取得锁。

SynchronousQueue

SynchronousQueue 这个队列实现了 BlockingQueue接口。该队列的特点 1.容量为0,无论何时 size方法总是返回0

  1. put操作阻塞, 直到另外一个线程取走队列的元素。 3.take操作阻塞,直到另外的线程put某个元素到队列中。
  2. 任何线程只能取得其他线程put进去的元素,而不会取到自己put进去的元素

产生一个元素,消费一个元素。依次进行

  1. public class BlockingQueueDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. // BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(1);
  4. // SynchronousQueue 只存储单个元素,直到被消费,否则就会一直阻塞,等待被消费
  5. BlockingQueue<Object> blockingQueue = new SynchronousQueue<>();
  6. new Thread(()->{
  7. try {
  8. blockingQueue.put("1");
  9. // 只有当SynchronousQueue里的元素被使用了,才会走到下一步,否则会一直阻塞,等待被使用
  10. System.out.println(Thread.currentThread().getName()+"\t put 1");
  11. blockingQueue.put("2");
  12. System.out.println(Thread.currentThread().getName()+"\t put 2");
  13. blockingQueue.put("3");
  14. System.out.println(Thread.currentThread().getName()+"\t put 3");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. },"AAA").start();
  19. new Thread(()->{
  20. try {
  21. TimeUnit.SECONDS.sleep(5);
  22. System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
  23. TimeUnit.SECONDS.sleep(5);
  24. System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
  25. TimeUnit.SECONDS.sleep(5);
  26. System.out.println(Thread.currentThread().getName()+"\t"+blockingQueue.take());
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. },"BBB").start();
  31. }
  32. }

1、如果是使用 ArrayBlockingQueue 则结果如下

  1. AAA put 1
  2. BBB 1
  3. AAA put 2
  4. BBB 2
  5. AAA put 3
  6. BBB 3

2、如果是使用 SynchronousQueue 则结果如下

  1. 输出结果:
  2. BBB 1
  3. AAA put 1
  4. BBB 2
  5. AAA put 2
  6. BBB 3
  7. AAA put 3

注意观察结果:
blockingQueue.put(“1”);之后,会去BBB现场take()到该元素,然后回到AAA线程继续执行。
即先put,再take,一次一个元素,依次执行。

二、使用场景

1、消费者/生产者

定义资源类

  1. public class MyResource {
  2. // 利用volatile修饰,提高可见性
  3. private volatile boolean FLAG = true; // 默认开启,进行生产+消费
  4. private AtomicInteger atomicInteger = new AtomicInteger();
  5. BlockingQueue<String> blockingQueue = null;
  6. public MyResource(BlockingQueue<String> blockingQueue) {
  7. this.blockingQueue = blockingQueue;
  8. System.out.println(blockingQueue.getClass().getName());
  9. }
  10. public void myProd() throws InterruptedException {
  11. String data = null;
  12. boolean retValue;
  13. while (FLAG) {
  14. // 获取数据塞入队列
  15. data = atomicInteger.incrementAndGet() + "";
  16. // 向队列添加数据,队列满了则等待2秒
  17. retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
  18. if (retValue) {
  19. System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "成功");
  20. } else {
  21. System.out.println(Thread.currentThread().getName() + "\t 插入队列" + data + "失败");
  22. }
  23. // 降低生产频率,给消费以时间
  24. TimeUnit.MILLISECONDS.sleep(500);
  25. }
  26. System.out.println(Thread.currentThread().getName() + "\t大老板叫停了,表示FLAG=false,生产动作结束");
  27. }
  28. public void myConsumer() throws InterruptedException {
  29. String result = null;
  30. while (FLAG) {
  31. // 2s取不到,就不取了
  32. result = blockingQueue.poll(2L, TimeUnit.SECONDS);
  33. if (null == result || result.equalsIgnoreCase("")) {
  34. FLAG = false;
  35. System.out.println(Thread.currentThread().getName() + "\t 超过2s没取到");
  36. System.out.println();
  37. System.out.println();
  38. return;
  39. }
  40. System.out.println(Thread.currentThread().getName() + "\t 消费队列" + result + "成功");
  41. }
  42. }
  43. public void stop() {
  44. this.FLAG = false;
  45. }
  46. }
  1. public class ProdConsumer {
  2. public static void main(String[] args) {
  3. // 定义队列容量大小为10,超过10则插入失败
  4. MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
  5. new Thread(()->{
  6. System.out.println(Thread.currentThread().getName()+"生产线程启动");
  7. try {
  8. myResource.myProd();
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. },"prod").start();
  13. new Thread(()->{
  14. System.out.println(Thread.currentThread().getName()+"消费线程启动");
  15. try {
  16. myResource.myConsumer();
  17. System.out.println();
  18. System.out.println();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. },"consumer").start();
  23. try {
  24. TimeUnit.SECONDS.sleep(5);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. System.out.println();
  29. System.out.println();
  30. System.out.println();
  31. System.out.println("5秒钟时间到,大老板main线程叫停,活动结束");
  32. myResource.stop();
  33. }
  34. }

产出结果

  1. java.util.concurrent.ArrayBlockingQueue
  2. prod生产线程启动
  3. consumer消费线程启动
  4. prod 插入队列1成功
  5. consumer 消费队列1成功
  6. prod 插入队列2成功
  7. consumer 消费队列2成功
  8. prod 插入队列3成功
  9. consumer 消费队列3成功
  10. prod 插入队列4成功
  11. consumer 消费队列4成功
  12. prod 插入队列5成功
  13. consumer 消费队列5成功
  14. 5秒钟时间到,大老板main线程叫停,活动结束
  15. prod 大老板叫停了,表示FLAG=false,生产动作结束
  16. consumer 超过2s没取到