- 这几个特殊的Queue都是BlockingQueue
- 这几种特殊的BlockingQueue都有特殊的用途
- PriorityQueue优先队列
- DelayQueue—->时间概念的,与时间有关,按照在里面等待的时间来进行排序,看什么时候到期
- SynchronousQueue和TransferQueue在线程池中或者自己实现线程池的时候经常运用的
PriorityQueue
- 是从AbstractQueue继承来的
- 内部的特点:往里面装的时候并不是按顺序往里面装的,而是内部进行了一个排序;优先队列内部是一个树的模型—->设计的排好顺序的一个树
- 这个队列取出来的顺序是拍好顺序的,默认是小的先输出来(小顶堆)
- 数据结构是一个二叉树(最小堆、小顶堆、小根堆)
- 方法iterator()中提供的迭代器并不保证以有序的方式遍历优先级队列中的元素。(原因可参考PriorityQueue的内部实现);如果需要按顺序遍历,可用Arrays.sort(pq.toArray())。
- 此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。PriorityQueue对元素采用的是堆排序,头是按指定排序方式的最小元素。堆排序只能保证根是最大(最小),整个堆并不是有序的。方法iterator()中提供的迭代器可能只是对整个数组的依次遍历。也就只能保证数组的第一个元素是最小的。(二叉树是用数组实现的,并且数组第一个位置上放的是最小的元素)参考连接
- 利用 优先队列 求数组中前k个最小的数字,时间复杂度:O(nlogk) ```java package com.mashibing.juc.c_025;
import java.util.PriorityQueue;
public class T07_01_PriorityQueque {
public static void main(String[] args) {
PriorityQueue
q.add("c");
q.add("e");
q.add("a");
q.add("d");
q.add("z");
// 这是一个很容易犯的错误,因为每循环一次size就会改变
// 不能用q.size()做为循环个数的判断标准,可以先求size,再写这个固定值或者用迭代器求
// 用iterator迭代器???
// for (int i = 0; i < q.size(); i++) {
for (int i = 0; i < 5; i++) {
System.out.println(q.poll());
}
}
}
<a name="Dxaac"></a>
## DelayQueue
- 时间概念的,与时间有关,按照在里面等待的时间来进行排序,看什么时候到期
- 用途:时间等待越短的任务优先得到运行
- 放进DelayQueue队列中的任务要实现Delayed接口,并重写其中的compareTo方法和getDelay方法
- 在compareTo里面需要做一个比较,定义好排序时比较的规则,使里面的任务按执行的先后排序
- 按紧迫程度排序
- 一般用来**按时间进行任务调度**
- **本质上用的是一个PriorityQueue来实现的**
- 比较用的是compareTo方法来进行比较的,可以自己定义哪一个是优先级最高的
- DelayQueue实现的是按时间优先级的排列
```java
package com.mashibing.juc.c_025;
import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class T07_DelayQueue {
static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
String name;
long runningTime;
MyTask(String name, long rt) {
this.name = name;
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return name + " " + runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for(int i=0; i<5; i++) {
System.out.println(tasks.take());
}
}
}
SynchronousQueue
- SynchronousQueue的容量为0,这东西不是用来装内容的,是用来让一个线程给另外一个线程下达任务的
- 本质上和Exchanger容器的概念是一样的,但是用起来比Exchanger方便
- 这个Queue不能往里面装东西,只能用来进行阻塞式的put调用!!!且要求只有前面有人等着拿这个东西的时候才能往里装
- 容量为0,本质上就是要递到另外一个人的手里才行
- 本质上实现了这样一个场景:一个线程直接到另外一个线程,而不是放在中间的队列中了
- 要接受的线程没有,就拿在手里等着,什么时候来了,就把任务递过去,相当于两个线程直接交换数据(一个线程给另外一个线程传数据的作用)
- Exchanger两边都要进行同步才可以
- SynchronousQueue虽然看似没有用,但是他是在juc包中和线程池中作用最大、用得最多的一个Queue,很多线程之间取任务时,线程之间调度的时候,用的都是他
- 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行 ```java package com.mashibing.juc.c_025;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;
public class T08_SynchronusQueue { //容量为0
public static void main(String[] args) throws InterruptedException {
BlockingQueue
// 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行
new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaa"); //阻塞等待消费者消费
//strs.put("bbb");
//strs.add("aaa");
System.out.println(strs.size());
}
}
<a name="g5iOX"></a>
## TransferQueue
- 比SynchronousQueue更加强大的Queue
- SynchronousQueue:同步Queue,就是Exchanger的概念,是一个数据交换,会保证递到你手里
- TransferQueue:传递内容,这些内容是可以有长度的,是可以向里面装的
- TransferQueue好玩的Queue,用法也特别多
- 唯一的区别就在于:
- 用put和之前的没有什么区别,用put表示一个线程来了,但是往里一装,装完就走了
- **添加的transfer方法:**transfer方法装完,就阻塞在那里,直到有人将其取走,他才会继续干下面的事情(装完取走付钱,必须付钱之后才能取走),transfer不满也会等着,来了就等着
- **必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行**
> 1. 是前面几种Queue的组合,但是又有些区别
> 1. 可以给线程传递任务,但不止能传递一个任务(SynchronousQueue只能传递一个任务)
> 1. TransferQueue中是一个链表,可以传好多个
> 1. 底层有一个比较好玩的机制:消费者没有东西消费的时候放空蓝子???等着生产者往里面装
> 1. SynchronousQueue中没有空篮子,因为他是线程之间手递手传递
> 1. TransferQueue是多人对多人的手递手传递
```java
package com.mashibing.juc.c_025;
import java.util.concurrent.LinkedTransferQueue;
public class T09_TransferQueue {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
// 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行
new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
//strs.put("aaa");
/*new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/
}
}