1.CompletableFuture简介

CompletableFuture 在 Java 里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture 实现了 Future, CompletionStage 接口,实现了 Future接口就可以兼容现在有线程池框架,而 CompletionStage 接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture 类。

2. Future与CcompletableFuture

Futrue 在 Java 里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个 Futrue,在 Future 里面有 isDone 方法来 判断任务是否处理结束,还有 get 方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future 的主要缺点如下:
(1)不支持手动完成
我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成
(2)不支持进一步的非阻塞调用
通过 Future 的 get 方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为 Future 不支持回调函数,所以无法实现这个功能
(3)不支持链式调用
对于 Future 的执行结果,我们想继续传到下一个 Future 处理使用,从而形成一个链式的 pipline 调用,这在 Future 中是没法实现的。
(4)不支持多个 Future 合并
比如我们有 10 个 Future 并行执行,我们想在所有的 Future 运行完毕之后,执行某些函数,是没法通过 Future 实现的。
(5)不支持异常处理
Future 的 API 没有任何的异常处理的 api,所以在异步运行时,如果出了问题是不好定位的。

3. CompletableFuture入门

3.1 使用CompletableFuture

场景:主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们在一个子线程中使其终止。

  1. /**
  2. * 主线程里面创建一个 CompletableFuture,然后主线程调用 get 方法会阻塞,最后我们
  3. 在一个子线程中使其终止
  4. * @param args
  5. */
  6. public static void main(String[] args) throws Exception{
  7. CompletableFuture<String> future = new CompletableFuture<>();
  8. new Thread(() -> {
  9. try{
  10. System.out.println(Thread.currentThread().getName() + "子线程开始干活");
  11. //子线程睡 5 秒
  12. Thread.sleep(5000);
  13. //在子线程中完成主线程
  14. future.complete("success");
  15. }catch (Exception e){
  16. e.printStackTrace();
  17. }
  18. }, "A").start();
  19. //主线程调用 get 方法阻塞
  20. System.out.println("主线程调用 get 方法获取结果为: " + future.get());
  21. System.out.println("主线程完成,阻塞结束!!!!!!");
  22. }

3.2 没有返回值的异步任务

  1. /**
  2. * 没有返回值的异步任务
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. //运行一个没有返回值的异步任务
  8. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  9. try {
  10. System.out.println("子线程启动干活");
  11. Thread.sleep(5000);
  12. System.out.println("子线程完成");
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. }
  16. });
  17. //主线程阻塞
  18. future.get();
  19. System.out.println("主线程结束");
  20. }

3.3 有返回值的异步任务

  1. /**
  2. * 没有返回值的异步任务
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. //运行一个有返回值的异步任务
  8. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  9. try {
  10. System.out.println("子线程开始任务");
  11. Thread.sleep(5000);
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. return "子线程完成了!";
  16. });
  17. //主线程阻塞
  18. String s = future.get();
  19. System.out.println("主线程结束, 子线程的结果为:" + s);
  20. }

3.4 线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

  1. private static Integer num = 10;
  2. /**
  3. * 先对一个数加 10,然后取平方
  4. * @param args
  5. */
  6. public static void main(String[] args) throws Exception{
  7. System.out.println("主线程开始");
  8. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  9. try {
  10. System.out.println("加 10 任务开始");
  11. num += 10;
  12. } catch (Exception e) {
  13. e.printStackTrace();
  14. }
  15. return num;
  16. }).thenApply(integer -> {
  17. return num * num;
  18. });
  19. Integer integer = future.get();
  20. System.out.println("主线程结束, 子线程的结果为:" + integer);
  21. }

3.5 消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture.supplyAsync(() -> {
  4. try {
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. } catch (Exception e) {
  8. e.printStackTrace();
  9. }
  10. return num;
  11. }).thenApply(integer -> {
  12. return num * num;
  13. }).thenAccept(new Consumer<Integer>() {
  14. @Override
  15. public void accept(Integer integer) {
  16. System.out.println("子线程全部处理完成,最后调用了 accept,结果为:" +integer);
  17. }
  18. });
  19. }

3.6 异常处理

exceptionally 异常处理,出现异常时触发

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  4. int i= 1/0;
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. return num;
  8. }).exceptionally(ex -> {
  9. System.out.println(ex.getMessage());
  10. return -1;
  11. });
  12. System.out.println(future.get());
  13. }

handle 类似于 thenAccept/thenRun 方法,是最后一步的处理调用,但是同时可以处理异常

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("加 10 任务开始");
  5. num += 10;
  6. return num;
  7. }).handle((i,ex) ->{
  8. System.out.println("进入 handle 方法");
  9. if(ex != null){
  10. System.out.println("发生了异常,内容为:" + ex.getMessage());
  11. return -1;
  12. }else{
  13. System.out.println("正常完成,内容为: " + i);
  14. return i;
  15. }
  16. });
  17. System.out.println(future.get());
  18. }

3.7 结果合并

thenCompose 合并两个有依赖关系的 CompletableFutures 的执行结果

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. //第一步加 10
  4. CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
  5. System.out.println("加 10 任务开始");
  6. num += 10;
  7. return num;
  8. });
  9. //合并
  10. CompletableFuture<Integer> future1 = future.thenCompose(i ->
  11. //再来一个 CompletableFuture
  12. CompletableFuture.supplyAsync(() -> {
  13. return i + 1;
  14. }));
  15. System.out.println(future.get());
  16. System.out.println(future1.get());
  17. }

thenCombine 合并两个没有依赖关系的 CompletableFutures 任务

  1. public static void main(String[] args) throws Exception{
  2. System.out.println("主线程开始");
  3. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  4. System.out.println("加 10 任务开始");
  5. num += 10;
  6. return num;
  7. });
  8. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("乘以 10 任务开始");
  10. num = num * 10;
  11. return num;
  12. });
  13. //合并两个结果
  14. CompletableFuture<Object> future = job1.thenCombine(job2, new BiFunction<Integer, Integer, List<Integer>>() {
  15. @Override
  16. public List<Integer> apply(Integer a, Integer b) {
  17. List<Integer> list = new ArrayList<>();
  18. list.add(a);
  19. list.add(b);
  20. return list;
  21. }
  22. });
  23. System.out.println("合并结果为:" + future.get());
  24. }

合并多个任务的结果 allOf 与 anyOf
allOf: 一系列独立的 future 任务,等其所有的任务执行完后做一些事情

  1. /**
  2. * 先对一个数加 10,然后取平方
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. List<CompletableFuture> list = new ArrayList<>();
  8. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  9. System.out.println("加 10 任务开始");
  10. num += 10;
  11. return num;
  12. });
  13. list.add(job1);
  14. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  15. System.out.println("乘以 10 任务开始");
  16. num = num * 10;
  17. return num;
  18. });
  19. list.add(job2);
  20. CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  21. System.out.println("减以 10 任务开始");
  22. num = num * 10;
  23. return num;
  24. });
  25. list.add(job3);
  26. CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  27. System.out.println("除以 10 任务开始");
  28. num = num * 10;
  29. return num;
  30. });
  31. list.add(job4);
  32. //多任务合并
  33. List<Integer> collect = list.stream().map(CompletableFuture<Integer>::join).collect(Collectors.toList());
  34. System.out.println(collect);
  35. }

anyOf: 只要在多个 future 里面有一个返回,整个任务就可以结束,而不需要等到每一个future 结束

  1. /**
  2. * 先对一个数加 10,然后取平方
  3. * @param args
  4. */
  5. public static void main(String[] args) throws Exception{
  6. System.out.println("主线程开始");
  7. CompletableFuture<Integer>[] futures = new CompletableFuture[4];
  8. CompletableFuture<Integer> job1 = CompletableFuture.supplyAsync(() -> {
  9. try{
  10. Thread.sleep(5000);
  11. System.out.println("加 10 任务开始");
  12. num += 10;
  13. return num;
  14. }catch (Exception e){
  15. return 0;
  16. }
  17. });
  18. futures[0] = job1;
  19. CompletableFuture<Integer> job2 = CompletableFuture.supplyAsync(() -> {
  20. try{
  21. Thread.sleep(2000);
  22. System.out.println("乘以 10 任务开始");
  23. num = num * 10;
  24. return num;
  25. }catch (Exception e){
  26. return 1;
  27. }
  28. });
  29. futures[1] = job2;
  30. CompletableFuture<Integer> job3 = CompletableFuture.supplyAsync(() -> {
  31. try{
  32. Thread.sleep(3000);
  33. System.out.println("减以 10 任务开始");
  34. num = num * 10;
  35. return num;
  36. }catch (Exception e){
  37. return 2;
  38. }
  39. });
  40. futures[2] = job3;
  41. CompletableFuture<Integer> job4 = CompletableFuture.supplyAsync(() -> {
  42. try{
  43. Thread.sleep(4000);
  44. System.out.println("除以 10 任务开始");num = num * 10;
  45. return num;
  46. }catch (Exception e){
  47. return 3;
  48. }
  49. });
  50. futures[3] = job4;
  51. CompletableFuture<Object> future = CompletableFuture.anyOf(futures);
  52. System.out.println(future.get());
  53. }