22 | Executor与线程池:如何创建正确的线程池?

创建对象,仅仅是在 JVM 的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本就很高了,所以线程是一个重量级的对象,应该避免频繁创建和销毁。

线程池是一种生产者 - 消费者模式

简化的线程池代码逻辑如下:

  1. //简化的线程池,仅用来说明工作原理
  2. class MyThreadPool{
  3. //利用阻塞队列实现生产者-消费者模式
  4. BlockingQueue<Runnable> workQueue;
  5. //保存内部工作线程
  6. List<WorkerThread> threads = new ArrayList<>();
  7. // 构造方法
  8. MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){
  9. this.workQueue = workQueue;
  10. // 创建工作线程
  11. for(int idx=0; idx<poolSize; idx++) {
  12. WorkerThread work = new WorkerThread();
  13. work.start();
  14. threads.add(work);
  15. }
  16. }
  17. // 提交任务
  18. void execute(Runnable command){
  19. workQueue.put(command);
  20. }
  21. // 工作线程负责消费任务,并执行任务
  22. class WorkerThread extends Thread{
  23. public void run() {
  24. //循环取任务并执行
  25. while(true){
  26. Runnable task = workQueue.take();
  27. task.run();
  28. }
  29. }
  30. }
  31. }
  32. /** 下面是使用示例 **/
  33. // 创建有界阻塞队列
  34. BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
  35. // 创建线程池
  36. MyThreadPool pool = new MyThreadPool(10, workQueue);
  37. // 提交任务
  38. pool.execute(()->{
  39. System.out.println("hello");
  40. });

ThreadPoolExecutor

ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示,这个最完备的构造函数有 7 个参数。

  1. ThreadPoolExecutor(
  2. int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. ThreadFactory threadFactory,
  8. RejectedExecutionHandler handler)
  • corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
  • maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
  • keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  • workQueue:工作队列,和上面示例代码的工作队列同义。
  • threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
  • handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
    • CallerRunsPolicy:提交任务的线程自己去执行该任务。
    • AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    • DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    • DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。


使用线程池要注意些什么

考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了。不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。

  1. try {
  2. //业务逻辑
  3. } catch (RuntimeException x) {
  4. //按需处理
  5. } catch (Throwable x) {
  6. //按需处理
  7. }

课后思考

使用线程池,默认情况下创建的线程名字都类似pool-1-thread-2这样,没有业务含义。而很多情况下为了便于诊断问题,都需要给线程赋予一个有意义的名字,那你知道有哪些办法可以给线程池里的线程指定名字吗?

    1. 给线程池设置名称前缀 ```java
  1. 给线程池设置名称前缀 ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setThreadNamePrefix(“CUSTOM_NAME_PREFIX”);

  2. 在ThreadFactory中自定义名称前缀 class CustomThreadFactory implements ThreadFactory {

    1. @Override
    2. public Thread newThread(Runnable r) {
    3. Thread thread = new Thread("CUSTOM_NAME_PREFIX");
    4. return thread;
    5. }

    }

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new CustomThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );

  1. <a name="ghVF0"></a>
  2. # 23 | Future:如何用多线程实现最优的“烧水泡茶”程序?
  3. ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。
  4. <a name="wtKFc"></a>
  5. ## 如何获取任务执行结果
  6. Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。
  7. ```java
  8. // 提交Runnable任务
  9. //这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()
  10. Future<?> submit(Runnable task);
  11. // 提交Callable任务
  12. //这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果
  13. <T> Future<T> submit(Callable<T> task);
  14. // 提交Runnable任务及结果引用
  15. // 返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result
  16. <T> Future<T> submit(Runnable task, T result);

返回值都是 Future 接口,Future 接口有 5 个方法

  1. // 取消任务
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. // 判断任务是否已取消
  4. boolean isCancelled();
  5. // 判断任务是否已结束
  6. boolean isDone();
  7. // 获得任务执行结果
  8. get();
  9. // 获得任务执行结果,支持超时
  10. get(long timeout, TimeUnit unit);

submit(Runnable task, T result) 使用方法

  1. ExecutorService executor = Executors.newFixedThreadPool(1);
  2. // 创建Result对象r
  3. Result r = new Result();
  4. r.setAAA(a);
  5. // 提交任务
  6. Future<Result> future = executor.submit(new Task(r), r);
  7. Result fr = future.get();
  8. // 下面等式成立
  9. //fr === r;
  10. //fr.getAAA() === a;
  11. //fr.getXXX() === x
  12. class Task implements Runnable{
  13. Result r;
  14. //通过构造函数传入result
  15. Task(Result r){
  16. this.r = r;
  17. }
  18. void run() {
  19. //可以操作result
  20. a = r.getAAA();
  21. r.setXXX(x);
  22. }
  23. }

FutureTask 工具类

两个构造函数

  1. FutureTask(Callable<V> callable);
  2. FutureTask(Runnable runnable, V result);

使用方法

FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。

  1. // 创建FutureTask
  2. FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
  3. // 创建线程池
  4. ExecutorService es = Executors.newCachedThreadPool();
  5. // 提交FutureTask
  6. es.submit(futureTask);
  7. // 获取计算结果
  8. Integer result = futureTask.get();

FutureTask 对象直接被 Thread 执行的示例代码如下所示。

  1. FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
  2. // 创建并启动线程
  3. Thread T1 = new Thread(futureTask);
  4. T1.start();
  5. // 获取计算结果
  6. Integer result = futureTask.get();

实现最优的“烧水泡茶”程序

22-27 并发工具类 - 图1
首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

  1. // 创建任务T2的FutureTask
  2. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
  3. // 创建任务T1的FutureTask
  4. FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
  5. // 线程T1执行任务ft1
  6. Thread T1 = new Thread(ft1);
  7. T1.start();
  8. // 线程T2执行任务ft2
  9. Thread T2 = new Thread(ft2);
  10. T2.start();
  11. // 等待线程T1执行结果
  12. System.out.println(ft1.get());
  13. // T1Task需要执行的任务:
  14. // 洗水壶、烧开水、泡茶
  15. class T1Task implements Callable<String>{
  16. FutureTask<String> ft2;
  17. // T1任务需要T2任务的FutureTask
  18. T1Task(FutureTask<String> ft2){
  19. this.ft2 = ft2;
  20. }
  21. @Override
  22. String call() throws Exception {
  23. System.out.println("T1:洗水壶...");
  24. TimeUnit.SECONDS.sleep(1);
  25. System.out.println("T1:烧开水...");
  26. TimeUnit.SECONDS.sleep(15);
  27. // 获取T2线程的茶叶
  28. String tf = ft2.get();
  29. System.out.println("T1:拿到茶叶:"+tf);
  30. System.out.println("T1:泡茶...");
  31. return "上茶:" + tf;
  32. }
  33. }
  34. // T2Task需要执行的任务:
  35. // 洗茶壶、洗茶杯、拿茶叶
  36. class T2Task implements Callable<String> {
  37. @Override
  38. String call() throws Exception {
  39. System.out.println("T2:洗茶壶...");
  40. TimeUnit.SECONDS.sleep(1);
  41. System.out.println("T2:洗茶杯...");
  42. TimeUnit.SECONDS.sleep(2);
  43. System.out.println("T2:拿茶叶...");
  44. TimeUnit.SECONDS.sleep(1);
  45. return "龙井";
  46. }
  47. }
  48. // 一次执行结果:
  49. T1:洗水壶...
  50. T2:洗茶壶...
  51. T1:烧开水...
  52. T2:洗茶杯...
  53. T2:拿茶叶...
  54. T1:拿到茶叶:龙井
  55. T1:泡茶...
  56. 上茶:龙井

24 | CompletableFuture:异步编程没那么难

描述串行关系

  1. CompletionStage<R> thenApply(fn);
  2. CompletionStage<R> thenApplyAsync(fn);
  3. CompletionStage<Void> thenAccept(consumer);
  4. CompletionStage<Void> thenAcceptAsync(consumer);
  5. CompletionStage<Void> thenRun(action);
  6. CompletionStage<Void> thenRunAsync(action);
  7. CompletionStage<R> thenCompose(fn);
  8. CompletionStage<R> thenComposeAsync(fn);

描述 AND 汇聚关系

  1. CompletionStage<R> thenCombine(other, fn);
  2. CompletionStage<R> thenCombineAsync(other, fn);
  3. CompletionStage<Void> thenAcceptBoth(other, consumer);
  4. CompletionStage<Void> thenAcceptBothAsync(other, consumer);
  5. CompletionStage<Void> runAfterBoth(other, action);
  6. CompletionStage<Void> runAfterBothAsync(other, action);

描述 OR 汇聚关系

  1. CompletionStage applyToEither(other, fn);
  2. CompletionStage applyToEitherAsync(other, fn);
  3. CompletionStage acceptEither(other, consumer);
  4. CompletionStage acceptEitherAsync(other, consumer);
  5. CompletionStage runAfterEither(other, action);
  6. CompletionStage runAfterEitherAsync(other, action);

26 | Fork/Join:单机版的MapReduce

Fork/Join 的使用

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。
ForkJoinTask 有两个子类——RecursiveAction RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。

  1. static void main(String[] args){
  2. //创建分治任务线程池
  3. ForkJoinPool fjp = new ForkJoinPool(4);
  4. //创建分治任务
  5. Fibonacci fib = new Fibonacci(30);
  6. //启动分治任务
  7. Integer result = fjp.invoke(fib);
  8. //输出结果
  9. System.out.println(result);
  10. }
  11. //递归任务
  12. static class Fibonacci extends RecursiveTask<Integer> {
  13. final int n;
  14. Fibonacci(int n) {this.n = n;}
  15. protected Integer compute(){
  16. if (n <= 1)
  17. return n;
  18. Fibonacci f1 = new Fibonacci(n - 1);
  19. //创建子任务
  20. f1.fork();
  21. Fibonacci f2 = new Fibonacci(n - 2);
  22. //等待子任务结果,并合并结果
  23. return f2.compute() + f1.join();
  24. }
  25. }

27 | 并发工具类模块热点问题答疑