对传统线程池的增强;也是对传统线程池的补充。适合CPU计算密集型。
传统线程池,仅单纯执行提交的任务,不会对提交的任务,进行切分(但也可以通过递归改造);也没有窃取功能。而forkjoin可以对提交的任务,按指定的逻辑进行切分

传统线程池隐患
利用递归改造任务+传统线程池,因为递归一次,都会新建一个线程。线程数最大值,不可预估,可能达到最大,导致无法线程池无法继续创建,最终致线程池饥饿挂起。
ForkJoinPool
按照指定逻辑拆分任务,拆分的任务,会放入工作队列中。
image.png

一、合理设置线程数

CPU密集型任务

CPU密集型任务也叫计算密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。

IO密集型任务

IO密集型任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在工作队列中等待的任务就会减少,可以更好地利用资源。

线程数计算公式

线程数 = CPU 核心数 (1+平均等待时间/平均工作时间)
可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。
太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以*如果想要更准确的话,可以进行压测
,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。

二、Fork/Join使用

Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。
多个线程,每个线程维护一个双端工作队列(当前线程提交的任务,如果拆分,则存入工作队列中)。不同线程的多个工作队列使用工作队列数组存储(在数组中下标位置,使用类hash算法确定)。

线程创建时,会绑定一个工作队列

ForkJoinPool

ForkJoinPool 是用于执行 ForkJoinTask 任务的执行池,不再是传统执行池 Worker+Queue 的组合式,而是维护了一个队列数组 WorkQueue(WorkQueue[]),这样在提交任务和线程任务的时候大幅度减少碰撞

构造函数

image.png

  1. /**
  2. * ForkJoinPool中有【四个核心参数】,用于控制线程池。1.并行数、2.工作线程的创建、3.异常处理、4.模式指定
  3. * 1.parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定【工作线程的数量】。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别
  4. * 2.ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
  5. * 3.UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理
  6. * 4.boolean asyncMode:设置队列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式
  7. **/
  8. public ForkJoinPool(int parallelism,
  9. ForkJoinWorkerThreadFactory factory,
  10. UncaughtExceptionHandler handler,
  11. boolean asyncMode) {
  12. this(checkParallelism(parallelism),
  13. checkFactory(factory),
  14. handler,
  15. asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
  16. "ForkJoinPool-" + nextPoolId() + "-worker-");
  17. checkPermission();
  18. }

提交任务方式

提交方式 返回值 方法
提交异步执行 void execute(ForkJoinTask task)
execute(Runnable task)
等待并获取结果 T invoke(ForkJoinTask task)
提交执行获取Future结果 ForkJoinTask submit(ForkJoinTask task)
submit(Callable task)
submit(Runnable task)
submit(Runnable task, T result)

ForkJoinPool不仅允许提交ForkJoinTask类型任务,还允许提交Runnable任务;Runnable执行过程?
执行Runnable类型任务时,将会转换为ForkJoinTask类型。由于任务是不可切分的,所以这类任务无法获得任务拆分这方面的效益,不过仍然可以获得任务窃取带来的好处和性能提升

ForkJoinTask

ForkJoinPool的核心之一,任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的Future。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
image.png

fork()——提交子任务

fork()方法向当前的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。

join()——获取任务执行结果

join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:

  1. - **RecursiveAction**:用于递归执行但不需要返回结果的任务。
  2. - **RecursiveTask** :用于递归执行需要返回结果的任务。
  3. - CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数

使用案例

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. int[] array = getRandomArray(10000000, 10);
  3. //递归任务 用于计算数组总和
  4. LongSum ls = new LongSum(array, 0, array.length);
  5. // 构建ForkJoinPool
  6. ForkJoinPool fjp = new ForkJoinPool(12);
  7. //ForkJoin计算数组总和
  8. ForkJoinTask<Long> result = fjp.submit(ls);
  9. System.out.println(result.get());
  10. }
  11. private static int[] getRandomArray(int arrayLength, int maxNum) {
  12. int[] array = new int[arrayLength];
  13. for (int i = 0; i < array.length; i++) {
  14. array[i] = (int) (Math.random() * maxNum);
  15. }
  16. return array;
  17. }
  18. public class LongSum extends RecursiveTask<Long> {
  19. // 任务拆分最小阈值
  20. static final int SEQUENTIAL_THRESHOLD = 10000;
  21. int low;
  22. int high;
  23. int[] array;
  24. LongSum(int[] arr, int lo, int hi) {
  25. array = arr;
  26. low = lo;
  27. high = hi;
  28. }
  29. @Override
  30. protected Long compute() {
  31. //当任务拆分到小于等于阀值时开始求和
  32. if (high - low <= SEQUENTIAL_THRESHOLD) {
  33. long sum = 0;
  34. for (int i = low; i < high; ++i) {
  35. sum += array[i];
  36. }
  37. return sum;
  38. } else { // 任务过大继续拆分
  39. int mid = low + (high - low) / 2;
  40. LongSum left = new LongSum(array, low, mid);
  41. LongSum right = new LongSum(array, mid, high);
  42. // 提交任务
  43. left.fork();
  44. right.fork();
  45. //获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果
  46. long rightAns = right.join();
  47. long leftAns = left.join();
  48. return leftAns + rightAns;
  49. }
  50. }
  51. }

ForkJoinTask使用限制
ForkJoinTask最适合用于纯粹的计算任务,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。

三、工作原理

  • ForkJoinPool 内部有多个工作队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个工作队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的工作队列中。
  • ForkJoinPool 的每个工作线程(ForkJoinWorkThread)都维护着一个工作队列(WorkQueue),是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
  • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的top,并且工作线程在处理自己的工作队列时,使用的是 LIFO (后进先出)方式,也就是说每次从top取出任务来执行。
  • 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base,也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO(先进先出) 方式。
  • 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  • 在既没有自己的任务,也没有可以窃取的任务时,进入休眠 。

    工作窃取

    与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool引入了工作窃取设计,它是其性能保证的关键之一。
    工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。最大限度地减少了线程竞争任务的可能性。
    ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。
    优缺点

    1. - 优点:充分利用线程进行并行计算,并减少了线程间的竞争;
    2. - 缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

    工作队列WorkQueue

    1. - WorkQueue 是**双向**列表,用于任务的有序执行,如果 WorkQueue 用于自己的执行线程 Thread,线程默认将会从尾端选取任务用来执行 LIFO
    2. - 每个 ForkJoinWorkThread **都有属于自己的 WorkQueue**,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread
    3. - **没有 ForkJoinWorkThread WorkQueue** 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是 **偶数** 位。

image.png

ForkJoinWorkThread

ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的 奇数 位。
image.png

总结

Fork/Join是一种基于分治算法的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:

  1. - **任务切分**:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
  2. - **任务窃取**:通过任务窃取,充分地利用空闲线程,并减少竞争。

在使用ForkJoinPool时,需要特别注意任务的类型是否为纯函数计算类型,也就是这些任务不应该关心状态或者外界的变化,这样才是最安全的做法。如果是阻塞类型任务,那么你需要谨慎评估技术方案。虽然ForkJoinPool也能处理阻塞类型任务,但可能会带来复杂的管理成本。

四、源码分析

submit(ForkJoinTask task)方法

  1. public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
  2. if (task == null)
  3. throw new NullPointerException();
  4. externalPush(task);
  5. return task;
  6. }
  7. /**
  8. * 尝试将给定任务添加到当前提交者线程对应的队列中。在筛选是否需要externalSubmit时,此方法仅直接处理(非常)常见的路径
  9. **/
  10. final void externalPush(ForkJoinTask<?> task) {
  11. WorkQueue[] ws; WorkQueue q; int m;
  12. int r = ThreadLocalRandom.getProbe();
  13. int rs = runState;
  14. //尝试将任务添加到队列中(非首次执行,才会进入if内部逻辑)
  15. if ((ws = workQueues) != null //工作队列【数组】不为空
  16. && (m = (ws.length - 1)) >= 0 //队列【数组】
  17. && (q = ws[m & r & SQMASK]) != null //当前线程在队列【数组】中是否存在队列
  18. && r != 0
  19. && rs > 0
  20. && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
  21. ForkJoinTask<?>[] a; int am, n, s;
  22. if ((a = q.array) != null &&
  23. (am = a.length - 1) > (n = (s = q.top) - q.base)) {
  24. int j = ((am & s) << ASHIFT) + ABASE;
  25. //任务入工作队列
  26. U.putOrderedObject(a, j, task);
  27. U.putOrderedInt(q, QTOP, s + 1);
  28. U.putIntVolatile(q, QLOCK, 0);
  29. if (n <= 1)
  30. signalWork(ws, q);
  31. return;
  32. }
  33. U.compareAndSwapInt(q, QLOCK, 1, 0);
  34. }
  35. //首次执行直接执行此方法。1.初始化工作队列数组,2.创建工作队列,3.任务放入工作队列
  36. externalSubmit(task);
  37. }

externalSubmit(task)

  1. //1.初始化工作队列数组,2.创建工作队列,3.任务放入工作队列,4.唤醒工作线程,执行任务
  2. private void externalSubmit(ForkJoinTask<?> task) {
  3. int r; // initialize caller's probe
  4. if ((r = ThreadLocalRandom.getProbe()) == 0) {
  5. ThreadLocalRandom.localInit();
  6. r = ThreadLocalRandom.getProbe();
  7. }
  8. for (;;) {
  9. WorkQueue[] ws; WorkQueue q; int rs, m, k;
  10. boolean move = false;
  11. if ((rs = runState) < 0) {
  12. tryTerminate(false, false); // help terminate
  13. throw new RejectedExecutionException();
  14. }
  15. else if ((rs & STARTED) == 0 || // initialize
  16. ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
  17. int ns = 0;
  18. rs = lockRunState();
  19. try {
  20. if ((rs & STARTED) == 0) {
  21. U.compareAndSwapObject(this, STEALCOUNTER, null,
  22. new AtomicLong());
  23. // create workQueues array with size a power of two
  24. int p = config & SMASK; // ensure at least 2 slots
  25. int n = (p > 1) ? p - 1 : 1;
  26. n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
  27. n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
  28. //1.初始化工作队列数组
  29. workQueues = new WorkQueue[n];
  30. ns = STARTED;
  31. }
  32. } finally {
  33. unlockRunState(rs, (rs & ~RSLOCK) | ns);
  34. }
  35. }
  36. else if ((q = ws[k = r & m & SQMASK]) != null) {
  37. if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
  38. ForkJoinTask<?>[] a = q.array;
  39. int s = q.top;
  40. boolean submitted = false; // initial submission or resizing
  41. try { // locked version of push
  42. if ((a != null && a.length > s + 1 - q.base) ||
  43. (a = q.growArray()) != null) {
  44. int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
  45. //3.任务放入工作队列
  46. U.putOrderedObject(a, j, task);
  47. U.putOrderedInt(q, QTOP, s + 1);
  48. submitted = true;
  49. }
  50. } finally {
  51. U.compareAndSwapInt(q, QLOCK, 1, 0);
  52. }
  53. //4.唤醒工作线程,执行任务
  54. if (submitted) {
  55. signalWork(ws, q);
  56. return;
  57. }
  58. }
  59. move = true; // move on failure
  60. }
  61. else if (((rs = runState) & RSLOCK) == 0) { // create new queue
  62. //2.创建工作队列
  63. q = new WorkQueue(this, null);
  64. q.hint = r;
  65. q.config = k | SHARED_QUEUE;
  66. q.scanState = INACTIVE;
  67. rs = lockRunState(); // publish index
  68. if (rs > 0 && (ws = workQueues) != null &&
  69. k < ws.length && ws[k] == null)
  70. ws[k] = q; // else terminated
  71. unlockRunState(rs, rs & ~RSLOCK);
  72. }
  73. else
  74. move = true; // move if busy
  75. if (move)
  76. r = ThreadLocalRandom.advanceProbe(r);
  77. }
  78. }

signalWork(WorkQueue[] ws, WorkQueue q)

  1. final void signalWork(WorkQueue[] ws, WorkQueue q) {
  2. long c; int sp, i; WorkQueue v; Thread p;
  3. while ((c = ctl) < 0L) { // too few active
  4. if ((sp = (int)c) == 0) { // no idle workers。不存在空闲线程,则尝试创建工作线程
  5. if ((c & ADD_WORKER) != 0L) // too few workers
  6. tryAddWorker(c);
  7. break;
  8. }
  9. if (ws == null) // unstarted/terminated
  10. break;
  11. if (ws.length <= (i = sp & SMASK)) // terminated
  12. break;
  13. if ((v = ws[i]) == null) // terminating
  14. break;
  15. int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
  16. int d = sp - v.scanState; // screen CAS
  17. long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
  18. if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
  19. v.scanState = vs; // activate v
  20. if ((p = v.parker) != null)
  21. U.unpark(p);
  22. break;
  23. }
  24. if (q != null && q.base == q.top) // no more work
  25. break;
  26. }
  27. }

image.png