一、SynchronousQueue
SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作 put必须等待消费者的移除操作take。
SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候, SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
1、应用场景
SynchronousQueue非常适合传递性场景做交换工作,生产者的线程和消费者的线程同步传递某些信息、事件或者任务。
SynchronousQueue的一个使用场景是在线程池里。如果我们不确定来自生产者请求数量,但是这些请求需要很快的处理掉,那么配合SynchronousQueue为每个生产者请求分配一个消费线程是处理效率最高的办法。Executors.newCachedThreadPool()就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
2、SynchronousQueue使用
在构造方法中没有传入容量的参数,容量是默认的。默认创建的队列是非公平的,也可以传入true设置为公平的,SynchronousQueue队列是有链表来实现的,公平是用队列结构来实现,即每次新的节点是插入链表的尾部,非公平是用栈结构来实现的,即每次新的节点从链表的头部插入进行压栈。
public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
非公平的栈结构链表节点是SNode对象。next保存下一个节点,match用来交换生产者和消费者的数据,waiter绑定当前线程,item为元素的数据,mode是具体的操作模式,用来区分出队和入队操作,REQUEST 标注 SNode 的 mode 出队操作,DATA 标注 SNode 的 mode 入队操作。
公平的队列结构是使用QNode对象作为节点,在TransferQueue对象实例化的同时,就创建好了队列的头节点和尾节点(头尾相同)。
TransferQueue() {QNode h = new QNode(null, false); // initialize to dummy node.head = h;tail = h;}
在QNode中,next指向下一个节点,item保存元素数据,waiter保存当前线程,isData表示出队或入队操作。
队列的出队(take()方法)和入队(put()方法)都是调用的transfer()方法,根据非公平和公平有不同的两种实现类来实现transfer()方法的逻辑。
3、公平模式源码
4、非公平模式源码
二、PriorityBlockingQueue
PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是 11,虽然指定了数组的长度,但是可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
优先级队列PriorityQueue,队列中每个元素都有一个优先级,出队的时候,优先级最高的先出。
1、PriorityBlockingQueue使用
// 初始容量private static final int DEFAULT_INITIAL_CAPACITY = 11;// 最大容量,Integer最大值减8private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;// 保存元素的数组private transient Object[] queue;// 数组中的元素个数private transient int size;// 读和写都使用同一把锁private final ReentrantLock lock;// 因为可以无限扩容,所以不存在写满的情况,在数组为空的时候,消费者线程阻塞private final Condition notEmpty;
public static void main(String[] args) throws InterruptedException {//创建优先级阻塞队列 Comparator为null,自然排序PriorityBlockingQueue<Integer> queue=new PriorityBlockingQueue<Integer>(3);// 自定义Comparator// PriorityBlockingQueue queue=new PriorityBlockingQueue<Integer>(// 5, new Comparator<Integer>() {// @Override// public int compare(Integer o1, Integer o2) {// return o2-o1;// }// });Random random = new Random();System.out.println("put:");for (int i = 0; i < 5; i++) {int j = random.nextInt(100);System.out.print(j+" ");queue.put(j);}System.out.println("\ntake:");for (int i = 0; i < 5; i++) {System.out.print(queue.take()+" ");}}
2、使用普通线性数组(无序)来表示优先级队列
执行插入操作时,直接将元素插入到数组末端,需要的成本为O(1)。
获取优先级最高元素,我们需要遍历整个线性队列,匹配出优先级最高元素,需要的成 本为o(n)。
删除优先级最高元素,我们需要两个步骤,第一找出优先级最高元素,第二步删除优先 级最高元素,然后将后面的元素依次迁移,填补空缺,需要的成本为O(n)+O(n)=O(n)。
3、使用一个按顺序排列的有序向量实现优先级队列
获取优先级最高元素,O(1)。
删除优先级最高元素,O(1)。
插入一个元素,需要两个步骤,第一步我们需要找出要插的位置,这里我们可以使用二 分查找,成本为O(logn),第二步是插入元素之后,将其所有后继进行后移操作,成本为 O(n),所有总成本为O(logn)+O(n)=O(n)。
4、二叉堆
完全二叉树:除了最后一行,其他行都满的二叉树,而且最后一行所有叶子节点都从左向右开始排序。
完全二叉树可以表示一个数组,二叉树的根节点对应数组下标为0的,第一个左节点对应下标1,第一个右节点对应下标2,从左到右,从上到下。
所以只要知道某个元素在数组中的下标t,就可以找到其父节点和子节点在数组中的下标。
父节点下标:(t-1)/2
左子节点下标:(2t)+1;右子节点下标:(2t) + 2
二叉堆:完全二叉树的基础上,加以一定的条件约束的一种特殊的二叉树。根据约束条件的不同,二叉堆又可以分为两个类型,大顶堆和小顶堆。
大顶堆(最大堆):父结点的键值总是大于或等于任何一个子节点的键值。
小顶堆(最小堆):父结点的键值总是小于或等于任何一个子节点的键值。
大顶堆和小顶堆都是堆排序的一种实现。在一个元素入队之后,就和其父级节点去比较,更大的元素就上浮,知道根节点是最大的元素。在出队时,根节点和最末尾的元素交换,然后交换后的根节点和其子节点比较,更小的元素就下沉,堆排序对于相等的元素是没有顺序的,所以是不稳定的排序。PriorityBlockingQueue默认是最小堆排序。
5、put()方法
6、take()方法
三、DelayQueue
延迟队列也是基于优先级队列实现的,当队列为空时会阻塞,延迟队列中的元素需要实现Delayed接口,因为Delayed接口继承了Comparable比较器,所以需要重写getDelay()和compareTo()方法。
public interface Delayed extends Comparable<Delayed> {long getDelay(TimeUnit unit);}
在DelayQueue中,也是使用了PriorityQueue来保存元素,和堆排序,其中leader属性是保存了当前操作队列的线程,如果这个值不为空,那么则表示当前有线程正在操作,其他线程就阻塞,避免了无效的等待。
public class DelayQueueDemo {public static void main(String[] args) throws InterruptedException {//实例化一个DelayQueueBlockingQueue<DelayObject> blockingQueue = new DelayQueue<>();//向DelayQueue添加四个元素对象,注意延时时间不同blockingQueue.put(new DelayObject("A", 1000 * 10)); //延时10秒blockingQueue.put(new DelayObject("B", 4000 * 10)); //延时40秒blockingQueue.put(new DelayObject("C", 3000 * 10)); //延时30秒blockingQueue.put(new DelayObject("D", 2000 * 10)); //延时20秒//将对象从DelayQueue取出,注意取出的顺序与延时时间有关System.out.println(blockingQueue.take()); //取出ASystem.out.println(blockingQueue.take()); //取出DSystem.out.println(blockingQueue.take()); //取出CSystem.out.println(blockingQueue.take()); //取出B}}class DelayObject implements Delayed {private String name;private long time; //延时时间public DelayObject(String name, long delayTime) {this.name = name;this.time = System.currentTimeMillis() + delayTime;}@Overridepublic long getDelay(TimeUnit unit) {long diff = time - System.currentTimeMillis();return unit.convert(diff, TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed obj) {if (this.time < ((DelayObject) obj).time) {return -1;}if (this.time > ((DelayObject) obj).time) {return 1;}return 0;}@Overridepublic String toString() {Date date = new Date(time);SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");return "\nDelayObject:{"+ "name=" + name+ ", time=" + sd.format(date)+ "}";}}
1、put()方法
2、take()方法
四、如何选择适合的阻塞队列
1、线程池对于阻塞队列的选择
线程池有很多种,不同种类的线程池会根据自己的特点,来选择适合自己的阻塞队列。
FixedThreadPool(SingleThreadExecutor 同理)选取的是 LinkedBlockingQueue。
CachedThreadPool 选取的是 SynchronousQueue。
ScheduledThreadPool(SingleThreadScheduledExecutor同理)选取的是延迟队列。
2、选择策略
1)功能
是否需要阻塞队列帮我们排序,如优先级排序、延迟执行等。如果有这个需要,我们就必须选择类似于 PriorityBlockingQueue 之类的有排序能力的阻塞队列。
2)容量
是否有存储的要求,还是只需要“直接传递”。在考虑这一点的时候,我们知道前面介绍的那几种阻塞队列,有的是容量固定的, 如 ArrayBlockingQueue;有的默认是容量无限的,如 LinkedBlockingQueue;而有的里面没有任何容量,如 SynchronousQueue;而对于 DelayQueue 而言,它的容量固定就是 Integer.MAX_VALUE。所以不同阻塞队列的容量是千差万别的,我们需要根据任务数量来推算出合适的容量,从而去选取合适的 BlockingQueue。
3)能否扩容
有时我们并不能在初始的时候很好的准确估计队列的大小,因为业务可能有高峰期、低谷期。如果一开始就固定一个容量,可能无法应对所有的情况, 也是不合适的,有可能需要动态扩容。如果我们需要动态扩容的话,那么就不能选择 ArrayBlockingQueue ,因为它的容量在创建时就确定了,无法扩容。相反, PriorityBlockingQueue 即使在指定了初始容量之后,后续如果有需要,也可以自动扩容。所以我们可以根据是否需要扩容来选取合适的队列。
4)内存结构
通过 ArrayBlockingQueue 的源码,看到了它的内部结构是“数组”的形式。和它不同的是,LinkedBlockingQueue 的内部是用链表实现的,所以这里就需要我们考虑到,ArrayBlockingQueue 没有链表所需要的“节点”,空间利用率更高。所以如果我们对性能有要求可以从内存的结构角度去考虑这个问题。
5)性能
LinkedBlockingQueue 由于拥有两把锁,它的操作粒度更细,在并发程度高的时候,相对于只有一把锁的 ArrayBlockingQueue 性能会更好。另外,SynchronousQueue 性能往往优于其他实现,因为它只需要“直接传递”,而不需要存储的过程。如果我们的场景需要直接传递的话,可以优先考虑 SynchronousQueue。
