Callable和Runnable的区别

Runnable :

  • 不能返回一个返回值
  • 不能抛出 Exception

    Callable:

  • call方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类。

    1. new Thread(new Runnable() {
    2. @Override
    3. public void run() {
    4. System.out.println("Runnable 创建的任务");
    5. }
    6. }).start();
    7. FutureTask task = new FutureTask(new Callable() {
    8. @Override
    9. public Object call() throws Exception {
    10. System.out.println("Callable 创建的任务");
    11. Thread.sleep(3000);
    12. return "callable结果";
    13. }
    14. });
    15. new Thread(task).start();
    16. System.out.println("返回:" + task.get());

    截屏2022-04-06 23.29.18.png

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

主要方法

  1. //取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束
  2. boolean cancel(boolean mayInterruptIfRunning)
  3. //任务是否已经取消,任务正常完成前将其取消,则返回 true
  4. boolean isCancelled ()
  5. //任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
  6. boolean isDone ()
  7. //等待任务执行结束,然后获得V类型的结果
  8. V get ()
  9. //等待任务执行结束,然后获得V类型的结果,可设置最长的等待时间,超过时间抛出异常
  10. V get (long timeout, TimeUnit unit)

使用案例-商品信息查询

在传统的查询商品中,如果我们要查询商品基本信息、商品价格、商品库存、商品图片、商品销售状态这5个接口的信息,我们一般用同步方式(也就是查完一个接口再调用另外的接口),这样我们查询完5个接口需要的时间是00ms-300ms之间。我们可以用Future并发查询来优化这些调用查询。

  1. public static void main(String[] args) throws InterruptedException, ExecutionException {
  2. FutureTask<String> ft1 = new FutureTask<>(new T1Task());
  3. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
  4. FutureTask<String> ft3 = new FutureTask<>(new T3Task());
  5. FutureTask<String> ft4 = new FutureTask<>(new T4Task());
  6. FutureTask<String> ft5 = new FutureTask<>(new T5Task());
  7. //构建线程池
  8. ExecutorService executorService = Executors.newFixedThreadPool(5);
  9. executorService.submit(ft1);
  10. executorService.submit(ft2);
  11. executorService.submit(ft3);
  12. executorService.submit(ft4);
  13. executorService.submit(ft5);
  14. //获取执行结果
  15. System.out.println(ft1.get());
  16. System.out.println(ft2.get());
  17. System.out.println(ft3.get());
  18. System.out.println(ft4.get());
  19. System.out.println(ft5.get());
  20. executorService.shutdown();
  21. }
  22. static class T1Task implements Callable<String> {
  23. @Override
  24. public String call() throws Exception {
  25. System.out.println("T1:查询商品基本信息...");
  26. TimeUnit.MILLISECONDS.sleep(50);
  27. return "商品基本信息查询成功";
  28. }
  29. }
  30. static class T2Task implements Callable<String> {
  31. @Override
  32. public String call() throws Exception {
  33. System.out.println("T2:查询商品价格...");
  34. TimeUnit.MILLISECONDS.sleep(50);
  35. return "商品价格查询成功";
  36. }
  37. }
  38. static class T3Task implements Callable<String> {
  39. @Override
  40. public String call() throws Exception {
  41. System.out.println("T3:查询商品库存...");
  42. TimeUnit.MILLISECONDS.sleep(50);
  43. return "商品库存查询成功";
  44. }
  45. }
  46. static class T4Task implements Callable<String> {
  47. @Override
  48. public String call() throws Exception {
  49. System.out.println("T4:查询商品图片...");
  50. TimeUnit.MILLISECONDS.sleep(50);
  51. return "商品图片查询成功";
  52. }
  53. }
  54. static class T5Task implements Callable<String> {
  55. @Override
  56. public String call() throws Exception {
  57. System.out.println("T5:查询商品销售状态...");
  58. TimeUnit.MILLISECONDS.sleep(50);
  59. return "商品销售状态查询成功";
  60. }
  61. }

Future 注意事项

  • 当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
  • Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来

Future的局限性

  • 并发执行多任务:虽然Future是并发执行多个任务,但是在调用get的时候是阻塞的,返回结果依赖get的顺序。例如上面的ft1.get()的时候,如果ft1拿到结果的时间很长,那下面的任务也要等ft1完成才能完成。
  • 无法对多个任务进行链式调用:例如你想在拿到商品信息后,再执行其他的逻辑,这点是没有的。
  • 无法组合多个任务:例如你想把商品价格、商品库存、商品图片组成在一起调用,这点也是没有的
  • 没有异常处理

CompletionService

CompletionService补充了Future的局限性。主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。
CompletionService内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

使用案例-向不同电商平台询价

  1. //创建线程池
  2. ExecutorService executor = Executors.newFixedThreadPool(10);
  3. //创建CompletionService
  4. CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
  5. //异步向电商S1询价
  6. cs.submit(() -> getPriceByS1());
  7. //异步向电商S2询价
  8. cs.submit(() -> getPriceByS2());
  9. //异步向电商S3询价
  10. cs.submit(() -> getPriceByS3());
  11. //将询价结果异步保存到数据库
  12. for (int i = 0; i < 3; i++) {
  13. Integer r = cs.take().get();//这里会把最早完成的任务结果返回
  14. executor.execute(() -> save(r));
  15. }

CompletableFuture

CompletableFuture是Future接口的扩展和增强。在Future的基础上,提供依赖,聚合,并行执行等功能。

创建异步操作

  1. //runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,executor为线程池
  2. public static CompletableFuture<Void> runAsync(Runnable runnable)
  3. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  4. //supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U
  5. //Supplier 接口的 get() 方法是有返回值的(会阻塞),executor为线程池
  6. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  7. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

获得结果

join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck异常(即未经检查的异常)。get()方法抛出的是经过检查的异常。

结果处理

CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

结果转换

结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象

  1. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)