一、SynchronousQueue

SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作 put必须等待消费者的移除操作take。
SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候, SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。

1、应用场景

SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

2、SynchronousQueue使用

在构造方法中没有传入容量的参数,容量是默认的。默认创建的队列是非公平的,也可以传入true设置为公平的,SynchronousQueue队列是有链表来实现的,公平是用队列结构来实现,即每次新的节点是插入链表的尾部,非公平是用栈结构来实现的,即每次新的节点从链表的头部插入进行压栈。

  1. public SynchronousQueue() {
  2. this(false);
  3. }
  4. public SynchronousQueue(boolean fair) {
  5. transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  6. }

非公平的栈结构链表节点是SNode对象。next保存下一个节点,match用来交换生产者和消费者的数据,waiter绑定当前线程,item为元素的数据,mode是具体的操作模式,用来区分出队和入队操作,REQUEST 标注 SNode 的 mode 出队操作,DATA 标注 SNode 的 mode 入队操作。
1.png
公平的队列结构是使用QNode对象作为节点,在TransferQueue对象实例化的同时,就创建好了队列的头节点和尾节点(头尾相同)。

  1. TransferQueue() {
  2. QNode h = new QNode(null, false); // initialize to dummy node.
  3. head = h;
  4. tail = h;
  5. }

在QNode中,next指向下一个节点,item保存元素数据,waiter保存当前线程,isData表示出队或入队操作。
3.png
队列的出队(take()方法)和入队(put()方法)都是调用的transfer()方法,根据非公平和公平有不同的两种实现类来实现transfer()方法的逻辑。
2.png

3、公平模式源码

SynchronousQueue-TransferQueue-Producer.put&Consumer.take.png

4、非公平模式源码

SynchronousQueue-TransferStack-Producer.put&Consumer.take.png

二、PriorityBlockingQueue

PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是 11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
优先级队列PriorityQueue,队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。

1、PriorityBlockingQueue使用

  1. // 初始容量
  2. private static final int DEFAULT_INITIAL_CAPACITY = 11;
  3. // 最大容量,Integer最大值减8
  4. private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
  5. // 保存元素的数组
  6. private transient Object[] queue;
  7. // 数组中的元素个数
  8. private transient int size;
  9. // 读和写都使用同一把锁
  10. private final ReentrantLock lock;
  11. // 因为可以无限扩容,所以不存在写满的情况,在数组为空的时候,消费者线程阻塞
  12. private final Condition notEmpty;
  1. public static void main(String[] args) throws InterruptedException {
  2. //创建优先级阻塞队列 Comparator为null,自然排序
  3. PriorityBlockingQueue<Integer> queue=new PriorityBlockingQueue<Integer>(3);
  4. // 自定义Comparator
  5. // PriorityBlockingQueue queue=new PriorityBlockingQueue<Integer>(
  6. // 5, new Comparator<Integer>() {
  7. // @Override
  8. // public int compare(Integer o1, Integer o2) {
  9. // return o2-o1;
  10. // }
  11. // });
  12. Random random = new Random();
  13. System.out.println("put:");
  14. for (int i = 0; i < 5; i++) {
  15. int j = random.nextInt(100);
  16. System.out.print(j+" ");
  17. queue.put(j);
  18. }
  19. System.out.println("\ntake:");
  20. for (int i = 0; i < 5; i++) {
  21. System.out.print(queue.take()+" ");
  22. }
  23. }

2、使用普通线性数组(无序)来表示优先级队列

执行插入操作时,直接将元素插入到数组末端,需要的成本为O(1)。
获取优先级最高元素,我们需要遍历整个线性队列,匹配出优先级最高元素,需要的成 本为o(n)。
删除优先级最高元素,我们需要两个步骤,第一找出优先级最高元素,第二步删除优先 级最高元素,然后将后面的元素依次迁移,填补空缺,需要的成本为O(n)+O(n)=O(n)。

3、使用一个按顺序排列的有序向量实现优先级队列

获取优先级最高元素,O(1)。
删除优先级最高元素,O(1)。
插入一个元素,需要两个步骤,第一步我们需要找出要插的位置,这里我们可以使用二 分查找,成本为O(logn),第二步是插入元素之后,将其所有后继进行后移操作,成本为 O(n),所有总成本为O(logn)+O(n)=O(n)。

4、二叉堆

完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
完全二叉树可以表示一个数组,二叉树的根节点对应数组下标为0的,第一个左节点对应下标1,第一个右节点对应下标2,从左到右,从上到下。
所以只要知道某个元素在数组中的下标t,就可以找到其父节点和子节点在数组中的下标。
父节点下标:(t-1)/2
左子节点下标:(2t)+1;右子节点下标:(2t) + 2
二叉树.png
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型,大顶堆和小顶堆。
大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值。
小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
二叉堆.png
大顶堆和小顶堆都是堆排序的一种实现。在一个元素入队之后,就和其父级节点去比较,更大的元素就上浮,知道根节点是最大的元素。在出队时,根节点和最末尾的元素交换,然后交换后的根节点和其子节点比较,更小的元素就下沉,堆排序对于相等的元素是没有顺序的,所以是不稳定的排序。PriorityBlockingQueue默认是最小堆排序。

5、put()方法

PriorityBlockingQueue-put.png

6、take()方法

PriorityBlockingQueue-take.png

三、DelayQueue

延迟队列也是基于优先级队列实现的,当队列为空时会阻塞,延迟队列中的元素需要实现Delayed接口,因为Delayed接口继承了Comparable比较器,所以需要重写getDelay()和compareTo()方法。

  1. public interface Delayed extends Comparable<Delayed> {
  2. long getDelay(TimeUnit unit);
  3. }

在DelayQueue中,也是使用了PriorityQueue来保存元素,和堆排序,其中leader属性是保存了当前操作队列的线程,如果这个值不为空,那么则表示当前有线程正在操作,其他线程就阻塞,避免了无效的等待。3.jpg

  1. public class DelayQueueDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. //实例化一个DelayQueue
  4. BlockingQueue<DelayObject> blockingQueue = new DelayQueue<>();
  5. //向DelayQueue添加四个元素对象,注意延时时间不同
  6. blockingQueue.put(new DelayObject("A", 1000 * 10)); //延时10秒
  7. blockingQueue.put(new DelayObject("B", 4000 * 10)); //延时40秒
  8. blockingQueue.put(new DelayObject("C", 3000 * 10)); //延时30秒
  9. blockingQueue.put(new DelayObject("D", 2000 * 10)); //延时20秒
  10. //将对象从DelayQueue取出,注意取出的顺序与延时时间有关
  11. System.out.println(blockingQueue.take()); //取出A
  12. System.out.println(blockingQueue.take()); //取出D
  13. System.out.println(blockingQueue.take()); //取出C
  14. System.out.println(blockingQueue.take()); //取出B
  15. }
  16. }
  17. class DelayObject implements Delayed {
  18. private String name;
  19. private long time; //延时时间
  20. public DelayObject(String name, long delayTime) {
  21. this.name = name;
  22. this.time = System.currentTimeMillis() + delayTime;
  23. }
  24. @Override
  25. public long getDelay(TimeUnit unit) {
  26. long diff = time - System.currentTimeMillis();
  27. return unit.convert(diff, TimeUnit.MILLISECONDS);
  28. }
  29. @Override
  30. public int compareTo(Delayed obj) {
  31. if (this.time < ((DelayObject) obj).time) {
  32. return -1;
  33. }
  34. if (this.time > ((DelayObject) obj).time) {
  35. return 1;
  36. }
  37. return 0;
  38. }
  39. @Override
  40. public String toString() {
  41. Date date = new Date(time);
  42. SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  43. return "\nDelayObject:{"
  44. + "name=" + name
  45. + ", time=" + sd.format(date)
  46. + "}";
  47. }
  48. }

1、put()方法

DelayQueue-put.png

2、take()方法

DelayQueue-take.png

四、如何选择适合的阻塞队列

1、线程池对于阻塞队列的选择

线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue。
CachedThreadPool 选取的是 SynchronousQueue。
ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列。

2、选择策略

1)功能

是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。

2)容量

是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的, 如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。

3)能否扩容

有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况, 也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反, PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。

4)内存结构

通过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。

5)性能

LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。