Callable

类比 Runnable,都可以创建线程,但是区别是有返回值
需要搭配Future FutureTask

  1. //无返回值
  2. //不能抛出异常
  3. @FunctionalInterface
  4. public interface Runnable {
  5. public abstract void run();
  6. }
  7. @FunctionalInterface
  8. public interface Callable<V> {
  9. V call() throws Exception;
  10. }

使用示例

  1. FutureTask task = new FutureTask(new Callable() {
  2. @Override
  3. public Object call() throws Exception {
  4. System.out.println("通过Callable方式执行任务");
  5. Thread.sleep(3000);
  6. return "返回任务结果";
  7. }
  8. });
  9. new Thread(task).start();

Future

接受Callable的任务,对这个返回的任务的操作

  • boolean cancel (boolean mayInterruptIfRunning) 取消任务,是立即取消还是等执行完取消
  • boolean isCancelled () 判断是否已取消
  • boolean isDone () 判断是否已完成
  • V get () 获取任务,阻塞式的获取结果
  • V get (long timeout, TimeUnit unit) ,同上,制定了阻塞的时间限制

image.png

FutureTask与Future,

实现关系
image.png
FutureTask既可以当做Runnable执行,也可以当做Future接受Callable的返回结果

使用示例

  1. public class FutureTaskDemo {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. Task task = new Task();
  4. //构建futureTask
  5. FutureTask<Integer> futureTask = new FutureTask<>(task);
  6. //作为Runnable入参
  7. new Thread(futureTask).start();
  8. System.out.println("task运行结果:"+futureTask.get());
  9. }
  10. static class Task implements Callable<Integer> {
  11. @Override
  12. public Integer call() throws Exception {
  13. System.out.println("子线程正在计算");
  14. int sum = 0;
  15. for (int i = 0; i < 100; i++) {
  16. sum += i;
  17. }
  18. return sum;
  19. }
  20. }
  21. }

使用案例:促销活动中的尚宁信息查询

image.png

  1. public class FutureTaskDemo2 {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. FutureTask<String> ft1 = new FutureTask<>(new T1Task());
  4. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
  5. FutureTask<String> ft3 = new FutureTask<>(new T3Task());
  6. FutureTask<String> ft4 = new FutureTask<>(new T4Task());
  7. FutureTask<String> ft5 = new FutureTask<>(new T5Task());
  8. //构建线程池
  9. ExecutorService executorService = Executors.newFixedThreadPool(5);
  10. executorService.submit(ft1);
  11. executorService.submit(ft2);
  12. executorService.submit(ft3);
  13. executorService.submit(ft4);
  14. executorService.submit(ft5);
  15. //获取执行结果
  16. System.out.println(ft1.get());
  17. System.out.println(ft2.get());
  18. System.out.println(ft3.get());
  19. System.out.println(ft4.get());
  20. System.out.println(ft5.get());
  21. executorService.shutdown();
  22. }
  23. static class T1Task implements Callable<String> {
  24. @Override
  25. public String call() throws Exception {
  26. System.out.println("T1:查询商品基本信息...");
  27. TimeUnit.MILLISECONDS.sleep(50);
  28. return "商品基本信息查询成功";
  29. }
  30. }
  31. static class T2Task implements Callable<String> {
  32. @Override
  33. public String call() throws Exception {
  34. System.out.println("T2:查询商品价格...");
  35. TimeUnit.MILLISECONDS.sleep(50);
  36. return "商品价格查询成功";
  37. }
  38. }
  39. static class T3Task implements Callable<String> {
  40. @Override
  41. public String call() throws Exception {
  42. System.out.println("T3:查询商品库存...");
  43. TimeUnit.MILLISECONDS.sleep(50);
  44. return "商品库存查询成功";
  45. }
  46. }
  47. static class T4Task implements Callable<String> {
  48. @Override
  49. public String call() throws Exception {
  50. System.out.println("T4:查询商品图片...");
  51. TimeUnit.MILLISECONDS.sleep(50);
  52. return "商品图片查询成功";
  53. }
  54. }
  55. static class T5Task implements Callable<String> {
  56. @Override
  57. public String call() throws Exception {
  58. System.out.println("T5:查询商品销售状态...");
  59. TimeUnit.MILLISECONDS.sleep(50);
  60. return "商品销售状态查询成功";
  61. }
  62. }
  63. }

Future注意事项

建议使用get加timeout
思考: 使用Callable 和Future 产生新的线程了吗? 单独使用时并没有
局限性

  1. 并发执行多任务时:get是阻塞等待获取的
  2. 无法对多个任务进行链式调用,下面有方法可以实现
  3. 无法组合任务,下面的也可以实现
  4. 无异常处理,

    CompletionService

    image.png
    主要功能就是:一边生成任务,一边获取任务,让两个事分开执行,任务直接不会相互阻塞,不再依赖任务顺序

CompletionService原理

内部通过阻塞队列+FutureTask,先完成的可以先获取,
内部队列保存执行完成的Future,
take, poll获取一个已经执行完成的Future,再get结果

使用案

向不同平台询价,再保存

  1. // 创建线程池
  2. ExecutorService executor = Executors.newFixedThreadPool(3);
  3. // 异步向电商S1询价
  4. Future<Integer> f1 = executor.submit(()->getPriceByS1());
  5. // 异步向电商S2询价
  6. Future<Integer> f2= executor.submit(()->getPriceByS2());
  7. // 获取电商S1报价并异步保存
  8. executor.execute(()->save(f1.get()));
  9. // 获取电商S2报价并异步保存
  10. executor.execute(()->save(f2.get())
  11. //问题,就是f1会阻挡f2
  1. //创建线程池
  2. ExecutorService executor = Executors.newFixedThreadPool(10);
  3. //创建CompletionService
  4. CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
  5. //异步向电商S1询价
  6. cs.submit(() -> getPriceByS1());
  7. //异步向电商S2询价
  8. cs.submit(() -> getPriceByS2());
  9. //异步向电商S3询价
  10. cs.submit(() -> getPriceByS3());
  11. //将询价结果异步保存到数据库
  12. for (int i = 0; i < 3; i++) {
  13. Integer r = cs.take().get();
  14. executor.execute(() -> save(r));
  15. }

实现类似 Dubbo 的 Forking Cluster场景
Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个服务实例,只要有一个成功就返回结果。

  1. // 创建线程池
  2. ExecutorService executor = Executors.newFixedThreadPool(3);
  3. // 创建CompletionService
  4. CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
  5. // 用于保存Future对象
  6. List<Future<Integer>> futures = new ArrayList<>(3);
  7. //提交异步任务,并保存future到futures
  8. futures.add(cs.submit(()->geocoderByS1()));
  9. futures.add(cs.submit(()->geocoderByS2()));
  10. futures.add(cs.submit(()->geocoderByS3()));
  11. // 获取最快返回的任务执行结果
  12. Integer r = 0;
  13. try {
  14. // 只要有一个成功返回,则break
  15. for (int i = 0; i < 3; ++i) {
  16. r = cs.take().get();
  17. //简单地通过判空来检查是否成功返回
  18. if (r != null) {
  19. break;
  20. }
  21. }
  22. } finally {
  23. //取消所有任务
  24. for(Future<Integer> f : futures)
  25. f.cancel(true);
  26. }
  27. // 返回结果

应用场景

  1. 批量提交异步任务时,有效各个
  2. 让异步的执行结果有序化,先执行完的先入阻塞队列
  3. 线程池隔离,各个业务不干扰

    CompletableFuture,重要

    让业务逻辑处理并行啊,聚合,依赖的,扩展,增强,
    image.png
    对Future扩展,增强,还有任务的编排能力
    CompletionStage: 默认使用时ForkJoinPool.commonPool线程池

重要API

依赖场景

  1. thenApply(),前面执行完,执行后面
  2. thenCompose(),链接有依赖关系的任务,

描述and聚合关系

  1. thenCombine: 任务合并
  2. thenAccepetBoth: 两个如未能执行完成交给后面的任务,无返回
  3. runAfterBoth,:l两个任务完成,执行下一个操作

描述or的关系

  1. applyToEither:两个任务谁快,执行谁
  2. acceptEither: 谁快消耗那个的结果
  3. runAfterEither::谁先完成,就下一步

并行

  1. anyOf
  2. allOf

创建异步操作
CompletableFuture 提供了四个静态方法来创建一个异步操作:

  1. //
  2. public static CompletableFuture<Void> runAsync(Runnable runnable)
  3. //不指定就用ForkJoinPool.commonPool() 指定就用executor
  4. //强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
  5. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  6. //Supplier 接口的 get() 方法是有返回值的(会阻塞)
  7. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  8. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync&supplyAsync

  1. Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");
  2. CompletableFuture.runAsync(runnable);
  3. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("执行有返回值的异步任务");
  5. try {
  6. Thread.sleep(5000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. return "Hello World";
  11. });
  12. String result = future.get();
  13. System.out.println(result);

获取结果
join&get,都是获取异步的结果,抛的异常不一样

whenComplete&exceptionally

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
  • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常

whenComplete&exceptionally

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2. try {
  3. TimeUnit.SECONDS.sleep(1);
  4. } catch (InterruptedException e) {
  5. }
  6. if (new Random().nextInt(10) % 2 == 0) {
  7. int i = 12 / 0;
  8. }
  9. System.out.println("执行结束!");
  10. return "test";
  11. });
  12. future.whenComplete(new BiConsumer<String, Throwable>() {
  13. @Override
  14. public void accept(String t, Throwable action) {
  15. System.out.println(t+" 执行完成!");
  16. }
  17. });
  18. //失败之后的处理
  19. future.exceptionally(new Function<Throwable, String>() {
  20. @Override
  21. public String apply(Throwable t) {
  22. System.out.println("执行失败:" + t.getMessage());
  23. return "异常xxxx";
  24. }
  25. }

thenApply,上个任务的结果作为入参

  1. public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  2. public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  1. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  2. int result = 100;
  3. System.out.println("一阶段:" + result);
  4. return result;
  5. }).thenApply(number -> {
  6. // number是上任务的结果
  7. int result = number * 3;
  8. System.out.println("二阶段:" + result);
  9. return result;
  10. });

thenCompose 返回的函数实现计算步骤的记过看实例d

  1. public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
  2. public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
  1. CompletableFuture<Integer> future = CompletableFuture
  2. .supplyAsync(new Supplier<Integer>() {
  3. @Override
  4. public Integer get() {
  5. int number = new Random().nextInt(30);
  6. System.out.println("第一阶段:" + number);
  7. return number;
  8. }
  9. })
  10. .thenCompose(new Function<Integer, CompletionStage<Integer>>() {
  11. @Override
  12. //入参param 技术是上个任务返回的number
  13. public CompletionStage<Integer> apply(Integer param) {
  14. return CompletableFuture.supplyAsync(new Supplier<Integer>() {
  15. @Override
  16. public Integer get() {
  17. int number = param * 2;
  18. System.out.println("第二阶段:" + number);
  19. return number;
  20. }
  21. });
  22. }
  23. });
  24. }
  25. System.out.println("最终结果: " + future.get());

thenApply 和 thenCompose的区别,有点类似

  • thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;
  • thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个
    CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生
    成一个新的CompletableFuture。
  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
  2. CompletableFuture<String> result1 = future.thenApply(param -> param + " World");
  3. CompletableFuture<String> result2 = future
  4. .thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));
  5. System.out.println(result1.get());
  6. System.out.println(result2.get());

下面的是结果消费,只是执行action,不返回CompletableFuture

  • thenAccept系列:对单个结果进行消费
  • thenAcceptBoth系列:对两个结果进行消费
  • thenRun系列:不关心结果,只对结果执行Action

    thenAccept,没有返回值了

    ```java public CompletionStage thenAccept(Consumer<? super T> action); public CompletionStage thenAcceptAsync(Consumer<? super T> action);
  1. ```java
  2. CompletableFuture<Void> future = CompletableFuture
  3. .supplyAsync(() -> {
  4. int number = new Random().nextInt(10);
  5. System.out.println("第一阶段:" + number);
  6. return number;
  7. }).thenAccept(number ->
  8. System.out.println("第二阶段:" + number * 5));
  9. System.out.println("最终结果:" + future.get());
  10. //fuure 泛型是void

thenAcceptBoth,前面的两个都执行玩,执行action

  1. public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
  2. public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
  1. CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
  2. @Override
  3. public Integer get() {
  4. int number = new Random().nextInt(3) + 1;
  5. try {
  6. TimeUnit.SECONDS.sleep(number);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println("第一阶段:" + number);
  11. return number;
  12. }
  13. });
  14. CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
  15. @Override
  16. public Integer get() {
  17. int number = new Random().nextInt(3) + 1;
  18. try {
  19. TimeUnit.SECONDS.sleep(number);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println("第二阶段:" + number);
  24. return number;
  25. }
  26. });
  27. //就时连个future 结束时在处理
  28. futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
  29. @Override
  30. public void accept(Integer x, Integer y) {
  31. System.out.println("最终结果:" + (x + y));
  32. }
  33. }).join();

thenRun,再执行一段另一个逻辑,与上一个CompletabelFuture无关了

  1. public CompletionStage<Void> thenRun(Runnable action);
  2. public CompletionStage<Void> thenRunAsync(Runnable action);
  1. CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
  2. int number = new Random().nextInt(10);
  3. System.out.println("第一阶段:" + number);
  4. return number;
  5. }).thenRun(() ->
  6. System.out.println("thenRun 执行"));
  7. System.out.println("最终结果:" + future.get());

thenCombine 合并连个任务的结果,下一步处理

  1. public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
  2. public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
  1. CompletableFuture<Integer> future1 = CompletableFuture
  2. .supplyAsync(new Supplier<Integer>() {
  3. @Override
  4. public Integer get() {
  5. int number = new Random().nextInt(10);
  6. System.out.println("第一阶段:" + number);
  7. return number;
  8. }
  9. });
  10. CompletableFuture<Integer> future2 = CompletableFuture
  11. .supplyAsync(new Supplier<Integer>() {
  12. @Override
  13. public Integer get() {
  14. int number = new Random().nextInt(10);
  15. System.out.println("第二阶段:" + number);
  16. return number;
  17. }
  18. });
  19. //跟thenApply没啥区别
  20. CompletableFuture<Integer> result = future1
  21. .thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
  22. @Override
  23. public Integer apply(Integer x, Integer y) {
  24. return x + y;
  25. }
  26. });
  27. System.out.println("最终结果:" + result.get());

applyToEither

  1. CompletableFuture<Integer> future1 = CompletableFuture
  2. .supplyAsync(new Supplier<Integer>() {
  3. @Override
  4. public Integer get() {
  5. int number = new Random().nextInt(10);
  6. System.out.println("第一阶段start:" + number);
  7. try {
  8. TimeUnit.SECONDS.sleep(number);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. System.out.println("第一阶段end:" + number);
  13. return number;
  14. }
  15. });
  16. CompletableFuture<Integer> future2 = CompletableFuture
  17. .supplyAsync(new Supplier<Integer>() {
  18. @Override
  19. public Integer get() {
  20. int number = new Random().nextInt(10);
  21. System.out.println("第二阶段start:" + number);
  22. try {
  23. TimeUnit.SECONDS.sleep(number);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println("第二阶段end:" + number);
  28. return number;
  29. }
  30. });
  31. 就是获取运行比较快的那个任务的结果
  32. future1.applyToEither(future2, new Function<Integer, Integer>() {
  33. @Override
  34. public Integer apply(Integer number) {
  35. System.out.println("最快结果:" + number);
  36. return number * 2;
  37. }
  38. }).join();

acceptEither,获取执行最快的任务的结果,并有下一步的处理

  1. CompletableFuture<Integer> future1 = CompletableFuture
  2. .supplyAsync(new Supplier<Integer>() {
  3. @Override
  4. public Integer get() {
  5. int number = new Random().nextInt(10) + 1;
  6. try {
  7. TimeUnit.SECONDS.sleep(number);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println("第一阶段:" + number);
  12. return number;
  13. }
  14. });
  15. CompletableFuture<Integer> future2 = CompletableFuture
  16. .supplyAsync(new Supplier<Integer>() {
  17. @Override
  18. public Integer get() {
  19. int number = new Random().nextInt(10) + 1;
  20. try {
  21. TimeUnit.SECONDS.sleep(number);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("第二阶段:" + number);
  26. return number;
  27. }
  28. });
  29. future1.acceptEither(future2, new Consumer<Integer>() {
  30. @Override
  31. public void accept(Integer number) {
  32. System.out.println("最快结果:" + number);
  33. }
  34. }).join();

runAfterEither,两个任务,有一个完成,就执行下一步

  1. CompletableFuture<Integer> future1 = CompletableFuture
  2. .supplyAsync(new Supplier<Integer>() {
  3. @Override
  4. public Integer get() {
  5. int number = new Random().nextInt(5);
  6. try {
  7. TimeUnit.SECONDS.sleep(number);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. System.out.println("第一阶段:" + number);
  12. return number;
  13. }
  14. });
  15. CompletableFuture<Integer> future2 = CompletableFuture
  16. .supplyAsync(new Supplier<Integer>() {
  17. @Override
  18. public Integer get() {
  19. int number = new Random().nextInt(5);
  20. try {
  21. TimeUnit.SECONDS.sleep(number);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("第二阶段:" + number);
  26. return number;
  27. }
  28. });
  29. future1.runAfterEither(future2, new Runnable() {
  30. @Override
  31. public void run() {
  32. System.out.println("已经有一个任务完成了");
  33. }
  34. }).join();

runAfterBoth 两个都完成菜执行下一步

  1. CompletableFuture<Integer> future1 = CompletableFuture
  2. .supplyAsync(new Supplier<Integer>() {
  3. @Override
  4. public Integer get() {
  5. try {
  6. TimeUnit.SECONDS.sleep(1);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. System.out.println("第一阶段:1");
  11. return 1;
  12. }
  13. });
  14. CompletableFuture<Integer> future2 = CompletableFuture
  15. .supplyAsync(new Supplier<Integer>() {
  16. @Override
  17. public Integer get() {
  18. try {
  19. TimeUnit.SECONDS.sleep(2);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. System.out.println("第二阶段:2");
  24. return 2;
  25. }
  26. });
  27. future1.runAfterBoth(future2, new Runnable() {
  28. @Override
  29. public void run() {
  30. System.out.println("上面两个任务都执行完成了。");
  31. }
  32. }).get();

anyOf 多个completableFuture ,有一个完成就返回这个的Future结果

  1. Random random = new Random();
  2. CompletableFuture<String> future1 = CompletableFuture
  3. .supplyAsync(() -> {
  4. try {
  5. TimeUnit.SECONDS.sleep(random.nextInt(5));
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. return "hello";
  10. });
  11. CompletableFuture<String> future2 = CompletableFuture
  12. .supplyAsync(() -> {
  13. try {
  14. TimeUnit.SECONDS.sleep(random.nextInt(1));
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. return "world";
  19. });
  20. CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
  21. System.out.println(result.get());

allOf
allOf方法用来实现多 CompletableFuture 的同时返回。

  1. CompletableFuture<String> future1 = CompletableFuture
  2. .supplyAsync(() -> {
  3. try {
  4. TimeUnit.SECONDS.sleep(2);
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. System.out.println("future1完成!");
  9. return "future1完成!";
  10. });
  11. CompletableFuture<String> future2 = CompletableFuture
  12. .supplyAsync(() -> {
  13. System.out.println("future2完成!");
  14. return "future2完成!";
  15. });
  16. CompletableFuture<Void> combindFuture = CompletableFuture
  17. .allOf(future1, future2);
  18. try {
  19. combindFuture.get();
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. } catch (ExecutionException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("future1: " + future1.isDone() + ",future2: " +
  26. future2.isDone());

CompletableFuture常用方法总结
image.png

使用案例:实现最优的“烧水泡茶”程序

  1. public class FutureTaskDemo3{
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. // 创建任务T2的FutureTask
  4. FutureTask<String> ft2 = new FutureTask<>(new T2Task());
  5. // 创建任务T1的FutureTask
  6. FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
  7. // 线程T1执行任务ft1
  8. Thread T1 = new Thread(ft1);
  9. T1.start();
  10. // 线程T2执行任务ft2
  11. Thread T2 = new Thread(ft2);
  12. T2.start();
  13. // 等待线程T1执行结果
  14. System.out.println(ft1.get());
  15. }
  16. }
  17. // T1Task需要执行的任务:
  18. // 洗水壶、烧开水、泡茶
  19. class T1Task implements Callable<String> {
  20. FutureTask<String> ft2;
  21. // T1任务需要T2任务的FutureTask
  22. T1Task(FutureTask<String> ft2){
  23. this.ft2 = ft2;
  24. }
  25. @Override
  26. public String call() throws Exception {
  27. System.out.println("T1:洗水壶...");
  28. TimeUnit.SECONDS.sleep(1);
  29. System.out.println("T1:烧开水...");
  30. TimeUnit.SECONDS.sleep(15);
  31. // 获取T2线程的茶叶
  32. String tf = ft2.get();
  33. System.out.println("T1:拿到茶叶:"+tf);
  34. System.out.println("T1:泡茶...");
  35. return "上茶:" + tf;
  36. }
  37. }
  38. // T2Task需要执行的任务:
  39. // 洗茶壶、洗茶杯、拿茶叶
  40. class T2Task implements Callable<String> {
  41. @Override
  42. public String call() throws Exception {
  43. System.out.println("T2:洗茶壶...");
  44. TimeUnit.SECONDS.sleep(1);
  45. System.out.println("T2:洗茶杯...");
  46. TimeUnit.SECONDS.sleep(2);
  47. System.out.println("T2:拿茶叶...");
  48. TimeUnit.SECONDS.sleep(1);
  49. return "龙井";
  50. }
  51. }
  1. public class CompletableFutureDemo2 {
  2. public static void main(String[] args) {
  3. //任务1:洗水壶->烧开水
  4. CompletableFuture<Void> f1 = CompletableFuture
  5. .runAsync(() -> {
  6. System.out.println("T1:洗水壶...");
  7. sleep(1, TimeUnit.SECONDS);
  8. System.out.println("T1:烧开水...");
  9. sleep(15, TimeUnit.SECONDS);
  10. });
  11. //任务2:洗茶壶->洗茶杯->拿茶叶
  12. CompletableFuture<String> f2 = CompletableFuture
  13. .supplyAsync(() -> {
  14. System.out.println("T2:洗茶壶...");
  15. sleep(1, TimeUnit.SECONDS);
  16. System.out.println("T2:洗茶杯...");
  17. sleep(2, TimeUnit.SECONDS);
  18. System.out.println("T2:拿茶叶...");
  19. sleep(1, TimeUnit.SECONDS);
  20. return "龙井";
  21. });
  22. //任务3:任务1和任务2完成后执行:泡茶
  23. CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
  24. System.out.println("T1:拿到茶叶:" + tf);
  25. System.out.println("T1:泡茶...");
  26. return "上茶:" + tf;
  27. });
  28. //等待任务3执行结果
  29. System.out.println(f3.join());
  30. }
  31. static void sleep(int t, TimeUnit u){
  32. try {
  33. u.sleep(t);
  34. } catch (InterruptedException e) {
  35. }
  36. }
  37. }