Why CompletableFuture replaces Future

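With a plain Future, you either poll isDone() until it returns true to learn that the task has finished, or call get(), which blocks the calling thread. CompletableFuture avoids both the blocking and the isDone() polling by letting you register callbacks with the then* and when* methods (for example thenApply, thenAccept, whenComplete).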
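The contrast can be sketched as follows. This is a minimal illustration, not a benchmark: the class name BlockingVsCallback and the slowTask helper are hypothetical, invented only for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingVsCallback {

    // Hypothetical stand-in for a slow computation.
    static String slowTask() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "done";
    }

    // Plain Future: the caller must block in get() (or poll isDone()).
    static String withFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = BlockingVsCallback::slowTask;
            Future<String> future = pool.submit(task);
            return future.get(); // blocks the calling thread until the task ends
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: register a callback that runs once the result is ready.
    static CompletableFuture<String> withCompletableFuture() {
        return CompletableFuture.supplyAsync(BlockingVsCallback::slowTask)
                .thenApply(result -> "callback got: " + result);
    }

    public static void main(String[] args) {
        System.out.println(withFuture());
        CompletableFuture<String> cf = withCompletableFuture();
        System.out.println("main is free to do other work");
        // join() is used here only so the demo does not exit before the callback runs.
        System.out.println(cf.join());
    }
}
```

In real code the final join() would usually be replaced by further callbacks, so that no thread has to wait.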

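CompletableFuture implements both the Future and CompletionStage interfaces. CompletionStage is a separate interface (it does not extend Future) that adds what Future lacks: asynchronous callbacks, fluent chaining, and composition of multiple futures. This makes coordinating several tasks in Java much smoother.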
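As a small sketch of the chaining and composition that CompletionStage provides, the example below combines two independent asynchronous results with thenCombine and then transforms the combined value with thenApply. The class ComposeDemo and the unitPrice and quantity lookups are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {

    // Hypothetical asynchronous lookups; the values are made up.
    static CompletableFuture<Integer> unitPrice() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    static CompletableFuture<Integer> quantity() {
        return CompletableFuture.supplyAsync(() -> 3);
    }

    // Combine two independent futures, then transform the combined value.
    static CompletableFuture<String> total() {
        return unitPrice()
                .thenCombine(quantity(), (price, count) -> price * count)
                .thenApply(sum -> "total: " + sum);
    }

    public static void main(String[] args) {
        System.out.println(total().join()); // prints "total: 300"
    }
}
```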

runAsync

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class RunAsyncDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            // No executor given: the task runs on ForkJoinPool.commonPool().
            final CompletableFuture<Void> completableFuture3 = CompletableFuture.runAsync(() -> {
                System.out.println("completableFuture3 start");
                System.out.println("completableFuture3 is daemon thread: " + Thread.currentThread().isDaemon());
            });
            // Explicit executor: the task runs on one of its (non-daemon) threads.
            final CompletableFuture<Void> completableFuture4 = CompletableFuture.runAsync(() -> {
                System.out.println("completableFuture4 start");
                System.out.println("completableFuture4 is daemon thread: " + Thread.currentThread().isDaemon());
            }, executorService);
            System.out.println("main end");
            // Already-submitted tasks still run; idle workers then exit instead of lingering.
            executorService.shutdown();
        }
    }
    // Sample output (the ordering can differ between runs):
    completableFuture3 start
    completableFuture3 is daemon thread: true
    completableFuture4 start
    main end
    completableFuture4 is daemon thread: false
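  • runAsync takes a Runnable and produces no result; it returns a CompletableFuture<Void>.
  • If no executor is specified, the task runs on the default ForkJoinPool.commonPool(), whose worker threads are daemon threads. Once all non-daemon threads have finished, the JVM exits and these threads are stopped as well, even if their task has not completed.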

Daemon threads

    import java.util.concurrent.TimeUnit;

    public class DaemonThreadDemo {
        public static void main(String[] args) {
            // Daemon thread: prints forever, but cannot keep the JVM alive on its own.
            Thread thread = new Thread(() -> {
                System.out.println("thread start");
                while (true) {
                    System.out.println("thread is alive");
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.setDaemon(true); // must be called before start()
            thread.start();

            // Non-daemon thread: keeps the JVM (and so the daemon thread) running.
            Thread thread1 = new Thread(() -> {
                while (true) {
                    System.out.println("thread1 is alive");
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            thread1.start();
            System.out.println("main end");
        }
    }

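Here thread is a daemon thread that keeps printing "thread is alive", thread1 is a non-daemon thread, and the main thread finishes almost immediately after starting them.
thread does not stop when main ends. It keeps printing, because the non-daemon thread1 is still running and keeps the JVM alive.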

supplyAsync

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class SupplyAsyncDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newCachedThreadPool();
            // No executor given: runs on ForkJoinPool.commonPool().
            final CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("completableFuture1 start");
                System.out.println("completableFuture1 is daemon thread: " + Thread.currentThread().isDaemon());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("this lambda is executed by forkJoinPool");
                return "result2";
            });
            // Explicit executor: runs on a thread of executorService.
            final CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("completableFuture2 start");
                System.out.println("completableFuture2 with executorService is daemon thread: " + Thread.currentThread().isDaemon());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    System.out.println("completableFuture2 interrupted");
                    e.printStackTrace();
                }
                System.out.println("completableFuture2 end");
                return "result3";
            }, executorService);
            // get() blocks until each result is available.
            System.out.println(completableFuture1.get());
            System.out.println(completableFuture2.get());
            executorService.shutdown();
            System.out.println("main end");
        }
    }
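  • supplyAsync takes a Supplier and returns its result through a CompletableFuture<T>.
  • Without an explicit executor, the task likewise runs on a daemon thread of the common pool.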

allOf

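allOf returns a future that completes once all of the given futures have completed. Its type is CompletableFuture<Void>, so it carries no result of its own.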

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class AllOfDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    System.out.println("futureOne InterruptedException");
                }
                return "futureOneResult";
            });
            final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    System.out.println("futureTwo InterruptedException");
                }
                return "futureTwoResult";
            });
            // Completes only after both futures are done (about 6 seconds here).
            CompletableFuture<Void> future = CompletableFuture.allOf(futureOne, futureTwo);
            System.out.println(future.get()); // the Void result is always null
            System.out.println("main end");
        }
    }
    // Output:
    null
    main end
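Because the future returned by allOf only yields null, the individual results have to be read from the original futures. One common way to do this is sketched below; the class AllOfResults and its allResults helper are hypothetical names, not part of the JDK. Calling join() inside the callback does not block, since every future has already completed by then.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResults {

    // Hypothetical helper: turns a list of futures into one future of a list.
    static <T> CompletableFuture<List<T>> allResults(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // Runs after every future is done, so each join() returns immediately.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "futureOneResult"),
                CompletableFuture.supplyAsync(() -> "futureTwoResult"));
        return allResults(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [futureOneResult, futureTwoResult]
    }
}
```

The results come back in the order of the input list, regardless of which task finished first.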

anyOf

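anyOf returns a future that completes as soon as any one of the given CompletableFutures completes, with the result of whichever finished first. Its type is CompletableFuture<Object>.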

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class AnyOfDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    System.out.println("futureOne InterruptedException");
                }
                return "futureOneResult";
            });
            final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    System.out.println("futureTwo InterruptedException");
                }
                return "futureTwoResult";
            });
            // Completes when the faster task (futureOne, after about 3 seconds) is done.
            CompletableFuture<Object> future = CompletableFuture.anyOf(futureOne, futureTwo);
            System.out.println(future.get());
            System.out.println("main end");
        }
    }
    // Output:
    futureOneResult
    main end
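Since anyOf is declared to return CompletableFuture<Object>, the caller has to cast the result back to the expected type. A minimal sketch, assuming all inputs produce a String; the class AnyOfTyped and the delayed helper are hypothetical and only simulate tasks of different speed.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfTyped {

    // Hypothetical helper: yields the given value after roughly the given delay.
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    static String firstResult() {
        CompletableFuture<Object> first =
                CompletableFuture.anyOf(delayed("slowResult", 1000), delayed("fastResult", 100));
        // The cast is safe here only because every input future produces a String.
        return (String) first.join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult());
    }
}
```

Note that the slower task is not cancelled when anyOf completes; it simply keeps running in the background.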

thenAccept and exceptionally

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ThenAcceptDemo {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    System.out.println("futureOne InterruptedException");
                }
                // int a = 1 / 0; // uncomment to trigger the exceptionally branch
                return "futureOneResult";
            }, executorService).thenAccept((result) -> {
                // Runs only if the previous stage completed normally.
                System.out.println("result:" + result);
            }).exceptionally(e -> {
                // Runs only if an earlier stage failed.
                System.out.println("exception occurred");
                e.printStackTrace();
                return null;
            });
            System.out.println("main end");
            // The already-submitted task still runs to completion after shutdown().
            executorService.shutdown();
        }
    }
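When a supplyAsync task throws, the exception that reaches exceptionally is wrapped in a CompletionException, so the original exception has to be unwrapped with getCause(). The sketch below recovers with a fallback value instead of null; the class ExceptionallyDemo and its divide method are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyDemo {

    // Hypothetical example: integer division that fails when b is 0.
    static String divide(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a / b)
                .thenApply(quotient -> "quotient: " + quotient)
                .exceptionally(e -> {
                    // Unwrap the CompletionException to get at the original failure.
                    Throwable cause =
                            (e instanceof CompletionException && e.getCause() != null)
                                    ? e.getCause()
                                    : e;
                    return "failed: " + cause.getClass().getSimpleName();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(divide(6, 3)); // prints "quotient: 2"
        System.out.println(divide(1, 0)); // prints "failed: ArithmeticException"
    }
}
```

When the task fails, the thenApply stage is skipped and only the exceptionally stage runs, which mirrors the thenAccept and exceptionally chain above.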

CompletableFuture works well with lambda expressions and fluent, stream-style chaining.