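Why CompletableFuture replaces Future
With a plain Future, you either poll isDone() until it returns true to learn that the task has finished, or you call get(), which blocks the calling thread. CompletableFuture lets you register callbacks through its then* and when* methods (thenApply, thenAccept, whenComplete, and so on), which avoids both the blocking and the isDone() polling.
CompletableFuture implements both the CompletionStage and Future interfaces. CompletionStage does not extend Future; it supplies what Future lacks: asynchronous callbacks, fluent chaining of stages, and composition of multiple futures. This makes coordinating several tasks in Java much smoother.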
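To make the contrast concrete, here is a minimal sketch of the two styles side by side. The class and method names (FutureVsCompletableFuture, blockingStyle, callbackStyle) are made up for illustration and do not come from the original notes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFuture {

    // Plain Future: the caller must block in get() (or poll isDone()) to obtain the value.
    static String blockingStyle(ExecutorService pool) {
        Future<String> future = pool.submit(() -> "hello");
        try {
            return future.get().toUpperCase();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // CompletableFuture: the transformation is registered as a callback and runs
    // once the value is available; no thread has to sit waiting for it.
    static CompletableFuture<String> callbackStyle(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> "hello", pool)
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(blockingStyle(pool));
            // join() is used only so the demo can print before the pool shuts down.
            System.out.println(callbackStyle(pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Both calls print HELLO. The difference is where the waiting happens: blockingStyle parks the caller inside get(), while callbackStyle returns immediately and the caller decides whether to wait at all.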
runAsync
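public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    // No executor given: runs on the default pool (ForkJoinPool.commonPool()).
    final CompletableFuture<Void> completableFuture3 = CompletableFuture.runAsync(() -> {
        System.out.println("completableFuture3 start");
        System.out.println("completableFuture3 is a daemon thread: " + Thread.currentThread().isDaemon());
    });
    // Explicit executor: runs on a non-daemon thread of the cached thread pool.
    final CompletableFuture<Void> completableFuture4 = CompletableFuture.runAsync(() -> {
        System.out.println("completableFuture4 start");
        System.out.println("completableFuture4 is a daemon thread: " + Thread.currentThread().isDaemon());
    }, executorService);
    System.out.println("main end");
    // Already submitted tasks still run; without this the idle pool threads keep the JVM alive for a while.
    executorService.shutdown();
}
// Output (one possible ordering; it varies between runs):
completableFuture3 start
completableFuture3 is a daemon thread: true
completableFuture4 start
main end
completableFuture4 is a daemon thread: false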
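- runAsync takes a Runnable, so the task produces no result; the method returns a CompletableFuture<Void>.
- If no executor is specified, the task runs on the default pool, ForkJoinPool.commonPool(), whose worker threads are daemon threads. Once all non-daemon threads have finished, the JVM exits and any task still running on those workers is abandoned.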
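Because of the second point, a task started with runAsync on the default pool may never run to completion if main returns first. A minimal sketch of one way to guard against that is to wait with join(). The names RunAsyncJoinDemo and runAndWait are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAsyncJoinDemo {

    // Runs a task on the default executor and waits for it.
    // Returns what join() yields, which is always null for a CompletableFuture<Void>.
    static Void runAndWait(AtomicBoolean ran) {
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
            System.out.println("daemon thread: " + Thread.currentThread().isDaemon());
            ran.set(true);
        });
        // Without this join(), main could return first and the JVM could exit
        // before the daemon worker has finished the task.
        return task.join();
    }

    public static void main(String[] args) {
        AtomicBoolean ran = new AtomicBoolean(false);
        Void result = runAndWait(ran);
        System.out.println("result = " + result + ", ran = " + ran.get());
    }
}
```

join() behaves like get() but throws an unchecked CompletionException instead of checked exceptions, which keeps the example short.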
Daemon threads
public static void main(String[] args) {
    // Daemon thread: it does not keep the JVM alive on its own.
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("thread start");
            while (true) {
                System.out.println("thread is alive");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    thread.setDaemon(true);
    thread.start();
    // Non-daemon thread: the JVM keeps running for as long as it runs.
    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                System.out.println("thread1 is alive");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    thread1.start();
    System.out.println("main end");
}
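Here thread is a daemon thread that keeps printing "thread is alive", thread1 is a non-daemon thread, and the main thread finishes almost immediately after starting both.
thread does not stop when main ends. It keeps printing, because the non-daemon thread thread1 is still running and keeps the JVM alive.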
supplyAsync
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    // No executor given: runs on the default pool.
    final CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("completableFuture1 started");
        System.out.println("completableFuture1 is a daemon thread: " + Thread.currentThread().isDaemon());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("this lambda is executed by forkJoinPool");
        return "result2";
    });
    // Explicit executor: runs on a thread of executorService.
    final CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("completableFuture2 started");
        System.out.println("completableFuture2 using executorService is a daemon thread: " + Thread.currentThread().isDaemon());
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.out.println("aaaa");
            e.printStackTrace();
        }
        System.out.println("completableFuture2 end");
        return "result3";
    }, executorService);
    // get() blocks until each result is available.
    System.out.println(completableFuture1.get());
    System.out.println(completableFuture2.get());
    executorService.shutdown();
    System.out.println("main end");
}
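- supplyAsync takes a Supplier and returns a CompletableFuture that carries the supplied value.
- As with runAsync, a task submitted without an explicit executor runs on a daemon thread of the default pool.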
allOf
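allOf returns a future that completes once all of the given futures have completed. Its type is CompletableFuture<Void>, so it carries no result of its own; the individual results have to be read from the original futures.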
final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        System.out.println("futureOne InterruptedException");
    }
    return "futureOneResult";
});
final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(6000);
    } catch (InterruptedException e) {
        System.out.println("futureTwo InterruptedException");
    }
    return "futureTwoResult";
});
// Completes after about 6 seconds, when the slower of the two futures is done.
CompletableFuture<Void> future = CompletableFuture.allOf(futureOne, futureTwo);
System.out.println(future.get());
System.out.println("main end");
// Output:
null
main end
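Since the future returned by allOf only yields null, the results have to be gathered from the input futures. A common pattern, sketched here with a hypothetical helper named collectAll, is to chain a thenApply that joins each input once all of them are done.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfResultsDemo {

    // Waits for every future, then collects the individual results in input order.
    static CompletableFuture<List<String>> collectAll(List<CompletableFuture<String>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // When 'all' completes, every input is already done, so join() returns at once.
        return all.thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> "futureOneResult");
        CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> "futureTwoResult");
        // Prints [futureOneResult, futureTwoResult]
        System.out.println(collectAll(Arrays.asList(futureOne, futureTwo)).join());
    }
}
```

If any input completes exceptionally, the future returned by allOf does too, so the thenApply stage is skipped and the exception surfaces from the final join().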
anyOf
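anyOf returns a future that completes as soon as any one of the given futures completes, carrying the result of whichever finished first. Its type is CompletableFuture<Object>, because the inputs may have different result types.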
public static void main(String[] args) throws ExecutionException, InterruptedException {
    final CompletableFuture<String> futureOne = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            System.out.println("futureOne InterruptedException");
        }
        return "futureOneResult";
    });
    final CompletableFuture<String> futureTwo = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            System.out.println("futureTwo InterruptedException");
        }
        return "futureTwoResult";
    });
    // Completes after about 3 seconds, as soon as futureOne is done.
    CompletableFuture<Object> future = CompletableFuture.anyOf(futureOne, futureTwo);
    System.out.println(future.get());
    System.out.println("main end");
}
// Output:
futureOneResult
main end
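Because anyOf yields an Object, the caller usually casts the result back to the expected type. The sketch below uses a hypothetical helper named firstResult and pairs an already completed future with one that never completes, so the outcome is deterministic.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {

    // Returns the result of whichever future completes first.
    // The cast is safe here only because both inputs are known to produce String.
    static String firstResult(CompletableFuture<String> a, CompletableFuture<String> b) {
        CompletableFuture<Object> first = CompletableFuture.anyOf(a, b);
        return (String) first.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("fast");
        // Never completed in this demo; anyOf does not wait for it.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(firstResult(done, pending)); // prints fast
    }
}
```

Note that anyOf does not cancel the remaining futures; in the anyOf example above, futureTwo keeps running after futureOne has delivered its result.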
thenAccept and exceptionally
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            System.out.println("futureOne InterruptedException");
        }
        // Uncomment the next line to trigger the exceptionally branch.
        //int a = 1/0;
        return "futureOneResult";
    }, executorService).thenAccept((result) -> {
        // Runs only when the supplier completes normally.
        System.out.println("result:" + result);
    }).exceptionally(e -> {
        // Runs when the supplier or the thenAccept callback throws.
        System.out.println("an exception occurred");
        e.printStackTrace();
        // thenAccept yields a CompletableFuture<Void>, so null is the only possible value here.
        return null;
    });
    System.out.println("main end");
    executorService.shutdown();
}
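In the chain above, exceptionally comes after thenAccept, so it can only return null. When a fallback value is wanted instead, one option is to put exceptionally directly after the supplier. This is a minimal sketch; ExceptionallyDemo and divide are illustrative names.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyDemo {

    // Computes a / b asynchronously; on failure (for example division by zero)
    // the exceptionally stage substitutes a fallback string.
    static CompletableFuture<String> divide(int a, int b) {
        return CompletableFuture.supplyAsync(() -> "result:" + (a / b))
                .exceptionally(e -> "fallback");
    }

    public static void main(String[] args) {
        System.out.println(divide(4, 2).join()); // prints result:2
        System.out.println(divide(1, 0).join()); // prints fallback
    }
}
```

Any stage chained after exceptionally sees a normal value in both cases, which is often simpler than handling the failure at the very end of the chain.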
CompletableFuture works well with lambda expressions and fluent, stream-style chaining.
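As a small illustration of that combination, the sketch below starts one asynchronous task per list element with a stream and then gathers the results. The names StreamCompositionDemo and lengths are hypothetical, and the tasks run on the default pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class StreamCompositionDemo {

    // Computes the length of every word asynchronously and returns the lengths in input order.
    static List<Integer> lengths(List<String> words) {
        // First pass: start all tasks. Collecting to a list forces the lazy stream
        // to submit every task before any result is awaited.
        List<CompletableFuture<Integer>> futures = words.stream()
                .map(w -> CompletableFuture.supplyAsync(() -> w.length()))
                .collect(Collectors.toList());
        // Second pass: wait for each result.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("a", "bb", "ccc"))); // prints [1, 2, 3]
    }
}
```

Mapping and joining in a single stream pipeline would run the tasks one after another, because streams process elements lazily; the two passes are what allow them to overlap.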