这篇文章主要讲 submit()、Future()、FutureTask()、

在上一篇文章《22 | Executor 与线程池:如何创建正确的线程池?》中,我们详细介绍了如何创建正确的线程池,那创建完线程池,我们该如何使用线程池呢?
在上一篇文章中,我们仅仅介绍了 ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。而很多场景下,我们又都是需要获取任务的执行结果的。
那 ThreadPoolExecutor 是否提供了获取任务的执行结果相关功能呢?这么重要的功能当然提供了。
下面我们就来介绍一下使用 ThreadPoolExecutor 的时候,如何获取任务执行结果。

如何获取任务执行结果

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。

  1. // 提交Runnable任务
  2. Future<?> submit(Runnable task);
  3. // 提交Callable任务
  4. <T> Future<T> submit(Callable<T> task);
  5. // 提交Runnable任务及结果引用
  6. <T> Future<T> submit(Runnable task, T result);

Future 接口你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是:

  • 取消任务的方法 cancel()
  • 判断任务是否已取消的方法 isCancelled()
  • 判断任务是否已结束的方法 isDone()
  • 2 个获得任务执行结果的 get() 和 get(timeout, unit)【get(timeout, unit) 支持超时机制】

通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

  1. // 取消任务
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. // 判断任务是否已取消
  4. boolean isCancelled();
  5. // 判断任务是否已结束
  6. boolean isDone();
  7. // 获得任务执行结果
  8. get();
  9. // 获得任务执行结果,支持超时
  10. get(long timeout, TimeUnit unit);

这 3 个 submit() 方法之间的区别在于方法参数不同,下面我们简要介绍一下。

  1. 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。【我的疑问:那调用 Future 对象的 get() 会发生什么? 】
  2. 提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。
  3. 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?

下面这段示例代码展示了它的经典用法。需要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过 result 主子线程可以共享数据。

  1. ExecutorService executor = Executors.newFixedThreadPool(1);
  2. // 创建Result对象r
  3. Result r = new Result();
  4. r.setAAA(a);
  5. // 提交任务
  6. Future<Result> future = executor.submit(new Task(r), r);
  7. Result fr = future.get();
  8. // 下面等式成立
  9. fr === r;
  10. fr.getAAA() === a;
  11. fr.getXXX() === x;
  12. class Task implements Runnable {
  13. Result r;
  14. // 通过构造函数传入 result
  15. Task(Result r) {
  16. this.r = r;
  17. }
  18. void run() {
  19. // 可以操作 result
  20. a = r.getAAA();
  21. r.setXXX(x);
  22. }
  23. }

下面我们再来介绍 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。

  1. FutureTask(Callable<V> callable);
  2. FutureTask(Runnable runnable, V result);

那如何使用 FutureTask 呢?
其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。

  1. // 创建FutureTask
  2. FutureTask<Integer> futureTask = new FutureTask<>(() -> 1 + 2);
  3. // 创建线程池
  4. ExecutorService es = Executors.newCachedThreadPool();
  5. // 提交FutureTask(同样也有返回值)
  6. es.submit(futureTask);
  7. // 获取计算结果
  8. Integer result = futureTask.get();

FutureTask 对象直接被 Thread 执行的示例代码如下所示。
相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。

  1. // 创建FutureTask
  2. FutureTask<Integer> futureTask = new FutureTask<>(() -> 1 + 2);
  3. // 创建并启动线程
  4. Thread T1 = new Thread(futureTask);
  5. T1.start();
  6. // 获取计算结果
  7. Integer result = futureTask.get();

实现最优的“烧水泡茶”程序

使用 FutureTask 实现最优的“烧水泡茶”程序

记得以前初中语文课文里有一篇著名数学家华罗庚先生的文章《统筹方法》,这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:
image.png
下面我们用程序来模拟一下这个最优工序。
我们专栏前面曾经提到,并发编程可以总结为三个核心问题:分工、同步和互斥。
编写并发程序,首先要做的就是分工,所谓分工指的是:如何高效地拆解任务并分配给线程。
对于烧水泡茶这个程序,一种最优的分工方案可以是下图所示的这样:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。
对于 T1 的这个等待动作,你应该可以想出很多种办法,例如 Thread.join()、CountDownLatch,甚至阻塞队列都可以解决,不过今天我们用 Future 特性来实现。
image.png

下面的示例代码就是用这一章提到的 Future 特性来实现的。
首先,我们创建了两个 FutureTask——ft1 和 ft2:

  • ft1 完成洗水壶、烧开水、泡茶的任务;
  • ft2 完成洗茶壶、洗茶杯、拿茶叶的任务。

这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

  1. // 创建任务T2的FutureTask
  2. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
  3. // 创建任务T1的FutureTask
  4. FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
  5. // 线程T1执行任务ft1
  6. Thread T1 = new Thread(ft1);
  7. T1.start();
  8. // 线程T2执行任务ft2
  9. Thread T2 = new Thread(ft2);
  10. T2.start();
  11. // 等待线程T1执行结果
  12. System.out.println(ft1.get());
  13. // T1Task需要执行的任务:
  14. // 洗水壶、烧开水、泡茶
  15. class T1Task implements Callable<String> {
  16. FutureTask<String> ft2;
  17. // T1任务需要T2任务的FutureTask
  18. T1Task(FutureTask<String> ft2) {
  19. this.ft2 = ft2;
  20. }
  21. @Override
  22. String call() throws Exception {
  23. System.out.println("T1:洗水壶...");
  24. TimeUnit.SECONDS.sleep(1);
  25. System.out.println("T1:烧开水...");
  26. TimeUnit.SECONDS.sleep(15);
  27. // 获取T2线程的茶叶
  28. String tf = ft2.get();
  29. System.out.println("T1:拿到茶叶:" + tf);
  30. System.out.println("T1:泡茶...");
  31. return "上茶:" + tf;
  32. }
  33. }
  34. // T2Task需要执行的任务:
  35. // 洗茶壶、洗茶杯、拿茶叶
  36. class T2Task implements Callable<String> {
  37. @Override
  38. String call() throws Exception {
  39. System.out.println("T2:洗茶壶...");
  40. TimeUnit.SECONDS.sleep(1);
  41. System.out.println("T2:洗茶杯...");
  42. TimeUnit.SECONDS.sleep(2);
  43. System.out.println("T2:拿茶叶...");
  44. TimeUnit.SECONDS.sleep(1);
  45. return "龙井";
  46. }
  47. }
  48. // 一次执行结果:
  49. T1:洗水壶...
  50. T2:洗茶壶...
  51. T1:烧开水...
  52. T2:洗茶杯...
  53. T2:拿茶叶...
  54. T1:拿到茶叶:龙井
  55. T1:泡茶...
  56. 上茶:龙井

总结

利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。
Future 可以类比为现实世界里的提货单,比如去蛋糕店订生日蛋糕,蛋糕店都是先给你一张提货单,你拿到提货单之后,没有必要一直在店里等着,可以先去干点其他事,比如看场电影;等看完电影后,基本上蛋糕也做好了,然后你就可以凭提货单领蛋糕了。
利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。在分析这种问题的过程中,建议你用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好,类似于烧水泡茶最优分工方案那幅图。对照图来写代码,好处是更形象,且不易出错。

课后思考

不久前听说小明要做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里。
核心示例代码如下所示,由于是串行的,所以性能很慢,你来试着优化一下吧。

  1. // 向电商S1询价,并保存
  2. r1 = getPriceByS1();
  3. save(r1);
  4. // 向电商S2询价,并保存
  5. r2 = getPriceByS2();
  6. save(r2);
  7. // 向电商S3询价,并保存
  8. r3 = getPriceByS3();
  9. save(r3);