图片.png

1. Map

1.1 Map集合性能测试

  1. import java.math.BigDecimal;
  2. import java.util.*;
  3. import java.util.concurrent.*;
  4. /**
  5. * 测试写入性能: 启动500个线程,每个线程装载20000对数据,总共装载10000000对数据。 观察耗时
  6. * 测试读取性能: 启动500个线程, 每个线程循环10W次读取集合内的数据。观察耗时
  7. */
  8. public class TestMapsWriteAndReadPerformance {
  9. private static int COUNT = 10000000; //集合内键值对总数
  10. private static int THREAD_COUNT = 500; //总线程数
  11. private static UUID[][] KEYS_ARRAY = new UUID[THREAD_COUNT][COUNT/THREAD_COUNT]; //key数组
  12. private static UUID[][] VALUES_ARRAY = new UUID[THREAD_COUNT][COUNT/THREAD_COUNT]; //value数组
  13. //待测试的集合
  14. private static List<Map<UUID, UUID>> mapList = new ArrayList<>();
  15. //初始化key和value的二维数组
  16. static {
  17. for (int i = 0; i < THREAD_COUNT; i++){
  18. for (int j = 0; j < COUNT/THREAD_COUNT; j++) {
  19. KEYS_ARRAY[i][j] = UUID.randomUUID();
  20. VALUES_ARRAY[i][j] = UUID.randomUUID();
  21. }
  22. }
  23. }
  24. /**
  25. * 测试各集合写入性能方法, 把10000000个键值对分成500页,每页启动一个线程分别装载.
  26. * @param map 具体的集合类,HashTable、HashMap、SynchronizedHashMap、ConcurrentHashMap
  27. * @return 耗时时长
  28. * @throws InterruptedException
  29. */
  30. private static BigDecimal testWritePerformance(Map map) throws InterruptedException {
  31. //设置门闩等到线程都执行完然后打印执行时间
  32. CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
  33. //启动500个线程,每个线程往m里面装一页数据。
  34. Thread[] threads = new Thread[THREAD_COUNT];
  35. for (int i = 0; i < THREAD_COUNT; i++) {
  36. UUID[] keys = KEYS_ARRAY[i];
  37. UUID[] values = VALUES_ARRAY[i];
  38. threads[i] = new Thread(()->{
  39. for (int j = 0; j < COUNT/THREAD_COUNT; j++){
  40. map.put(keys[j],values[j]);
  41. }
  42. countDownLatch.countDown();//门栓-1
  43. });
  44. }
  45. //计时开始
  46. BigDecimal timeStarted = new BigDecimal(System.currentTimeMillis());
  47. //启动所有线程
  48. Arrays.asList(threads).forEach(Thread::start);
  49. //门栓等待所有线程执行完
  50. countDownLatch.await();
  51. //计时结束
  52. BigDecimal timeEnd = new BigDecimal(System.currentTimeMillis());
  53. return timeEnd.subtract(timeStarted).divide(new BigDecimal(1000));
  54. }
  55. /**
  56. * 测试各集合读取性能方法, 启动500个线程, 每个线程循环10W次读取集合内的数据。
  57. * @param map 具体的集合类,HashTable、HashMap、SynchronizedHashMap、ConcurrentHashMap
  58. * @return 耗时时长
  59. * @throws InterruptedException
  60. */
  61. private static BigDecimal testReadPerformance(Map map) throws InterruptedException {
  62. //设置门闩等到线程都执行完然后打印执行时间
  63. CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
  64. //开始读取
  65. Thread[] threads = new Thread[THREAD_COUNT];
  66. for (int i = 0; i < THREAD_COUNT; i++) {
  67. UUID[] keys = KEYS_ARRAY[i];
  68. threads[i] = new Thread(()->{
  69. for (int j = 0; j < COUNT/THREAD_COUNT; j++) {
  70. map.get(keys[j]);
  71. }
  72. countDownLatch.countDown();//门栓-1
  73. });
  74. }
  75. //计时开始
  76. BigDecimal timeStarted = new BigDecimal(System.currentTimeMillis());
  77. //启动所有线程
  78. Arrays.asList(threads).forEach(Thread::start);
  79. //门栓等待所有线程执行完
  80. countDownLatch.await();
  81. //计时结束
  82. BigDecimal timeEnd = new BigDecimal(System.currentTimeMillis());
  83. return timeEnd.subtract(timeStarted).divide(new BigDecimal(1000));
  84. }
  85. public static void main(String[] args) throws InterruptedException {
  86. mapList.add(new Hashtable<>());
  87. mapList.add(Collections.synchronizedMap(new HashMap()));
  88. mapList.add(new ConcurrentHashMap<>());
  89. mapList.add(new ConcurrentSkipListMap<>());
  90. for (Map<UUID, UUID> map : mapList) {
  91. BigDecimal writeTime = testWritePerformance(map); //写入性能
  92. BigDecimal readTime = testReadPerformance(map); //读取性能
  93. System.out.println(map.getClass().getSimpleName()+"写入了"+map.size()+"对数据, 耗时"+writeTime+"ms, 读取"+COUNT/THREAD_COUNT+"次, 耗时"+readTime+"ms");
  94. }
  95. }
  96. }

运行结果:

Hashtable写入了10000000对数据, 耗时7.902ms, 读取20000次, 耗时3.688ms SynchronizedMap写入了10000000对数据, 耗时5.395ms, 读取20000次, 耗时1.952ms ConcurrentHashMap写入了10000000对数据, 耗时12.405ms, 读取20000次, 耗时0.31ms ConcurrentSkipListMap写入了10000000对数据, 耗时9.181ms, 读取20000次, 耗时6.06ms

1.2 为什么HashMap线程不安全?

因为JDK1.8时 HashMap在发生key冲突时超过8个,自动转化为红黑树, 假如多个线程操put同一个key,有的线程还在操作,其他线程已经在尝试转红黑树了。 这样会发生冲突。

1.3 跳表(SkipList)和 ConcurrentSkipListMap

1.3.1 什么是跳表

传统意义的单链表是一个线性结构,向有序的链表中插入一个节点需要O(n)的时间,查找操作需要O(n)的时间。

跳跃表的简单示例:
图片.png
如果我们使用上图所示的跳跃表,就可以减少查找所需时间为O(n/2),因为我们可以先通过每个节点的最上面的指针先进行查找,这样子就能跳过一半的节点。

比如我们想查找19, 首先和6比较,大于6之后,在和9进行比较,然后在和17进行比较, 比较到21的时候,发现21大于19,说明查找的点在17和21之间,从这个过程中,我们可以看出,查找的时候跳过了3、7、12等点,因此查找的复杂度为O(n/2)。

查找的过程如下图:
8. 容器 - 图3

其实,上面基本上就是跳跃表的思想,每一个结点不单单只包含指向下一个结点的指针,可能包含很多个指向后续结点的指针,这样就可以跳过一些不必要的结点,从而加快查找、删除等操作。对于一个链表内每一个结点包含多少个指向后续元素的指针,后续节点个数是通过一个随机函数生成器得到,这样子就构成了一个跳跃表。

随机生成的跳跃表可能如下图所示:

8. 容器 - 图4

跳跃表其实也是一种通过“空间来换取时间”的一个算法,通过在每个节点中增加了向前的指针,从而提升查找的效率。

“Skip lists are data structures that use probabilistic balancing rather than strictly enforced balancing. As a result, the algorithms for insertion and deletion in skip lists are much simpler and significantly faster than equivalent algorithms for balanced trees. ”
译文:跳跃表使用概率均衡技术而不是使用强制性均衡技术,因此,对于插入和删除结点比传统上的平衡树算法更为简洁高效

跳表是一种随机化的数据结构,目前开源软件 Redis 和 LevelDB 都有用到它。

跳跃的思想在其他地方也有应用,比如:内存对齐

1.3.2 SkipList的操作

1.3.2.1 查找

查找就是给定一个key,查找这个key是否出现在跳跃表中,如果出现,则返回其值,如果不存在,则返回不存在。我们结合一个图就是讲解查找操作,如下图所示:
8. 容器 - 图5

如果我们想查找19是否存在?如何查找呢?我们从头结点开始,首先和9进行判断,此时大于9,然后和21进行判断,小于21,此时这个值肯定在9结点和21结点之间,此时,我们和17进行判断,大于17,然后和21进行判断,小于21,此时肯定在17结点和21结点之间,此时和19进行判断,找到了。具体的示意图如图所示:
图片.png

1.3.2.2 插入

插入包含如下几个操作:1、查找到需要插入的位置 2、申请新的结点 3、调整指针。

我们结合下图进行讲解,查找路径如下图的灰色的线所示 申请新的结点如17结点所示, 调整指向新结点17的指针以及17结点指向后续结点的指针。这里有一个小技巧,就是使用update数组保存大于17结点的位置,update数组的内容如红线所示,这些位置才是有可能更新指针的位置。
8. 容器 - 图7

1.3.2.3 删除

删除操作类似于插入操作,包含如下3步:1、查找到需要删除的结点 2、删除结点 3、调整指针

8. 容器 - 图8

1.3.3 Key-Value数据结构

目前常用的key-value数据结构有三种:Hash表、红黑树、SkipList,它们各自有着不同的优缺点(不考虑删除操作):
Hash表:插入、查找最快,为O(1);如使用链表实现则可实现无锁;数据有序化需要显式的排序操作。
红黑树:插入、查找为O(logn),但常数项较小;无锁实现的复杂性很高,一般需要加锁;数据天然有序。
SkipList:插入、查找为O(logn),但常数项比红黑树要大;底层结构为链表,可无锁实现;数据天然有序。

如果要实现一个key-value结构,需求的功能有插入、查找、迭代、修改,那么首先Hash表就不是很适合了,因为迭代的时间复杂度比较高;而红黑树的插入很可能会涉及多个结点的旋转、变色操作,因此需要在外层加锁,这无形中降低了它可能的并发度。而SkipList底层是用链表实现的,可以实现为lock free,同时它还有着不错的性能(单线程下只比红黑树略慢),非常适合用来实现我们需求的那种key-value结构。
LevelDB、Reddis的底层存储结构就是用的SkipList。

1.3.4 ConcurrentHashMap 类

JDK为我们提供了很多Map接口的实现,使得我们可以方便地处理Key-Value的数据结构。

8. 容器 - 图9
当我们希望快速存取键值对时我们可以使用HashMap
当我们希望在多线程并发存取键值对时,我们会选择ConcurrentHashMap
TreeMap则会帮助我们保证数据是按照Key的自然顺序或者compareTo方法指定的排序规则进行排序。

OK,那么当我们需要多线程并发存取数据并且希望保证数据有序时,我们需要怎么做呢?
也许,我们可以选择ConcurrentTreeMap。不好意思,JDK没有提供这么好的数据结构给我们。

当然,我们可以自己添加lock来实现ConcurrentTreeMap,但是随着并发量的提升,lock带来的性能开销也随之增大。
Don’t cry……,JDK6里面引入的ConcurrentSkipListMap也许可以满足我们的需求

1.3.4.1 什么是ConcurrentSkipListMap

ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够O(log(n))时间内完成查找、插入、删除操作。

1.3.4.2 ConcurrentSkipListMap的存储结构

ConcurrentSkipListMap存储结构跳跃表(SkipList):
1、最底层的数据节点按照关键字升序排列。
2、包含多级索引,每个级别的索引节点按照其关联数据节点的关键字升序排列。
3、高级别索引是其低级别索引的子集。
4、如果关键字key在级别level=i的索引中出现,则级别level<=i的所有索引中都包含key。

2. Collection

2.1 List

2.1.1 List 集合性能测试

  1. import java.math.BigDecimal;
  2. import java.util.*;
  3. import java.util.concurrent.CopyOnWriteArrayList;
  4. import java.util.concurrent.CountDownLatch;
  5. import java.util.stream.IntStream;
  6. /**
  7. * 启动500个线程, 每个线程向集合内写入/读取1000个随机double, 观察其写入和读取性能.
  8. */
  9. public class TestListWriteAndReadPerformance {
  10. private static int COUNT = 1000; //集合内键值对总数
  11. private static int THREAD_COUNT = 500; //总线程数
  12. //待测试的集合
  13. private static List<List<Double>> testLists = new ArrayList<>();
  14. //填充
  15. static {
  16. testLists.add(new CopyOnWriteArrayList<>());
  17. // testLists.add(new ArrayList<>());
  18. testLists.add(Collections.synchronizedList(new ArrayList<Double>()));
  19. }
  20. /**
  21. * 测试写入性能
  22. * @param list
  23. * @return 方法执行耗时
  24. * @throws InterruptedException
  25. */
  26. private static BigDecimal testWritePerformance(List list) throws InterruptedException {
  27. //设置门闩等到线程都执行完然后打印执行时间
  28. CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
  29. Thread[] threads = new Thread[THREAD_COUNT];
  30. Random random = new Random();
  31. IntStream.range(0,THREAD_COUNT).forEach(e->{
  32. threads[e] = new Thread(()->{
  33. IntStream.range(0,COUNT).forEach(i->{
  34. list.add(random.nextDouble());
  35. });
  36. countDownLatch.countDown();
  37. });
  38. });
  39. //计时开始
  40. BigDecimal timeStarted = new BigDecimal(System.currentTimeMillis());
  41. //启动所有线程
  42. Arrays.asList(threads).forEach(Thread::start);
  43. //门栓等待所有线程执行完
  44. countDownLatch.await();
  45. //计时结束
  46. BigDecimal timeEnd = new BigDecimal(System.currentTimeMillis());
  47. return timeEnd.subtract(timeStarted).divide(new BigDecimal(1000));
  48. }
  49. /**
  50. * 测试读取性能
  51. * @param list
  52. * @return 方法执行耗时
  53. * @throws InterruptedException
  54. */
  55. private static BigDecimal testReadPerformance(List list) throws InterruptedException {
  56. //设置门闩等到线程都执行完然后打印执行时间
  57. CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
  58. Thread[] threads = new Thread[THREAD_COUNT];
  59. Random random = new Random();
  60. IntStream.range(0,THREAD_COUNT).forEach(e->{
  61. threads[e] = new Thread(()->{
  62. IntStream.range(0,COUNT).forEach(i->{
  63. list.get(i);
  64. });
  65. countDownLatch.countDown();
  66. });
  67. });
  68. //计时开始
  69. BigDecimal timeStarted = new BigDecimal(System.currentTimeMillis());
  70. //启动所有线程
  71. Arrays.asList(threads).forEach(Thread::start);
  72. //门栓等待所有线程执行完
  73. countDownLatch.await();
  74. //计时结束
  75. BigDecimal timeEnd = new BigDecimal(System.currentTimeMillis());
  76. return timeEnd.subtract(timeStarted).divide(new BigDecimal(1000));
  77. }
  78. public static void main(String[] args) throws InterruptedException {
  79. for (List<Double> list : testLists) {
  80. BigDecimal writeTime = testWritePerformance(list); //写入性能
  81. BigDecimal readTime = testReadPerformance(list); //读取性能
  82. System.out.println(list.getClass().getName()+"写入"+list.size()+"条数据, 耗时"+writeTime+"s, 读取"+list.size()+"条数据, 耗时"+readTime+"s");
  83. }
  84. }
  85. }

java.util.concurrent.CopyOnWriteArrayList写入500000条数据, 耗时81.575s, 读取500000条数据, 耗时0.039s java.util.Collections$SynchronizedRandomAccessList写入500000条数据, 耗时0.102s, 读取500000条数据, 耗时0.043s

2.1.2 从一个售票业务说起

有N张火车票,每张票都有一个编号。 同时有10个窗口对外售票。 请写一个模拟程序。 分析下面的程序可能会产生哪些问题?重复销售?超量销售?

  1. import java.util.*;
  2. import java.util.concurrent.ConcurrentLinkedQueue;
  3. /**
  4. * 有100000张火车票,每张票都有一个编号
  5. * 同时有10个窗口对外售票
  6. * 请写一个模拟程序
  7. *
  8. * 分析下面的程序可能会产生哪些问题?
  9. * 重复销售?超量销售?
  10. */
  11. public class TicketSeller {
  12. private static int COUNT = 100000; //总票数
  13. private static int THREAD_COUNT = 10; //总线程数
  14. /**
  15. * 售票
  16. * @param list
  17. */
  18. static void sale(List list){
  19. for (int i = 0; i < THREAD_COUNT; i++) {
  20. new Thread(()->{
  21. while(list.size() > 0) {
  22. System.out.println("销售了--" + list.remove(0));
  23. }
  24. }).start();
  25. }
  26. }
  27. /**
  28. * 售票
  29. * @param queue
  30. */
  31. static void sale(Queue queue){
  32. for (int i = 0; i < THREAD_COUNT; i++) {
  33. new Thread(()->{
  34. while(true) {
  35. String s = (String) queue.poll();
  36. if(s == null) break;
  37. else System.out.println("销售了--" + s);
  38. }
  39. }).start();
  40. }
  41. }
  42. public static void main(String[] args) {
  43. //ArrayList
  44. List<String> arrayListtickets = new ArrayList<>();
  45. //填满1W张票
  46. for(int i = 0; i< COUNT; i++) arrayListtickets.add("票编号:" + i);
  47. //售票
  48. //sale(arrayListtickets);
  49. //Vector
  50. Vector<String> vertorTickets = new Vector<>();
  51. //填满1W张票
  52. for(int i = 0; i< COUNT; i++) vertorTickets.add("票编号:" + i);
  53. //售票
  54. // sale(vertorTickets);
  55. //LinkedList
  56. List<String> linkedListtickets = new LinkedList<>();
  57. //填满1W张票
  58. for(int i = 0; i< COUNT; i++) linkedListtickets.add("票编号:" + i);
  59. //售票
  60. // sale(linkedListtickets);
  61. //Queue
  62. Queue<String> queueTickets = new ConcurrentLinkedQueue<>();
  63. //填满1W张票
  64. for(int i = 0; i< COUNT; i++) queueTickets.add("票编号:" + i);
  65. sale(queueTickets);
  66. }
  67. }

运行结果:

image.png

很简单的线程不安全问题:

2.1.3 CopyOnWriteArrayList

写入时复制(CopyOnWrite,简称COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个调用者(Callers)同时要求相同的资源(如内存或者是磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者视图修改资源内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的(transparently)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。

CopyOnWrite的缺点 

  • 内存占用问题
  • 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据能马上能读到,请不要使用CopyOnWrite容器。
  • 写入时性能极低,极低,极低!!!


CopyOnWrite的应用场景**
CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新的场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。 

2.1.4 synchronizedList


2.2 Queue

2.2.1. ConcurrentLinkedQueue

ConcurrentLinkedQueue 从名字上就可以看出它是一个线程安全的 链表结构 队列, 而且它是一个无界队列
ConcurrentLinkedQueue 的非阻塞算法实现可概括为下面 5 点:

  • 使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。
  • head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。
  • 由于队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式来维护非阻塞算法的正确性。
  • 以批处理方式来更新 head/tail,从整体上减少入队 / 出队操作的开销。
  • 为了有利于垃圾收集,队列使用特有的 head 更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略。

2.2.2. BlockingQueue

BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:
图片.png

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueueDelayQueueLinkedBlockingDequeLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

image.png
BlockingQueue**入队操作**

  1. //入队不阻塞
  2. boolean offer(E e);
  3. //入队阻塞-如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞
  4. void put(E e) throws InterruptedException;
  5. /**
  6. * 在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况
  7. * 1. 阻塞被唤醒
  8. * 2. 等待时间超时
  9. * 3. 当前线程被中断
  10. */
  11. boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;

BlockingQueue**出队操作**

  1. //不阻塞出队:如果没有元素,直接返回null;如果有元素,出队
  2. E poll();
  3. //阻塞出队:如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞
  4. E take() throws InterruptedException;
  5. /**
  6. * 如果队列不空,出队;如果队列已空且已经超时,返回null;
  7. * 如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
  8. * 1. 被唤醒
  9. * 2. 等待时间超时
  10. * 3. 当前线程被中断
  11. */
  12. E poll(long timeout, TimeUnit unit)throws InterruptedException;

2.2.2.1 LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。
LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。
LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。
LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。

2.2.2.2 ArrayBlockingQueue

ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列。因此,在队列全满时执行入队将会阻塞,在队列为空时出队同样将会阻塞。
ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的,ArrayBlockingQueue内部只有一把锁,意味着同一时刻只有一个线程能进行入队或者出队的操作。

2.2.2.3 DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
image.png
DelayQueue只能添加(offer/put/add)实现了Delayed接口的对象,意思是说我们不能想往DelayQueue里添加什么就添加什么,不能添加int、也不能添加String进去,必须添加我们自己的实现了Delayed接口的类的对象:

  1. import lombok.Data;
  2. import lombok.ToString;
  3. import lombok.experimental.Accessors;
  4. import java.util.concurrent.DelayQueue;
  5. import java.util.concurrent.Delayed;
  6. import java.util.concurrent.TimeUnit;
  7. public class DelayQueueExample {
  8. public static void main(String[] args) throws InterruptedException {
  9. //创建延迟阻塞队列
  10. DelayQueue<MyDelayedTask> delayQueue = new DelayQueue<>();
  11. //新线程往队列里面插入不同执行时间的任务
  12. new Thread(()->{
  13. delayQueue.offer(new MyDelayedTask().setName("task1").setTime(10000));
  14. delayQueue.offer(new MyDelayedTask().setName("task2").setTime(3900));
  15. delayQueue.offer(new MyDelayedTask().setName("task3").setTime(1900));
  16. delayQueue.offer(new MyDelayedTask().setName("task4").setTime(5900));
  17. delayQueue.offer(new MyDelayedTask().setName("task5").setTime(6900));
  18. delayQueue.offer(new MyDelayedTask().setName("task6").setTime(7900));
  19. delayQueue.offer(new MyDelayedTask().setName("task7").setTime(4900));
  20. }).start();
  21. //不断从队列里面取出任务
  22. while (true){
  23. System.out.println(delayQueue.take());
  24. }
  25. }
  26. //实现Delayed接口。
  27. @Data
  28. @Accessors(chain = true)
  29. @ToString
  30. private static class MyDelayedTask implements Delayed{
  31. private String name;
  32. private long start = System.currentTimeMillis();
  33. private long time;
  34. /**
  35. * 需要实现的接口,获得延迟时间 用过期时间-当前时间
  36. * @param timeUnit
  37. * @return
  38. */
  39. @Override
  40. public long getDelay(TimeUnit timeUnit) {
  41. return timeUnit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
  42. }
  43. /**
  44. * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
  45. * @param delayed
  46. * @return
  47. */
  48. @Override
  49. public int compareTo(Delayed delayed) {
  50. MyDelayedTask myDelayedTask = (MyDelayedTask) delayed;
  51. return (int) (this.getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS));
  52. }
  53. }
  54. }

执行结果:

DelayQueueExample.MyDelayedTask(name=task3, start=1609600175055, time=1900) DelayQueueExample.MyDelayedTask(name=task2, start=1609600175055, time=3900) DelayQueueExample.MyDelayedTask(name=task7, start=1609600175055, time=4900) DelayQueueExample.MyDelayedTask(name=task4, start=1609600175055, time=5900) DelayQueueExample.MyDelayedTask(name=task5, start=1609600175055, time=6900) DelayQueueExample.MyDelayedTask(name=task6, start=1609600175055, time=7900) DelayQueueExample.MyDelayedTask(name=task1, start=1609600175055, time=10000)


DelayQueue应用场景:**

  • 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
  • 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
  • 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
  • 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
  • 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。


2.2.2.4 SynchronousQueue

SynchronousQueue是一个比较特别的队列,它的目的在于两个线程之间的通讯。而不是业务上的通讯。SynchronousQueue是一个经典的生产者消费者的实现,SynchronousQueue里面没有缓冲区, 或者可以理解为只能存储一个元素的位置,所以, 一个线程向SynchronousQueue放入元素,另一个SynchronousQueue线程必须取出这个元素,其他线程才能继续往里面放。反之亦然。由于SynchronousQueue实现了BlockingQueue,所以它的takeput都是阻塞操作
image.png

  1. import java.util.Scanner;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.SynchronousQueue;
  4. public class SynchronusQueue {
  5. public static void main(String[] args){
  6. BlockingQueue<String> strs = new SynchronousQueue<>();
  7. Thread t1 = new Thread(()->{
  8. try {
  9. String currentName = Thread.currentThread().getName();
  10. while (true){
  11. System.out.println("线程"+currentName+"取出数据: "+strs.take());
  12. }
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. },"t1");
  17. Thread t2 = new Thread(()->{
  18. String currentName = Thread.currentThread().getName();
  19. String str = null;;
  20. Scanner scanner = new Scanner(System.in);
  21. while (!"end".equals(str = scanner.nextLine())){
  22. try {
  23. System.out.println("线程"+currentName+"放入数据: "+str);
  24. strs.put(str);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. },"t2");
  30. t1.start();
  31. t2.start();
  32. }
  33. }

运行结果:

111 线程t2放入数据: 111 线程t1取出数据: 111 222 线程t2放入数据: 222 线程t1取出数据: 222 333 线程t2放入数据: 333 线程t1取出数据: 333 444 线程t2放入数据: 444 线程t1取出数据: 444 555 线程t2放入数据: 555

线程t1取出数据: 555


2.2.2.5 TransferQueue

我们知道 SynchronousQueue 内部无法存储元素,当要添加元素的时候,需要阻塞,不够完美,LinkedBolckingQueue 则内部使用了大量的锁,性能不高。
两两结合,岂不完美?性能又高,又不阻塞。有! TransferQueue!
image.png
TransferQueue的主要方法解析:

  1. public interface TransferQueue<E> extends BlockingQueue<E> {
  2. // 如果可能,立即将元素转移给等待的消费者。
  3. // 更确切地说,如果存在消费者已经等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则返回 false。
  4. boolean tryTransfer(E e);
  5. // 将元素转移给消费者,如果需要的话等待。
  6. // 更准确地说,如果存在一个消费者已经等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,则立即传送指定的元素,否则等待直到元素由消费者接收。
  7. void transfer(E e) throws InterruptedException;
  8. // 上面方法的基础上设置超时时间
  9. boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
  10. // 如果至少有一位消费者在等待,则返回 true
  11. boolean hasWaitingConsumer();
  12. // 返回等待消费者人数的估计值
  13. int getWaitingConsumerCount();
  14. }

LinkedTransferQueueSynchronousQueueLinkedBlockingQueue 的合体,性能比 LinkedBlockingQueue 更高(没有锁操作),比 SynchronousQueue能存储更多的元素。
put 时,如果有等待的线程,就直接将元素 “交给” 等待者, 否则直接进入队列。
puttransfer 方法的区别是,put 是立即返回的, transfer 是阻塞等待消费者拿到数据才返回。transfer方法和 SynchronousQueue的 put 方法类似。

2.2.3 PriorityQueue

PriorityQueue是基于优先堆的一个无界队列,这个优先队列中的元素可以默认自然排序或者通过提供的Comparator(比较器)在队列实例化的时排序。
image.png
优先队列不允许空值,而且不支持non-comparable(不可比较)的对象,比如用户自定义的类。优先队列要求使用Java Comparable和Comparator接口给对象排序,并且在排序时会按照优先级处理其中的元素。
优先队列的头是基于自然排序或者Comparator排序的最小元素。如果有多个对象拥有同样的排序,那么就可能随机地取其中任意一个。当我们获取队列时,返回队列的头对象。
优先队列的大小是不受限制的,但在创建时可以指定初始大小。当我们向优先队列增加元素的时候,队列大小会自动增加。
PriorityQueue是非线程安全的,所以Java提供了PriorityBlockingQueue(实现BlockingQueue接口)用于Java多线程环境

  1. import lombok.AllArgsConstructor;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. import lombok.ToString;
  5. import java.util.Comparator;
  6. import java.util.PriorityQueue;
  7. import java.util.Queue;
  8. import java.util.Random;
  9. public class PriorityQueueExample {
  10. private static int queueLength = new Random().nextInt(20);
  11. public static void main(String[] args) {
  12. //优先队列自然排序示例
  13. Queue<Integer> integerPriorityQueue = new PriorityQueue<>(queueLength);
  14. //填充
  15. Random random = new Random();
  16. for (int i = 0; i < queueLength; i++) {
  17. integerPriorityQueue.add(random.nextInt(100));
  18. }
  19. //打印
  20. for (int i = 0; i < queueLength; i++) {
  21. System.out.print(integerPriorityQueue.poll()+" ");
  22. }
  23. System.out.println();
  24. //优先队列使用示例
  25. Queue<Person> personPriorityQueue = new PriorityQueue<Person>(queueLength, idComparator);
  26. //填充
  27. for(int i=0;i<queueLength;i++){
  28. int age = random.nextInt(100);
  29. String name = "名字"+age;
  30. personPriorityQueue.add(new Person(age,name));
  31. }
  32. //打印
  33. for (int i = 0; i < queueLength; i++) {
  34. System.out.println(personPriorityQueue.poll());
  35. }
  36. }
  37. //匿名Comparator实现
  38. public static Comparator<Person> idComparator = new Comparator<Person>(){
  39. @Override
  40. public int compare(Person p1, Person p2) {
  41. return p1.getAge()-p2.getAge();
  42. }
  43. };
  44. @Data
  45. @AllArgsConstructor
  46. @NoArgsConstructor
  47. @ToString
  48. private static class Person{
  49. private int age;
  50. private String name;
  51. }
  52. }

结果:

3 4 11 11 30 51 63 79 81 95 PriorityQueueExample.Person(age=4, name=名字4) PriorityQueueExample.Person(age=8, name=名字8) PriorityQueueExample.Person(age=10, name=名字10) PriorityQueueExample.Person(age=30, name=名字30) PriorityQueueExample.Person(age=30, name=名字30) PriorityQueueExample.Person(age=54, name=名字54) PriorityQueueExample.Person(age=67, name=名字67) PriorityQueueExample.Person(age=71, name=名字71) PriorityQueueExample.Person(age=73, name=名字73) PriorityQueueExample.Person(age=83, name=名字83)