1. 这几个特殊的Queue都是BlockingQueue
  2. 这几种特殊的BlockingQueue都有特殊的用途
  3. PriorityQueue优先队列
  4. DelayQueue—->时间概念的,与时间有关,按照在里面等待的时间来进行排序,看什么时候到期
  5. SynchronousQueue和TransferQueue在线程池中或者自己实现线程池的时候经常运用的

PriorityQueue

  • 是从AbstractQueue继承来的
  • 内部的特点:往里面装的时候并不是按顺序往里面装的,而是内部进行了一个排序;优先队列内部是一个树的模型—->设计的排好顺序的一个树
  • 这个队列取出来的顺序是拍好顺序的,默认是小的先输出来(小顶堆)
  • 数据结构是一个二叉树(最小堆、小顶堆、小根堆)
  • 方法iterator()中提供的迭代器并不保证以有序的方式遍历优先级队列中的元素。(原因可参考PriorityQueue的内部实现);如果需要按顺序遍历,可用Arrays.sort(pq.toArray())。
  • 此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选 方法。PriorityQueue对元素采用的是堆排序,头是按指定排序方式的最小元素。堆排序只能保证根是最大(最小),整个堆并不是有序的。方法iterator()中提供的迭代器可能只是对整个数组的依次遍历。也就只能保证数组的第一个元素是最小的。(二叉树是用数组实现的,并且数组第一个位置上放的是最小的元素)参考连接
  • 利用 优先队列 求数组中前k个最小的数字,时间复杂度:O(nlogk) ```java package com.mashibing.juc.c_025;

import java.util.PriorityQueue;

public class T07_01_PriorityQueque { public static void main(String[] args) { PriorityQueue q = new PriorityQueue<>();

  1. q.add("c");
  2. q.add("e");
  3. q.add("a");
  4. q.add("d");
  5. q.add("z");
  6. // 这是一个很容易犯的错误,因为每循环一次size就会改变
  7. // 不能用q.size()做为循环个数的判断标准,可以先求size,再写这个固定值或者用迭代器求
  8. // 用iterator迭代器???
  9. // for (int i = 0; i < q.size(); i++) {
  10. for (int i = 0; i < 5; i++) {
  11. System.out.println(q.poll());
  12. }
  13. }

}

  1. <a name="Dxaac"></a>
  2. ## DelayQueue
  3. - 时间概念的,与时间有关,按照在里面等待的时间来进行排序,看什么时候到期
  4. - 用途:时间等待越短的任务优先得到运行
  5. - 放进DelayQueue队列中的任务要实现Delayed接口,并重写其中的compareTo方法和getDelay方法
  6. - 在compareTo里面需要做一个比较,定义好排序时比较的规则,使里面的任务按执行的先后排序
  7. - 按紧迫程度排序
  8. - 一般用来**按时间进行任务调度**
  9. - **本质上用的是一个PriorityQueue来实现的**
  10. - 比较用的是compareTo方法来进行比较的,可以自己定义哪一个是优先级最高的
  11. - DelayQueue实现的是按时间优先级的排列
  12. ```java
  13. package com.mashibing.juc.c_025;
  14. import java.util.Calendar;
  15. import java.util.Random;
  16. import java.util.concurrent.BlockingQueue;
  17. import java.util.concurrent.DelayQueue;
  18. import java.util.concurrent.Delayed;
  19. import java.util.concurrent.TimeUnit;
  20. public class T07_DelayQueue {
  21. static BlockingQueue<MyTask> tasks = new DelayQueue<>();
  22. static Random r = new Random();
  23. static class MyTask implements Delayed {
  24. String name;
  25. long runningTime;
  26. MyTask(String name, long rt) {
  27. this.name = name;
  28. this.runningTime = rt;
  29. }
  30. @Override
  31. public int compareTo(Delayed o) {
  32. if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
  33. return -1;
  34. else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
  35. return 1;
  36. else
  37. return 0;
  38. }
  39. @Override
  40. public long getDelay(TimeUnit unit) {
  41. return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  42. }
  43. @Override
  44. public String toString() {
  45. return name + " " + runningTime;
  46. }
  47. }
  48. public static void main(String[] args) throws InterruptedException {
  49. long now = System.currentTimeMillis();
  50. MyTask t1 = new MyTask("t1", now + 1000);
  51. MyTask t2 = new MyTask("t2", now + 2000);
  52. MyTask t3 = new MyTask("t3", now + 1500);
  53. MyTask t4 = new MyTask("t4", now + 2500);
  54. MyTask t5 = new MyTask("t5", now + 500);
  55. tasks.put(t1);
  56. tasks.put(t2);
  57. tasks.put(t3);
  58. tasks.put(t4);
  59. tasks.put(t5);
  60. System.out.println(tasks);
  61. for(int i=0; i<5; i++) {
  62. System.out.println(tasks.take());
  63. }
  64. }
  65. }

SynchronousQueue

  • SynchronousQueue的容量为0,这东西不是用来装内容的,是用来让一个线程给另外一个线程下达任务的
  • 本质上和Exchanger容器的概念是一样的,但是用起来比Exchanger方便
  • 这个Queue不能往里面装东西,只能用来进行阻塞式的put调用!!!且要求只有前面有人等着拿这个东西的时候才能往里装
  • 容量为0,本质上就是要递到另外一个人的手里才行
  • 本质上实现了这样一个场景:一个线程直接到另外一个线程,而不是放在中间的队列中了
  • 要接受的线程没有,就拿在手里等着,什么时候来了,就把任务递过去,相当于两个线程直接交换数据(一个线程给另外一个线程传数据的作用)
  • Exchanger两边都要进行同步才可以
  • SynchronousQueue虽然看似没有用,但是他是在juc包中和线程池中作用最大、用得最多的一个Queue,很多线程之间取任务时,线程之间调度的时候,用的都是他
  • 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行 ```java package com.mashibing.juc.c_025;

import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue;

public class T08_SynchronusQueue { //容量为0 public static void main(String[] args) throws InterruptedException { BlockingQueue strs = new SynchronousQueue<>();

  1. // 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行
  2. new Thread(()->{
  3. try {
  4. System.out.println(strs.take());
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. }).start();
  9. strs.put("aaa"); //阻塞等待消费者消费
  10. //strs.put("bbb");
  11. //strs.add("aaa");
  12. System.out.println(strs.size());
  13. }

}

  1. <a name="g5iOX"></a>
  2. ## TransferQueue
  3. - 比SynchronousQueue更加强大的Queue
  4. - SynchronousQueue:同步Queue,就是Exchanger的概念,是一个数据交换,会保证递到你手里
  5. - TransferQueue:传递内容,这些内容是可以有长度的,是可以向里面装的
  6. - TransferQueue好玩的Queue,用法也特别多
  7. - 唯一的区别就在于:
  8. - 用put和之前的没有什么区别,用put表示一个线程来了,但是往里一装,装完就走了
  9. - **添加的transfer方法:**transfer方法装完,就阻塞在那里,直到有人将其取走,他才会继续干下面的事情(装完取走付钱,必须付钱之后才能取走),transfer不满也会等着,来了就等着
  10. - **必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行**
  11. > 1. 是前面几种Queue的组合,但是又有些区别
  12. > 1. 可以给线程传递任务,但不止能传递一个任务(SynchronousQueue只能传递一个任务)
  13. > 1. TransferQueue中是一个链表,可以传好多个
  14. > 1. 底层有一个比较好玩的机制:消费者没有东西消费的时候放空蓝子???等着生产者往里面装
  15. > 1. SynchronousQueue中没有空篮子,因为他是线程之间手递手传递
  16. > 1. TransferQueue是多人对多人的手递手传递
  17. ```java
  18. package com.mashibing.juc.c_025;
  19. import java.util.concurrent.LinkedTransferQueue;
  20. public class T09_TransferQueue {
  21. public static void main(String[] args) throws InterruptedException {
  22. LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
  23. // 必须先有消费者去获取,否则直接上生产者的话会造成阻塞在那里,不能往下执行
  24. new Thread(() -> {
  25. try {
  26. System.out.println(strs.take());
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }).start();
  31. strs.transfer("aaa");
  32. //strs.put("aaa");
  33. /*new Thread(() -> {
  34. try {
  35. System.out.println(strs.take());
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. }
  39. }).start();*/
  40. }
  41. }