22 | Executor与线程池:如何创建正确的线程池?
创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。
线程池是一种生产者 - 消费者模式
简化的线程池代码逻辑如下:
//简化的线程池,仅用来说明工作原理class MyThreadPool{//利用阻塞队列实现生产者-消费者模式BlockingQueue<Runnable> workQueue;//保存内部工作线程List<WorkerThread> threads = new ArrayList<>();// 构造方法MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){this.workQueue = workQueue;// 创建工作线程for(int idx=0; idx<poolSize; idx++) {WorkerThread work = new WorkerThread();work.start();threads.add(work);}}// 提交任务void execute(Runnable command){workQueue.put(command);}// 工作线程负责消费任务,并执行任务class WorkerThread extends Thread{public void run() {//循环取任务并执行while(true){ ①Runnable task = workQueue.take();task.run();}}}}/** 下面是使用示例 **/// 创建有界阻塞队列BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);// 创建线程池MyThreadPool pool = new MyThreadPool(10, workQueue);// 提交任务pool.execute(()->{System.out.println("hello");});
ThreadPoolExecutor
ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有 7 个参数。
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
- maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
- keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
- workQueue:工作队列,和上面示例代码的工作队列同义。
- threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
- handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
使用线程池要注意些什么
考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了。不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。
try {//业务逻辑} catch (RuntimeException x) {//按需处理} catch (Throwable x) {//按需处理}
课后思考
使用线程池,默认情况下创建的线程名字都类似pool-1-thread-2这样,没有业务含义。而很多情况下为了便于诊断问题,都需要给线程赋予一个有意义的名字,那你知道有哪些办法可以给线程池里的线程指定名字吗?
- 给线程池设置名称前缀 ```java
给线程池设置名称前缀 ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setThreadNamePrefix(“CUSTOM_NAME_PREFIX”);
在ThreadFactory中自定义名称前缀 class CustomThreadFactory implements ThreadFactory {
@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread("CUSTOM_NAME_PREFIX");return thread;}
}
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new CustomThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );
<a name="ghVF0"></a># 23 | Future:如何用多线程实现最优的“烧水泡茶”程序?ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。<a name="wtKFc"></a>## 如何获取任务执行结果Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。```java// 提交Runnable任务//这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()Future<?> submit(Runnable task);// 提交Callable任务//这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果<T> Future<T> submit(Callable<T> task);// 提交Runnable任务及结果引用// 返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result<T> Future<T> submit(Runnable task, T result);
返回值都是 Future 接口,Future 接口有 5 个方法
// 取消任务boolean cancel(boolean mayInterruptIfRunning);// 判断任务是否已取消boolean isCancelled();// 判断任务是否已结束boolean isDone();// 获得任务执行结果get();// 获得任务执行结果,支持超时get(long timeout, TimeUnit unit);
submit(Runnable task, T result) 使用方法
ExecutorService executor = Executors.newFixedThreadPool(1);// 创建Result对象rResult r = new Result();r.setAAA(a);// 提交任务Future<Result> future = executor.submit(new Task(r), r);Result fr = future.get();// 下面等式成立//fr === r;//fr.getAAA() === a;//fr.getXXX() === xclass Task implements Runnable{Result r;//通过构造函数传入resultTask(Result r){this.r = r;}void run() {//可以操作resulta = r.getAAA();r.setXXX(x);}}
FutureTask 工具类
两个构造函数
FutureTask(Callable<V> callable);FutureTask(Runnable runnable, V result);
使用方法
FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
// 创建FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);// 创建线程池ExecutorService es = Executors.newCachedThreadPool();// 提交FutureTaskes.submit(futureTask);// 获取计算结果Integer result = futureTask.get();
FutureTask 对象直接被 Thread 执行的示例代码如下所示。
FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);// 创建并启动线程Thread T1 = new Thread(futureTask);T1.start();// 获取计算结果Integer result = futureTask.get();
实现最优的“烧水泡茶”程序

首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。
// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft1Thread T1 = new Thread(ft1);T1.start();// 线程T2执行任务ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());// T1Task需要执行的任务:// 洗水壶、烧开水、泡茶class T1Task implements Callable<String>{FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@OverrideString call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf = ft2.get();System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶class T2Task implements Callable<String> {@OverrideString call() throws Exception {System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}}// 一次执行结果:T1:洗水壶...T2:洗茶壶...T1:烧开水...T2:洗茶杯...T2:拿茶叶...T1:拿到茶叶:龙井T1:泡茶...上茶:龙井
24 | CompletableFuture:异步编程没那么难
描述串行关系
CompletionStage<R> thenApply(fn);CompletionStage<R> thenApplyAsync(fn);CompletionStage<Void> thenAccept(consumer);CompletionStage<Void> thenAcceptAsync(consumer);CompletionStage<Void> thenRun(action);CompletionStage<Void> thenRunAsync(action);CompletionStage<R> thenCompose(fn);CompletionStage<R> thenComposeAsync(fn);
描述 AND 汇聚关系
CompletionStage<R> thenCombine(other, fn);CompletionStage<R> thenCombineAsync(other, fn);CompletionStage<Void> thenAcceptBoth(other, consumer);CompletionStage<Void> thenAcceptBothAsync(other, consumer);CompletionStage<Void> runAfterBoth(other, action);CompletionStage<Void> runAfterBothAsync(other, action);
描述 OR 汇聚关系
CompletionStage applyToEither(other, fn);CompletionStage applyToEitherAsync(other, fn);CompletionStage acceptEither(other, consumer);CompletionStage acceptEitherAsync(other, consumer);CompletionStage runAfterEither(other, action);CompletionStage runAfterEitherAsync(other, action);
26 | Fork/Join:单机版的MapReduce
Fork/Join 的使用
Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。
ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
static void main(String[] args){//创建分治任务线程池ForkJoinPool fjp = new ForkJoinPool(4);//创建分治任务Fibonacci fib = new Fibonacci(30);//启动分治任务Integer result = fjp.invoke(fib);//输出结果System.out.println(result);}//递归任务static class Fibonacci extends RecursiveTask<Integer> {final int n;Fibonacci(int n) {this.n = n;}protected Integer compute(){if (n <= 1)return n;Fibonacci f1 = new Fibonacci(n - 1);//创建子任务f1.fork();Fibonacci f2 = new Fibonacci(n - 2);//等待子任务结果,并合并结果return f2.compute() + f1.join();}}
