1. ConcurrentHashMap

  • 多线程中直接使用 HashMap 进行 put 操作会引起死循环,而使用线程安全的 HashTable 效率低下(因为使用 synchronized 锁住全字段)。
  • ConcurrentHashMap 则采用分段锁设计,提升并发访问率,分段锁数组的最大长度为65536。

其由分段锁 Segment (其亦是重入锁)和 HashEntry 两种数组结构构成。每个 ConcurrentHashMap 包含一个 Segment 数组,每一个 Segment 元素守护一个 HashEntry 数组。

减少散列冲突:插入或查询时,会做一次再散列,让元素均匀分布在不同的 Segment 上。
统计大小:每个 Segment 中有一个 volatile 的全局变量 count ,在统计大小时先尝试2次不锁住 Segment 的方式计算总大小,如果结果一致则返回;如果不一致再加锁求值。此处的结果一致指的是在进行 put()remove() 等可能影响总大小的操作时,对变量 modCount 加一,则累计 count 前后比较modCount 值是否一致即可。

2. ConcurrentLinkedQueue/ConcurrentLinkedDueue

它们是一个基于链接节点的无界安全队列,采用FIFO模式(单向或双向)。

入列过程:定位尾节点,基于CAS将入队节点设为尾节点的 next 节点。

tails 并不一定指向队尾,这是为了减少使用CAS设置尾节点的次数。出队列时的 head 更新亦为同理。

3. 阻塞队列 BlockingQueue

支持阻塞插入和阻塞移除的队列,常用于生产者和消费者的场景,采用 Condition 实现(通知/唤醒模式)实现。

  • 队列满时插入抛出 IllegalStateException 、队列空时移除抛出 NoSuchElementException 异常。

存在以下实现类:

  1. ArrayBlockingQueue :用数组实现的有界阻塞队列,采用FIFO,但不保证公平。
  2. DelayQueue :支持延迟获取元素的无界阻塞队列,基于PriorityBlockingQueue 实现;常用于缓存获取系统、定时任务调度,存储元素需要实现 Delay 接口。
  3. LinkedBlockingDeque :用链表实现的有界阻塞队列,FIFO。
  4. LinkedBlockingQueue :用链表实现的有界双向阻塞队列。
  5. LinkedTransferQueue :用链表实现的无界阻塞队列。
  6. PriorityBlockingQueue :支持优先级的无界阻塞队列,采用自然升序排列,不一定保证同级元素的顺序。
  7. SynchronousQueue :不存储元素的阻塞队列,可以看作长度为1的阻塞队列,因此可以选择公平访问策略。

    4. Fork/Join框架

工作窃取算法:某个线程从其它队列里窃取任务来执行,使用双端队列实现——从队列尾偷取、从队列头正常提取。

框架设计

  1. 任务分割—— ForkJoinTask ;其一般有两个实现子类—— RecursiveActionRecursiveTask 分别用于无/有返回结果的任务。
  2. 执行任务合并—— ForkJoinPool

使用举例:

  1. public static void main(String[] args) throws Exception {
  2. ForkJoinPool forkJoinPool = new ForkJoinPool();
  3. Future<Integer> future = forkJoinPool.submit(new CountTask(1, 100000000));
  4. long start = System.currentTimeMillis();
  5. System.out.println("并行结果:" + future.get() + ", 并行耗时:" + (System.currentTimeMillis()-start));
  6. start = System.currentTimeMillis();
  7. int sums = 0;
  8. for(int i=1; i<=100000000; i++){ sums += i; }
  9. System.out.println("并行结果:" + sums + ", 串行耗时:" + (System.currentTimeMillis()-start));
  10. }
  11. static class CountTask extends RecursiveTask<Integer>{
  12. private int start, end;
  13. private final static int FORK_THRESHOLD = 100000; // 控制任务粒度
  14. CountTask(int start, int end){
  15. this.start = start;
  16. this.end = end;
  17. }
  18. @Override
  19. protected Integer compute() {
  20. int sums = 0;
  21. if(this.end - this.start > CountTask.FORK_THRESHOLD){
  22. int mid = (this.end + this.start) / 2;
  23. CountTask left = new CountTask(this.start, mid); // 拆分任务
  24. CountTask right = new CountTask(mid+1, this.end);
  25. left.fork(); right.fork();
  26. sums += left.join() + right.join(); // 收集结果
  27. } else {
  28. for(int i=this.start; i<=this.end; i++){ sums += i; }
  29. }
  30. return sums;
  31. }
  32. }