- 这几个特殊的Queue都是BlockingQueue
- 这几种特殊的BlockingQueue都有特殊的用途
- PriorityQueue优先队列
- DelayQueue—->时间概念的,与时间有关,按照在里面等待的时间来进行排序,看什么时候到期
- SynchronousQueue和TransferQueue在线程池中或者自己实现线程池的时候经常运用的
PriorityQueue
- 是从AbstractQueue继承来的
- 内部的特点:往里面装的时候并不是按顺序往里面装的,而是内部进行了一个排序;优先队列内部是一个树的模型—->设计的排好顺序的一个树
- 这个队列取出来的顺序是拍好顺序的,默认是小的先输出来(小顶堆)
- 数据结构是一个二叉树(最小堆、小顶堆、小根堆)
- 方法iterator()中提供的迭代器并不保证以有序的方式遍历优先级队列中的元素。(原因可参考PriorityQueue的内部实现);如果需要按顺序遍历,可用Arrays.sort(pq.toArray())。
- 此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。PriorityQueue对元素采用的是堆排序,头是按指定排序方式的最小元素。堆排序只能保证根是最大(最小),整个堆并不是有序的。方法iterator()中提供的迭代器可能只是对整个数组的依次遍历。也就只能保证数组的第一个元素是最小的。(二叉树是用数组实现的,并且数组第一个位置上放的是最小的元素)参考连接
- 利用 优先队列 求数组中前k个最小的数字,时间复杂度:O(nlogk) ```java package com.mashibing.juc.c_025;
import java.util.PriorityQueue;
public class T07_01_PriorityQueque {
public static void main(String[] args) {
PriorityQueue
q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");// 这是一个很容易犯的错误,因为每循环一次size就会改变// 不能用q.size()做为循环个数的判断标准,可以先求size,再写这个固定值或者用迭代器求// 用iterator迭代器???// for (int i = 0; i < q.size(); i++) {for (int i = 0; i < 5; i++) {System.out.println(q.poll());}}
}
<a name="Dxaac"></a>## DelayQueue- 时间概念的,与时间有关,按照在里面等待的时间来进行排序,看什么时候到期- 用途:时间等待越短的任务优先得到运行- 放进DelayQueue队列中的任务要实现Delayed接口,并重写其中的compareTo方法和getDelay方法- 在compareTo里面需要做一个比较,定义好排序时比较的规则,使里面的任务按执行的先后排序- 按紧迫程度排序- 一般用来**按时间进行任务调度**- **本质上用的是一个PriorityQueue来实现的**- 比较用的是compareTo方法来进行比较的,可以自己定义哪一个是优先级最高的- DelayQueue实现的是按时间优先级的排列```javapackage com.mashibing.juc.c_025;import java.util.Calendar;import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class T07_DelayQueue {static BlockingQueue<MyTask> tasks = new DelayQueue<>();static Random r = new Random();static class MyTask implements Delayed {String name;long runningTime;MyTask(String name, long rt) {this.name = name;this.runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))return 1;elsereturn 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return name + " " + runningTime;}}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask("t1", now + 1000);MyTask t2 = new MyTask("t2", now + 2000);MyTask t3 = new MyTask("t3", now + 1500);MyTask t4 = new MyTask("t4", now + 2500);MyTask t5 = new MyTask("t5", now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for(int i=0; i<5; i++) {System.out.println(tasks.take());}}}
SynchronousQueue
- SynchronousQueue的容量为0,这东西不是用来装内容的,是用来让一个线程给另外一个线程下达任务的
- 本质上和Exchanger容器的概念是一样的,但是用起来比Exchanger方便
- 这个Queue不能往里面装东西,只能用来进行阻塞式的put调用!!!且要求只有前面有人等着拿这个东西的时候才能往里装
- 容量为0,本质上就是要递到另外一个人的手里才行
- 本质上实现了这样一个场景:一个线程直接到另外一个线程,而不是放在中间的队列中了
- 要接受的线程没有,就拿在手里等着,什么时候来了,就把任务递过去,相当于两个线程直接交换数据(一个线程给另外一个线程传数据的作用)
- Exchanger两边都要进行同步才可以
- SynchronousQueue虽然看似没有用,但是他是在juc包中和线程池中作用最大、用得最多的一个Queue,很多线程之间取任务时,线程之间调度的时候,用的都是他
- 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行 ```java package com.mashibing.juc.c_025;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;
public class T08_SynchronusQueue { //容量为0
public static void main(String[] args) throws InterruptedException {
BlockingQueue
// 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行new Thread(()->{try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.put("aaa"); //阻塞等待消费者消费//strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());}
}
<a name="g5iOX"></a>## TransferQueue- 比SynchronousQueue更加强大的Queue- SynchronousQueue:同步Queue,就是Exchanger的概念,是一个数据交换,会保证递到你手里- TransferQueue:传递内容,这些内容是可以有长度的,是可以向里面装的- TransferQueue好玩的Queue,用法也特别多- 唯一的区别就在于:- 用put和之前的没有什么区别,用put表示一个线程来了,但是往里一装,装完就走了- **添加的transfer方法:**transfer方法装完,就阻塞在那里,直到有人将其取走,他才会继续干下面的事情(装完取走付钱,必须付钱之后才能取走),transfer不满也会等着,来了就等着- **必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行**> 1. 是前面几种Queue的组合,但是又有些区别> 1. 可以给线程传递任务,但不止能传递一个任务(SynchronousQueue只能传递一个任务)> 1. TransferQueue中是一个链表,可以传好多个> 1. 底层有一个比较好玩的机制:消费者没有东西消费的时候放空蓝子???等着生产者往里面装> 1. SynchronousQueue中没有空篮子,因为他是线程之间手递手传递> 1. TransferQueue是多人对多人的手递手传递```javapackage com.mashibing.juc.c_025;import java.util.concurrent.LinkedTransferQueue;public class T09_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();// 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.transfer("aaa");//strs.put("aaa");/*new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();*/}}
