ForkJoinTask

ForkJoinTask是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口。ForkJoinTask 是一个抽象类,最核心的是 fork() 方法和 join() 方法。

  • fork,提交任务
    • fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。
  • join(),获取任务执行结果
    • join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果
  1. public class LongSum extends RecursiveTask<Long> {
  2. // 任务拆分最小阈值
  3. static final int SEQUENTIAL_THRESHOLD = 20000000;
  4. int low;
  5. int high;
  6. int[] array;
  7. LongSum(int[] arr, int lo, int hi) {
  8. array = arr;
  9. low = lo;
  10. high = hi;
  11. }
  12. @Override
  13. protected Long compute() {
  14. //当任务拆分到小于等于阀值时开始求和
  15. if (high - low <= SEQUENTIAL_THRESHOLD) {
  16. long sum = 0;
  17. for (int i = low; i < high; ++i) {
  18. sum += array[i];
  19. }
  20. return sum;
  21. } else { // 任务过大继续拆分
  22. int mid = low + (high - low) / 2;
  23. LongSum left = new LongSum(array, low, mid);
  24. LongSum right = new LongSum(array, mid, high);
  25. // 提交任务
  26. left.fork();
  27. right.fork();
  28. //获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果
  29. long rightAns = right.compute();
  30. long leftAns = left.join();
  31. return leftAns + rightAns;
  32. }
  33. }
  34. }
  35. public class LongSumMain {
  36. // 获取逻辑处理器数量 12
  37. static final int NCPU = Runtime.getRuntime().availableProcessors();
  38. static long calcSum;
  39. public static void main(String[] args) throws Exception {
  40. //准备数组
  41. int[] array = Utils.buildRandomIntArray(100000000);
  42. Instant now = Instant.now();
  43. // 单线程计算数组总和
  44. calcSum = seqSum(array);
  45. System.out.println("seq sum=" + calcSum);
  46. System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());
  47. //递归任务
  48. LongSum ls = new LongSum(array, 0, array.length);
  49. // 构建ForkJoinPool
  50. ForkJoinPool fjp = new ForkJoinPool(NCPU);
  51. now = Instant.now();
  52. //ForkJoin计算数组总和
  53. ForkJoinTask<Long> result = fjp.submit(ls);
  54. System.out.println("forkjoin sum=" + result.get());
  55. System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());
  56. fjp.shutdown();
  57. now = Instant.now();
  58. //并行流计算数组总和
  59. Long sum = (Long) IntStream.of(array).asLongStream().parallel().sum();
  60. System.out.println("IntStream sum="+sum);
  61. System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());
  62. }
  63. static long seqSum(int[] array) {
  64. long sum = 0;
  65. for (int i = 0; i < array.length; ++i) {
  66. sum += array[i];
  67. }
  68. return sum;
  69. }
  70. }

截屏2022-04-05 18.04.48.png

ForkJoinTask使用限制

ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。

ForkJoinPool 的工作原理

  • ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列,里面存放的对象是任务(ForkJoinTask)。
  • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork)时,会放入工作队列的top(队头),并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从top(队头)取出任务来执行。
  • 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base(队尾),也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO 方式
  • 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
  • 在既没有自己的任务,也没有可以窃取的任务时,进入休眠 。

工作窃取

工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。

  • 工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争s
  • 工作窃取算法缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列

04-04 ForkJoin工作原理(二) - 图2

工作队列WorkQueue

  • WorkQueue 是双向列表,用于任务的有序执行
  • 每个 ForkJoinWorkThread 都有属于自己的 WorkQueue,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread
  • 没有 ForkJoinWorkThread 的 WorkQueue 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是 偶数 位

ForkJoinWorkThread

ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的 奇数位

Fork/Join的优势

  • 任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行
  • 任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争