1. ConcurrentHashMap
- 多线程中直接使用
HashMap进行put操作会引起死循环,而使用线程安全的HashTable效率低下(因为使用synchronized锁住全字段)。 ConcurrentHashMap则采用分段锁设计,提升并发访问率,分段锁数组的最大长度为65536。
其由分段锁 Segment (其亦是重入锁)和 HashEntry 两种数组结构构成。每个 ConcurrentHashMap 包含一个 Segment 数组,每一个 Segment 元素守护一个 HashEntry 数组。
减少散列冲突:插入或查询时,会做一次再散列,让元素均匀分布在不同的 Segment 上。
统计大小:每个 Segment 中有一个 volatile 的全局变量 count ,在统计大小时先尝试2次不锁住 Segment 的方式计算总大小,如果结果一致则返回;如果不一致再加锁求值。此处的结果一致指的是在进行 put() 、 remove() 等可能影响总大小的操作时,对变量 modCount 加一,则累计 count 前后比较modCount 值是否一致即可。
2. ConcurrentLinkedQueue/ConcurrentLinkedDueue
它们是一个基于链接节点的无界安全队列,采用FIFO模式(单向或双向)。
入列过程:定位尾节点,基于CAS将入队节点设为尾节点的 next 节点。
tails并不一定指向队尾,这是为了减少使用CAS设置尾节点的次数。出队列时的head更新亦为同理。
3. 阻塞队列 BlockingQueue
支持阻塞插入和阻塞移除的队列,常用于生产者和消费者的场景,采用 Condition 实现(通知/唤醒模式)实现。
- 队列满时插入抛出
IllegalStateException、队列空时移除抛出NoSuchElementException异常。
存在以下实现类:
ArrayBlockingQueue:用数组实现的有界阻塞队列,采用FIFO,但不保证公平。DelayQueue:支持延迟获取元素的无界阻塞队列,基于PriorityBlockingQueue实现;常用于缓存获取系统、定时任务调度,存储元素需要实现Delay接口。LinkedBlockingDeque:用链表实现的有界阻塞队列,FIFO。LinkedBlockingQueue:用链表实现的有界双向阻塞队列。LinkedTransferQueue:用链表实现的无界阻塞队列。PriorityBlockingQueue:支持优先级的无界阻塞队列,采用自然升序排列,不一定保证同级元素的顺序。SynchronousQueue:不存储元素的阻塞队列,可以看作长度为1的阻塞队列,因此可以选择公平访问策略。4. Fork/Join框架
工作窃取算法:某个线程从其它队列里窃取任务来执行,使用双端队列实现——从队列尾偷取、从队列头正常提取。
框架设计:
- 任务分割——
ForkJoinTask;其一般有两个实现子类——RecursiveAction和RecursiveTask分别用于无/有返回结果的任务。 - 执行任务合并——
ForkJoinPool。
使用举例:
public static void main(String[] args) throws Exception {ForkJoinPool forkJoinPool = new ForkJoinPool();Future<Integer> future = forkJoinPool.submit(new CountTask(1, 100000000));long start = System.currentTimeMillis();System.out.println("并行结果:" + future.get() + ", 并行耗时:" + (System.currentTimeMillis()-start));start = System.currentTimeMillis();int sums = 0;for(int i=1; i<=100000000; i++){ sums += i; }System.out.println("并行结果:" + sums + ", 串行耗时:" + (System.currentTimeMillis()-start));}static class CountTask extends RecursiveTask<Integer>{private int start, end;private final static int FORK_THRESHOLD = 100000; // 控制任务粒度CountTask(int start, int end){this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sums = 0;if(this.end - this.start > CountTask.FORK_THRESHOLD){int mid = (this.end + this.start) / 2;CountTask left = new CountTask(this.start, mid); // 拆分任务CountTask right = new CountTask(mid+1, this.end);left.fork(); right.fork();sums += left.join() + right.join(); // 收集结果} else {for(int i=this.start; i<=this.end; i++){ sums += i; }}return sums;}}
