直接继承Thread或者实现Runnable接口都可以创建线程,都有一个问题就是:没有返回值,也就是不能获取执行完的结果。
java1.5就提供了Callable接口,而Future和FutureTask就可以和Callable接口配合起来使用

Runnable缺陷:

  • 不能返回一个返回值
  • 不能抛出 checked Exception

Callable扩展
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果

一、Callable&Future&FutureTask

Callable执行原理

FutureTask底层重写了run方法,线程启动后,正常调用run方法,run方法内部调用call方法。Callable在获取结果前,是通过park方法阻塞的,

Future 的主要功能

Future就是对具体的Runnable或者Callable任务,执行任务取消、查询是否完成、结果获取。必要时可以通过get方法获取执行结果,get()会阻塞直到任务返回结果。

  1. - boolean cancel (boolean mayInterruptIfRunning)** 取消任务**的执行。参数指定是否立即中断任务执行,或者等等任务结束
  2. - boolean isCancelled () 任务**是否已经取消**,任务正常完成前将其取消,则返回 true
  3. - boolean isDone () 任务**是否已经完成**。需要注意的是如果任务正常终止、异常或取消,都将返回true
  4. - V get () throws InterruptedException, ExecutionException **等待**任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
  5. - V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException

FutureTask

image.png
Future的实现类,FutureTask是消费者和生产者的桥梁消费者通过FutureTask存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果

具体使用

FutureTask,将生产者和消费者隔离;生产者仅仅需要讲任务封装成futuretask,提交给线程池执行;消费者通过futuretask对象,异步获取执行结果

  1. //1.把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,
  2. //2.然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,
  3. //3.最后还可以通过 FutureTask 获取任务执行的结果
  4. public static void main(String[] args) throws ExecutionException, InterruptedException {
  5. /**
  6. * 普通线程使用futureTask
  7. */
  8. FutureTask futureTask = new FutureTask(new Callable() {
  9. @Override
  10. public Object call() throws Exception {
  11. return "测试";
  12. }
  13. });
  14. new Thread(futureTask).start();
  15. System.out.println(futureTask.get());
  16. /**
  17. * 线程池使用futureTask
  18. */
  19. Future<?> submit1 = Executors.newCachedThreadPool().submit(futureTask);
  20. System.out.println(submit1.get());
  21. System.out.println(futureTask.get());
  22. /**
  23. * 线程池使用Callable
  24. */
  25. Future<String> submit = Executors.newCachedThreadPool().submit(new Callable<String>() {
  26. @Override
  27. public String call() throws Exception {
  28. return "线程池测试";
  29. }
  30. });
  31. System.out.println(submit.get());
  32. }
  33. 执行结果:
  34. 测试
  35. null
  36. 测试
  37. 线程池测试
  38. 由结果可以看出,线程池使用futureTask,返回值无意义

将串行操作,改造成并行,时间消耗执行时间最长的任务

  1. public static void main(String[] args) throws ExecutionException, InterruptedException {
  2. FutureTask<String> ft1 = new FutureTask<>(new T1Task());
  3. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
  4. FutureTask<String> ft3 = new FutureTask<>(new T3Task());
  5. FutureTask<String> ft4 = new FutureTask<>(new T4Task());
  6. FutureTask<String> ft5 = new FutureTask<>(new T5Task());
  7. //构建线程池
  8. ExecutorService executorService = Executors.newFixedThreadPool(5);
  9. executorService.submit(ft1);
  10. executorService.submit(ft2);
  11. executorService.submit(ft3);
  12. executorService.submit(ft4);
  13. executorService.submit(ft5);
  14. //获取执行结果
  15. System.out.println(ft1.get());
  16. System.out.println(ft2.get());
  17. System.out.println(ft3.get());
  18. System.out.println(ft4.get());
  19. System.out.println(ft5.get());
  20. executorService.shutdown();
  21. }
  22. static class T1Task implements Callable<String> {
  23. @Override
  24. public String call() throws Exception {
  25. System.out.println("T1:查询商品基本信息...");
  26. TimeUnit.MILLISECONDS.sleep(50);
  27. return "商品基本信息查询成功";
  28. }
  29. }
  30. static class T2Task implements Callable<String> {
  31. @Override
  32. public String call() throws Exception {
  33. System.out.println("T2:查询商品价格...");
  34. TimeUnit.MILLISECONDS.sleep(50);
  35. return "商品价格查询成功";
  36. }
  37. }
  38. static class T3Task implements Callable<String> {
  39. @Override
  40. public String call() throws Exception {
  41. System.out.println("T3:查询商品库存...");
  42. TimeUnit.MILLISECONDS.sleep(50);
  43. return "商品库存查询成功";
  44. }
  45. }
  46. static class T4Task implements Callable<String> {
  47. @Override
  48. public String call() throws Exception {
  49. System.out.println("T4:查询商品图片...");
  50. TimeUnit.MILLISECONDS.sleep(50);
  51. return "商品图片查询成功";
  52. }
  53. }
  54. static class T5Task implements Callable<String> {
  55. @Override
  56. public String call() throws Exception {
  57. System.out.println("T5:查询商品销售状态...");
  58. TimeUnit.MILLISECONDS.sleep(50);
  59. return "商品销售状态查询成功";
  60. }
  61. }

Future 注意事项

  1. - 循环批量获取 Future 的结果时容易 blockget 方法调用时应使用 timeout 限制
  2. - Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来

Future的局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  1. - **并发执行多任务**:Future只提供了get()方法来必须按照提交任务顺序,获取结果,并且是阻塞的。即:获取第一个结果后,才能获取第二个结果;
  2. - **无法对多个任务进行链式调用**:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  3. - **无法组合多个任务**:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;(可使用CountDownLatch或者CyaclicBarrier
  4. - **没有异常处理**:Future接口中没有关于异常处理的方法;

二、CompletionService使用

CompletionService解决了Callable+Future情况下需要按照执行顺序获取结果问题。
CompletionService主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序

CompletionService原理

内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

应用场景总结

  1. - 当需要批量提交异步任务的时,建议使用CompletionServiceCompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
  2. - CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
  3. - 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

三、CompletableFuture使用

CompletableFuture是Future接口的扩展和增强,CompletableFuture实现了对任务的编排能力。可以轻松地组织不同任务的运行顺序、规则以及方式。
CompletionStage接口: 执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()

应用场景

依赖关系:

  1. - thenApply() 把前面异步任务的结果,交给后面的Function
  2. - thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回

and聚合关系:

  1. - thenCombine:任务合并,有返回值
  2. - thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
  3. - runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。

or聚合关系:

  1. - applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
  2. - acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
  3. - runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。

并行执行:

  1. - CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行