对传统线程池的增强;也是对传统线程池的补充。适合CPU计算密集型。
传统线程池,仅单纯执行提交的任务,不会对提交的任务,进行切分(但也可以通过递归改造);也没有窃取功能。而forkjoin可以对提交的任务,按指定的逻辑进行切分
传统线程池隐患
利用递归改造任务+传统线程池,因为递归一次,都会新建一个线程。线程数最大值,不可预估,可能达到最大,导致无法线程池无法继续创建,最终致线程池饥饿挂起。
ForkJoinPool
按照指定逻辑拆分任务,拆分的任务,会放入工作队列中。
一、合理设置线程数
CPU密集型任务
CPU密集型任务也叫计算密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。
IO密集型任务
IO密集型任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在工作队列中等待的任务就会减少,可以更好地利用资源。
线程数计算公式
线程数 = CPU 核心数 (1+平均等待时间/平均工作时间)
可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。
太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以*如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。
二、Fork/Join使用
Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。
多个线程,每个线程维护一个双端工作队列(当前线程提交的任务,如果拆分,则存入工作队列中)。不同线程的多个工作队列使用工作队列数组存储(在数组中下标位置,使用类hash算法确定)。
ForkJoinPool
ForkJoinPool 是用于执行 ForkJoinTask 任务的执行池,不再是传统执行池 Worker+Queue 的组合式,而是维护了一个队列数组 WorkQueue(WorkQueue[]),这样在提交任务和线程任务的时候大幅度减少碰撞。
构造函数
/**
* ForkJoinPool中有【四个核心参数】,用于控制线程池。1.并行数、2.工作线程的创建、3.异常处理、4.模式指定
* 1.parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定【工作线程的数量】。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别
* 2.ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
* 3.UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理
* 4.boolean asyncMode:设置队列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式
**/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
提交任务方式
提交方式 | 返回值 | 方法 |
---|---|---|
提交异步执行 | void | execute(ForkJoinTask task) execute(Runnable task) |
等待并获取结果 | T | invoke(ForkJoinTask task) |
提交执行获取Future结果 | ForkJoinTask | submit(ForkJoinTask task) submit(Callable task) submit(Runnable task) submit(Runnable task, T result) |
ForkJoinPool不仅允许提交ForkJoinTask类型任务,还允许提交Runnable任务;Runnable执行过程?
执行Runnable类型任务时,将会转换为ForkJoinTask类型。由于任务是不可切分的,所以这类任务无法获得任务拆分这方面的效益,不过仍然可以获得任务窃取带来的好处和性能提升
ForkJoinTask
ForkJoinPool的核心之一,任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的Future。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
fork()——提交子任务
fork()方法向当前的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。
join()——获取任务执行结果
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:
- **RecursiveAction**:用于递归执行但不需要返回结果的任务。
- **RecursiveTask** :用于递归执行需要返回结果的任务。
- CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数
使用案例
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] array = getRandomArray(10000000, 10);
//递归任务 用于计算数组总和
LongSum ls = new LongSum(array, 0, array.length);
// 构建ForkJoinPool
ForkJoinPool fjp = new ForkJoinPool(12);
//ForkJoin计算数组总和
ForkJoinTask<Long> result = fjp.submit(ls);
System.out.println(result.get());
}
private static int[] getRandomArray(int arrayLength, int maxNum) {
int[] array = new int[arrayLength];
for (int i = 0; i < array.length; i++) {
array[i] = (int) (Math.random() * maxNum);
}
return array;
}
public class LongSum extends RecursiveTask<Long> {
// 任务拆分最小阈值
static final int SEQUENTIAL_THRESHOLD = 10000;
int low;
int high;
int[] array;
LongSum(int[] arr, int lo, int hi) {
array = arr;
low = lo;
high = hi;
}
@Override
protected Long compute() {
//当任务拆分到小于等于阀值时开始求和
if (high - low <= SEQUENTIAL_THRESHOLD) {
long sum = 0;
for (int i = low; i < high; ++i) {
sum += array[i];
}
return sum;
} else { // 任务过大继续拆分
int mid = low + (high - low) / 2;
LongSum left = new LongSum(array, low, mid);
LongSum right = new LongSum(array, mid, high);
// 提交任务
left.fork();
right.fork();
//获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果
long rightAns = right.join();
long leftAns = left.join();
return leftAns + rightAns;
}
}
}
ForkJoinTask使用限制
ForkJoinTask最适合用于纯粹的计算任务,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。
三、工作原理
- ForkJoinPool 内部有多个工作队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个工作队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的工作队列中。
- ForkJoinPool 的每个工作线程(ForkJoinWorkThread)都维护着一个工作队列(WorkQueue),是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的top,并且工作线程在处理自己的工作队列时,使用的是 LIFO (后进先出)方式,也就是说每次从top取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base,也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO(先进先出) 方式。
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
-
工作窃取
与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool引入了工作窃取设计,它是其性能保证的关键之一。
工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。最大限度地减少了线程竞争任务的可能性。
ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。
优缺点- 优点:充分利用线程进行并行计算,并减少了线程间的竞争;
- 缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
工作队列WorkQueue
- WorkQueue 是**双向**列表,用于任务的有序执行,如果 WorkQueue 用于自己的执行线程 Thread,线程默认将会从尾端选取任务用来执行 LIFO。
- 每个 ForkJoinWorkThread **都有属于自己的 WorkQueue**,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread。
- **没有 ForkJoinWorkThread 的 WorkQueue** 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是 **偶数** 位。
ForkJoinWorkThread
ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的 奇数 位。
总结
Fork/Join是一种基于分治算法的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:
- **任务切分**:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
- **任务窃取**:通过任务窃取,充分地利用空闲线程,并减少竞争。
在使用ForkJoinPool时,需要特别注意任务的类型是否为纯函数计算类型,也就是这些任务不应该关心状态或者外界的变化,这样才是最安全的做法。如果是阻塞类型任务,那么你需要谨慎评估技术方案。虽然ForkJoinPool也能处理阻塞类型任务,但可能会带来复杂的管理成本。
四、源码分析
submit(ForkJoinTask task)方法
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
/**
* 尝试将给定任务添加到当前提交者线程对应的队列中。在筛选是否需要externalSubmit时,此方法仅直接处理(非常)常见的路径
**/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
//尝试将任务添加到队列中(非首次执行,才会进入if内部逻辑)
if ((ws = workQueues) != null //工作队列【数组】不为空
&& (m = (ws.length - 1)) >= 0 //队列【数组】
&& (q = ws[m & r & SQMASK]) != null //当前线程在队列【数组】中是否存在队列
&& r != 0
&& rs > 0
&& U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
//任务入工作队列
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//首次执行直接执行此方法。1.初始化工作队列数组,2.创建工作队列,3.任务放入工作队列
externalSubmit(task);
}
externalSubmit(task)
//1.初始化工作队列数组,2.创建工作队列,3.任务放入工作队列,4.唤醒工作线程,执行任务
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
//1.初始化工作队列数组
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
//3.任务放入工作队列
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//4.唤醒工作线程,执行任务
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
//2.创建工作队列
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
signalWork(WorkQueue[] ws, WorkQueue q)
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers。不存在空闲线程,则尝试创建工作线程
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}