1. ConcurrentHashMap
- 多线程中直接使用
HashMap
进行put
操作会引起死循环,而使用线程安全的HashTable
效率低下(因为使用synchronized
锁住全字段)。 ConcurrentHashMap
则采用分段锁设计,提升并发访问率,分段锁数组的最大长度为65536。
其由分段锁 Segment
(其亦是重入锁)和 HashEntry
两种数组结构构成。每个 ConcurrentHashMap
包含一个 Segment
数组,每一个 Segment
元素守护一个 HashEntry
数组。
减少散列冲突:插入或查询时,会做一次再散列,让元素均匀分布在不同的 Segment
上。
统计大小:每个 Segment
中有一个 volatile
的全局变量 count
,在统计大小时先尝试2次不锁住 Segment
的方式计算总大小,如果结果一致则返回;如果不一致再加锁求值。此处的结果一致指的是在进行 put()
、 remove()
等可能影响总大小的操作时,对变量 modCount
加一,则累计 count
前后比较modCount
值是否一致即可。
2. ConcurrentLinkedQueue/ConcurrentLinkedDueue
它们是一个基于链接节点的无界安全队列,采用FIFO模式(单向或双向)。
入列过程:定位尾节点,基于CAS将入队节点设为尾节点的 next
节点。
tails
并不一定指向队尾,这是为了减少使用CAS设置尾节点的次数。出队列时的head
更新亦为同理。
3. 阻塞队列 BlockingQueue
支持阻塞插入和阻塞移除的队列,常用于生产者和消费者的场景,采用 Condition
实现(通知/唤醒模式)实现。
- 队列满时插入抛出
IllegalStateException
、队列空时移除抛出NoSuchElementException
异常。
存在以下实现类:
ArrayBlockingQueue
:用数组实现的有界阻塞队列,采用FIFO,但不保证公平。DelayQueue
:支持延迟获取元素的无界阻塞队列,基于PriorityBlockingQueue
实现;常用于缓存获取系统、定时任务调度,存储元素需要实现Delay
接口。LinkedBlockingDeque
:用链表实现的有界阻塞队列,FIFO。LinkedBlockingQueue
:用链表实现的有界双向阻塞队列。LinkedTransferQueue
:用链表实现的无界阻塞队列。PriorityBlockingQueue
:支持优先级的无界阻塞队列,采用自然升序排列,不一定保证同级元素的顺序。SynchronousQueue
:不存储元素的阻塞队列,可以看作长度为1的阻塞队列,因此可以选择公平访问策略。4. Fork/Join框架
工作窃取算法:某个线程从其它队列里窃取任务来执行,使用双端队列实现——从队列尾偷取、从队列头正常提取。
框架设计:
- 任务分割——
ForkJoinTask
;其一般有两个实现子类——RecursiveAction
和RecursiveTask
分别用于无/有返回结果的任务。 - 执行任务合并——
ForkJoinPool
。
使用举例:
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> future = forkJoinPool.submit(new CountTask(1, 100000000));
long start = System.currentTimeMillis();
System.out.println("并行结果:" + future.get() + ", 并行耗时:" + (System.currentTimeMillis()-start));
start = System.currentTimeMillis();
int sums = 0;
for(int i=1; i<=100000000; i++){ sums += i; }
System.out.println("并行结果:" + sums + ", 串行耗时:" + (System.currentTimeMillis()-start));
}
static class CountTask extends RecursiveTask<Integer>{
private int start, end;
private final static int FORK_THRESHOLD = 100000; // 控制任务粒度
CountTask(int start, int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sums = 0;
if(this.end - this.start > CountTask.FORK_THRESHOLD){
int mid = (this.end + this.start) / 2;
CountTask left = new CountTask(this.start, mid); // 拆分任务
CountTask right = new CountTask(mid+1, this.end);
left.fork(); right.fork();
sums += left.join() + right.join(); // 收集结果
} else {
for(int i=this.start; i<=this.end; i++){ sums += i; }
}
return sums;
}
}