java中多线程的开发我们可以自己启用多线程,线程池,还可以使用forkjoin,forkjoin可以让我们不去了解诸如:Thread、Runnable等相关的知识,只要遵循forkjoin的开发模式,就可以写出很好的多线程并发程序。

任务类型

我们调整线程池中的线程数量的最主要的目的是为了充分并合理地使用 CPU 和内存等资源,从而最大限度地提高程序的性能。在实际工作中,我们需要根据任务类型的不同选择对应的策略。

CPU密集型任务

也叫计算密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。

IO密集型任务

比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在工作队列中等待的任务就会减少,可以更好地利用资源。

线程数计算方法

《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:

  1. 线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间)

分治算法

分治算法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。
分治算法的步骤如下:
1. 分解:将要解决的问题划分成若干规模较小的同类问题; fork
2. 求解:当子问题划分得足够小时,用较简单的方法解决;
3. 合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。 join
image.png

Fork/Join框架介绍

传统线程池ThreadPoolExecutor有两个明显的缺点:一是无法对大任务进行拆分,对于某个任务只能由单线程执行二是工作线程从队列中获取任务时存在竞争情况。这两个缺点都会 影响任务的执行效率。为了解决传统线程池的缺陷,Java7中引入Fork/Join框架,并在Java8中得到广泛应用。
Fork/Join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool允许其他线程向它提交任务,并根据设定将这些任务拆分为粒度更细的子任务,这些子任务将由ForkJoinPool内部的工作线程来并行执行,并且工作线程之间可以窃取彼此之间的任务。
image.png
Fork/Join计算框架主要包含两部分,一部分是分治任务的线程池ForkJoinPool,另一部分是分治任务 ForkJoinTask

ForkJoinPool

ForkJoinPool是用于执行 ForkJoinTask 任务的执行池,不再是传统执行池Worker+Queue的组合式,而是维护了一个队列数组 WorkQueue(WorkQueue[]),这样在提交任务和线程任务的时候大幅度减少碰撞。
任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:

返回值 方法
提交异步执行 void execute(ForkJoinTask task) execute(Runnable task)
等待并获取结果 T invoke(ForkJoinTask task)
提交执行获取Future结果 ForkJoinTask submit(ForkJoinTask task) submit(Callable task) submit(Runnable task) submit(Runnable task, T result)

ForkJoinTask

ForkJoinTask是ForkJoinPool的核心之一,它是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的Future。
ForkJoinTask是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
fork()——提交任务
fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。
join()——获取任务执行结果
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任 务完成运行并返回结果。
通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:
RecursiveAction:用于递归执行但不需要返回结果的任务。
RecursiveTask :用于递归执行需要返回结果的任务。
CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数。

  1. public class LongSum extends RecursiveTask<Long> {
  2. // 任务拆分最小阈值
  3. static final int SEQUENTIAL_THRESHOLD = 10000;
  4. int low;
  5. int high;
  6. int[] array;
  7. LongSum(int[] arr, int lo, int hi) {
  8. array = arr;
  9. low = lo;
  10. high = hi;
  11. }
  12. @Override
  13. protected Long compute() {
  14. //当任务拆分到小于等于阀值时开始求和
  15. if (high low <= SEQUENTIAL_THRESHOLD) {
  16. long sum = 0;
  17. for (int i = low; i < high; ++i) {
  18. sum += array[i];
  19. }
  20. return sum;
  21. } else { // 任务过大继续拆分
  22. int mid = low + (high low) / 2;
  23. LongSum left = new LongSum(array, low, mid);
  24. LongSum right = new LongSum(array, mid, high);
  25. // 提交任务
  26. left.fork();
  27. right.fork();
  28. //获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果
  29. long rightAns = right.join();
  30. long leftAns = left.join();
  31. return leftAns + rightAns;
  32. }
  33. }
  34. }

ForkJoinTask使用限制
ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。

ForkJoinPool 的工作原理

ForkJoinPool 内部有多个工作队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个工作队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的工作队列中。
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的top,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次 从top取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base,也就是说工作线程在窃取其他工作线程的任务时,使用的 是FIFO方式。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠 。

工作窃取

ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop 和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从 其所属线程调用,而poll则可以从其他线程调用。
工作窃取的运行流程如下图所示 :
image.png
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争;
工作窃取算法缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

工作队列WorkQueue

WorkQueue 是双向列表,用于任务的有序执行,如果 WorkQueue 用于自己的执行线程 Thread,线程默认将会从尾端选取任务用来执行LIFO。
每个 ForkJoinWorkThread 都有属于自己的 WorkQueue,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread。
没有 ForkJoinWorkThread 的 WorkQueue 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是偶数位。
image.png

ForkJoinWorkThread

ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个WorkQueue到Pool,拥有Thread的 WorkQueue 只能出现在WorkQueues[] 的奇数位。
image.png

ForkJoinPool执行流程

image.png

问题一:CPU密集型任务和IO密集型任务有什么区别

CPU密集型任务
也叫计算密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多会导致性能下降。
IO密集型任务
比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在工作队列中等待的任务就会减少,可以更好地利用资源。

问题二:谈谈对Fork/Join的理解

传统线程池ThreadPoolExecutor有两个明显的缺点:一是无法对大任务进行拆分,对于某个任务只能由单线程执行;二是工作线程从队列中获取任务时存在竞争情况。这两个缺点都会影响任务的执行效率。为了解决传统线程池的缺陷,Java7中引入Fork/Join框架,并在Java8中得到广泛应用。
Fork/Join是一种基于分治算法的模型,在并发处理计算型任务时有着显著的优势。其效率的提升主要得益于两个方面:
任务切分:将大的任务分割成更小粒度的小任务,让更多的线程参与执行;
任务窃取:通过任务窃取,充分地利用空闲线程,并减少竞争。
ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务。ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的加强。

问题三:SafePoint是什么?

如GC的时候必须要等到Java线程都进入到safepoint的时候VMThread才能开始执行GC
1. 循环的末尾 (防止大循环的时候一直不进入safepoint,而其他线程在等待它进入safepoint)
2. 方法返回前
3. 调用方法的call之后
4. 抛出异常的位置

问题四:ForkJoinPool为什么这么设计,工作线程总是从头部获取任务,窃取线程从尾部获取任务?

这样做的主要原因是为了提高性能,通过始终选择最近提交的任务,可以增加资源仍分配 在CPU缓存中的机会,这样CPU处理起来要快一些。而窃取者之所以从尾部获取任务,则是为 了降低线程之间的竞争可能,毕竟大家都从一个部分拿任务,竞争的可能要大很多。
此外,这样的设计还有一种考虑。由于任务是可分割的,那队列中较旧的任务最有可能粒度较大,因为它们可能还没有被分割,而空闲的线程则相对更有“精力”来完成这些粒度较大的任务。