一、任务类型
我们调整线程池中的线程数量的最主要的目的是为了充分并合理地使用 CPU 和内存等资源,从而最大限度地提高程序的性能。在实际工作中,我们需要根据任务类型的不同选择对应的策略。
1、CPU密集型任务
CPU密集型任务也叫计算密集型任务,比如加密、解密、压缩、计算等一系列需要大量耗费 CPU 资源的任务。对于这样的任务最佳的线程数为 CPU 核心数的 1~2 倍,如果设置过多的线程数,实际上并不会起到很好的效果。此时假设我们设置的线程数量是 CPU 核心数的 2 倍以上,因为计算任务非常重,会占用大量的 CPU 资源,所以这时 CPU 的每个核心工作基本都是满负荷的,而我们又设置了过多的线程,每个线程都想去利用 CPU 资源来执行自己的任务,这就会造成不必要的上下文切换,此时线程数的增多并没有让性能提升,反而由于线程数量过多 会导致性能下降。
2、IO密集型任务
IO密集型任务,比如数据库、文件的读写,网络通信等任务,这种任务的特点是并不会特别消耗 CPU 资源,但是 IO 操作很耗时,总体会占用比较多的时间。对于这种任务最大线程数一般会大于 CPU 核心数很多倍,因为 IO 读写速度相比于 CPU 的速度而言是比较慢的,如果我们设置过少的线程数,就可能导致 CPU 资源的浪费。而如果我们设置更多的线程数,那么当 一部分线程正在等待 IO 的时候,它们此时并不需要 CPU 来计算,那么另外的线程便可以利用 CPU 去执行其他的任务,互不影响,这样的话在工作队列中等待的任务就会减少,可以更好地利用资源。
3、线程数计算方法
《Java并发编程实战》的作者 Brain Goetz 推荐的计算方法:线程数 = CPU 核心数 *(1+平均等待时间/平均工作时间),平均等待时间为请求的整个相应时间,平均工作时间为CPU的执行时间。
通过这个公式,我们可以计算出一个合理的线程数量,如果任务的平均等待时间长,线程数就随之增加,而如果平均工作时间长,也就是对于我们上面的 CPU 密集型任务,线程数就随之减少。
太少的线程数会使得程序整体性能降低,而过多的线程也会消耗内存等其他资源,所以如果想要更准确的话,可以进行压测,监控 JVM 的线程情况以及 CPU 的负载情况,根据实际情况衡量应该创建的线程数,合理并充分利用资源。
4、算法题
如何充分利用多核CPU的性能,计算一个很大数组中所有整数的和?
最简单的方法可以使用一个循环,从数组的第一个元素开始累加,直到最后一个元素。但是这种方法只使用到了一个CPU,平没有最大的提高性能,而且随着数组大小的增加,计算时间也会随之增加。
第二种方法,使用多线程,每个线程负责计算数组中的一部分数据,最终把所有线程的结果合并,这种算法就是分治算法(归并算法也是属于此种算法)。
二、分治算法
分治算法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。
1、分解
2、求解
3、合并
按原问题的要求,将子问题的解逐层合并构成原问题的解。
在分治法中,子问题一般是相互独立的,因此,经常通过递归调用算法来求解子问题。
public static class RecursiveSumTask implements Callable<Long> {//拆分的粒度public static final int SEQUENTIAL_CUTOFF = 100000;int lo;int hi;int[] arr; // argumentsExecutorService executorService;RecursiveSumTask(ExecutorService executorService, int[] a, int l, int h) {this.executorService = executorService;this.arr = a;this.lo = l;this.hi = h;}@Overridepublic Long call() throws Exception {System.out.format("%s range [%d-%d] begin to compute %n",Thread.currentThread().getName(), lo, hi);long result = 0;//最小拆分的阈值if (hi - lo <= SEQUENTIAL_CUTOFF) {for (int i = lo; i < hi; i++) {result += arr[i];}System.out.format("%s range [%d-%d] begin to finished %n",Thread.currentThread().getName(), lo, hi);} else {RecursiveSumTask left = new RecursiveSumTask(executorService, arr, lo, (hi + lo) / 2);RecursiveSumTask right = new RecursiveSumTask(executorService, arr, (hi + lo) / 2, hi);Future<Long> lr = executorService.submit(left);Future<Long> rr = executorService.submit(right);result = lr.get() + rr.get();System.out.format("%s range [%d-%d] finished to compute %n",Thread.currentThread().getName(), lo, hi);}return result;}}public static long sum(int[] arr) throws Exception {//思考: 用 Executors.newFixedThreadPool可以吗? 定长线程的饥饿// ExecutorService executorService = Executors.newFixedThreadPool(12);ExecutorService executorService = Executors.newCachedThreadPool();//递归任务 求和RecursiveSumTask task = new RecursiveSumTask(executorService, arr, 0, arr.length);//返回结果long result = executorService.submit(task).get();executorService.shutdown();return result;}public static void main(String[] args) throws Exception {//准备数组int[] arr = Utils.buildRandomIntArray(100000000);System.out.printf("The array length is: %d\n", arr.length);Instant now = Instant.now();//数组求和long result = sum(arr);System.out.println("执行时间:"+ Duration.between(now,Instant.now()).toMillis());System.out.printf("The result is: %d\n", result);}
在代码中,把数组不断的拆分,每次都把数组从中间拆成两个数组,判断这两个是否达到了最小粒度的长度,如果没有达到则将两个数组分别继续拆分,这就导致每个拆分任务都需要一个线程处理,拆分的粒度越小需要的线程数量就越大,Executors.newCachedThreadPool() 线城池是使用消费者和生产者一对一的阻塞队列,没有核心线程,非核心线程为Integer的最大值,那么如果使用固定线程数量的线程池,则会导致线程数量不够的情况。
4、应用场景
分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都 属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治。既然分治这种任务模型如此普遍,那 Java 显然也需要支持,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来支持分治这种任务模型的。
三、Fork/Join框架
上面的算法题中,如果使用第二种方法,使用多线程(线程池)来分别计算,最终合并结果,虽然可以得到结果,但是也会出现两个问题,一是无法对大任务进行拆分,对于某个任务只能由单线程执行;二是工作线程从队列中获取任务时存在竞争情况。这两个缺点都会影响任务的执行效率,而且如果把任务拆分的粒度越小,就需要越多的线程来执行任务,假设使用固定线程数量的线程池,那么会出现线程不够用的情况。
为了解决传统线程池的缺陷,Java7中引入Fork/Join框架,并在Java8中得到广泛应用。Fork/Join 框架的核心是 ForkJoinPool 类,它是对 AbstractExecutorService 类的扩展。ForkJoinPool 允许其他线程向它提交任务,并根据设定将这些任务拆分为粒度更细的子任务,这些子任务将由 ForkJoinPool 内部的工作线程来并行执行,并且工作线程之间可以窃取彼此之间的任务。
ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务。ForkJoinPool是 ThreadPoolExecutor 线程池的一种补充,是对计算密集型场景的加强。
根据经验和实验,任务总数、单任务执行耗时以及并行数都会影响到Fork/Join的性能。所以,当你使用Fork/Join框架时,你需要谨慎评估这三个指标,最好能通过模拟对比评估,不要凭感觉冒然在生产环境使用。
1、Fork/Join的使用
Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是 分治任务 ForkJoinTask。
1)ForkJoinPool
是用于执行 ForkJoinTask 任务的执行池,不再是传统执行池 Worker+Queue 的组合式,而是维护了一个队列数组 WorkQueue(WorkQueue[]),这样在提交任务和线程任务的时候大幅度减少碰撞。
ForkJoinPool 有四个构造方法,其中有一个是private的:
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);}public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);}public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();}private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}
ForkJoinPool中有四个核心参数,用于控制线程池的并行数、工作线程的创建、异常处理和模式指定等。
int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用 Runtime.getRuntime().availableProcessors()来设置并行级别,一般这个参数是逻辑核数的数量。
ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过 factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是 ThreadFactory。如果你不指定factory,那么将由默认的 DefaultForkJoinWorkerThreadFactory负责线程的创建工作。
UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理。
boolean asyncMode:设置队列的工作模式:asyncMode ? FIFO_QUEUE : LIFO_QUEUE。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式,也是默认模式,因为后进的任务一般是被拆分过的任务,相对较小,而且对于计算机硬件来讲,后进的任务很有可能还保存在寄存器中,可以避免再次读取任务。
任务提交是 ForkJoinPool 的核心能力之一,提交任务有三种方式:
// 提交异步执行,没有返回值void execute(ForkJoinTask<?> task);void execute(Runnable task);// 等待并获取结果<T> T invoke(ForkJoinTask<T> task);// 提交执行获取Future结果<T> ForkJoinTask<T> submit(ForkJoinTask<T> task);<T> ForkJoinTask<T> submit(Callable<T> task);<T> ForkJoinTask<T> submit(Runnable task, T result);ForkJoinTask<?> submit(Runnable task);
execute类型的方法在提交任务后,不会返回结果。ForkJoinPool不仅允许提交ForkJoinTask类型任务,还允许提交Runnable任务,执行Runnable类型任务时,将会转换为ForkJoinTask类型。由于任务是不可切分的,所以这类任务无 法获得任务拆分这方面的效益,不过仍然可以获得任务窃取带来的好处和性能提升。
invoke方法接受ForkJoinTask类型的任务,并在任务执行结束后,返回泛型结果。 如果提交的任务是null,将抛出空指针异常。
submit方法支持三种类型的任务提交:ForkJoinTask类型、Callable类型和 Runnable类型。在提交任务后,将返回ForkJoinTask类型的结果。如果提交的任务是 null,将抛出空指针异常,并且当任务不能按计划执行的话,将抛出任务拒绝异常。
//递归任务 用于计算数组总和LongSum ls = new LongSum(array, 0, array.length);// 构建ForkJoinPoolForkJoinPool fjp = new ForkJoinPool(NCPU);//ForkJoin计算数组总和ForkJoinTask<Long> result = fjp.submit(ls);fjp.shutdown();
2)ForkJoinTask
ForkJoinTask是ForkJoinPool的核心之一,它是任务的实际载体,定义了任务执行时的具体逻辑和拆分逻辑。ForkJoinTask继承了Future接口,所以也可以将其看作是轻量级的 Future。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。
fork()——提交任务:
fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是 ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。
join()——获取任务执行结果:
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:
RecursiveAction:用于递归执行但不需要返回结果的任务。
RecursiveTask :用于递归执行需要返回结果的任务。
CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数
我们在提交任务给ForkJoinPool时,提交的任务需要实现以上三个类的其中一个,并重写其方法,以RecursiveTask 类为例,需要重写compute()方法:
public class LongSum extends RecursiveTask<Long> {// 任务拆分最小阈值static final int SEQUENTIAL_THRESHOLD = 10000000;int low;int high;int[] array;LongSum(int[] arr, int lo, int hi) {array = arr;low = lo;high = hi;}@Overrideprotected Long compute() {//当任务拆分到小于等于阀值时开始求和if (high - low <= SEQUENTIAL_THRESHOLD) {long sum = 0;for (int i = low; i < high; ++i) {sum += array[i];}return sum;} else { // 任务过大继续拆分int mid = low + (high - low) / 2;LongSum left = new LongSum(array, low, mid);LongSum right = new LongSum(array, mid, high);// 提交任务left.fork();right.fork();//获取任务的执行结果,将阻塞当前线程直到对应的子任务完成运行并返回结果long rightAns = right.join();long leftAns = left.join();return leftAns + rightAns;}}}
3)ForkJoinTask使用限制
ForkJoinTask最适合用于纯粹的计算任务,也就是纯函数计算,计算过程中的对象都是独立的,对外部没有依赖。提交到ForkJoinPool中的任务应避免执行阻塞I/O。
2、ForkJoinPool 的工作原理
ForkJoinPool 内部有多个工作队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个工作队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的工作队列中。
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的top,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从top取出任务来执行(栈结构)。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务,窃取的任务位于其他线程的工作队列的base,也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO 方式,从base窃取任务。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠 。
3、工作窃取
ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。
ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop 和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,也存在多个线程窃取同一个工作队列的任务,或者工作队列中只有一个任务时都会产生竞争,使用CAS来保证线程安全。
工作窃取算法缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
这样做的主要原因是为了提高性能,通过始终选择最近提交的任务,可以增加资源仍分配在CPU缓存中的机会,这样CPU处理起来要快一些。而窃取者之所以从尾部获取任务,则是为 了降低线程之间的竞争可能,毕竟大家都从一个部分拿任务,竞争的可能要大很多。
此外,这样的设计还有一种考虑。由于任务是可分割的,那队列中较旧的任务最有可能粒度较大,因为它们可能还没有被分割,而空闲的线程则相对更有“精力”来完成这些粒度较大的任务。
4、工作队列WorkQueue
WorkQueue 是双向列表,用于任务的有序执行,如果 WorkQueue 用于自己的执行线程 Thread,线程默认将会从尾端选取任务用来执行 LIFO。
每个 ForkJoinWorkThread 都有属于自己的 WorkQueue,但不是每个 WorkQueue 都有对应的 ForkJoinWorkThread。
没有 ForkJoinWorkThread 的 WorkQueue 保存的是 submission,来自外部提交,在WorkQueues[] 的下标是偶数位。
在WorkQueue类中,qlock是用来加锁的,base是栈底,使用volatile关键字,因为存在工作窃取,所以需要保证可见性,top是栈顶,ForkJoinTask[] array是用来保存任务的,所以在工作队列中使用的是数组。
5、ForkJoinWorkThread
ForkJoinWorkThread 是用于执行任务的线程,用于区别使用非 ForkJoinWorkThread 线程提交task。启动一个该 Thread,会自动注册一个 WorkQueue 到 Pool,拥有 Thread 的 WorkQueue 只能出现在 WorkQueues[] 的奇数位。
6、ForkJoinPool源码
在ForkJoinPool中的属性中可以看到一个WorkQueue[] 数组,用来保存工作队列,每个线程在创建完成后会绑定一个工作队列,并保存到这个数组的奇数下标位置。
ForkJoinWorkerThreadFactory是用来创建线程的工厂接口,由DefaultForkJoinWorkerThreadFactory实现类调用newThread()方法来创建线程。
线程工厂创建 ForkJoinWorkerThread 线程。
static final class DefaultForkJoinWorkerThreadFactoryimplements ForkJoinWorkerThreadFactory {public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new ForkJoinWorkerThread(pool);}}
在 ForkJoinWorkerThread 的构造方法中,会调用ForkJoinPool 的 registerWorker(this) 方法绑定一个工作队列,创建一个WorkQueue w = new WorkQueue(this, wt)。
protected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);}
在 WorkQueue 的构造方法中,绑定线程 owner 和 ForkJoinPool。
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {this.pool = pool;this.owner = owner;// Place indices in the center of array (that is not yet allocated)base = top = INITIAL_QUEUE_CAPACITY >>> 1;}
在上面的代码中,有一个外部的提交任务,ForkJoinTask
在compute()方法中,会进行任务的拆分,然后调用fork()方法把当前拆分的任务入队。
对于join方法,就是获取RecursiveTask中的result,也就是compute()的返回结果,join()方法会调用getRawResult()方法。
