CompletableFuture,默认依靠fork/join框架启动新的线程实现异步与并发的。
它提供了函数式编程的能力,可以通过回调函数的方式处理返回结果,并且提供了转换和组合CompletableFuture的方法。
0.测试用例
创建一个DeptService,模拟根据Id获取部门的方法getById(Integer id)
public class DeptService {
public Dept getById(Integer id) {
System.out.println("线程:" + Thread.currentThread().getName() + " getById(" + id + ")");
if (id == 1){
return new Dept(1, "研发一部");
} else if (id == 2){
return new Dept(2, "研发二部");
} else {
throw null;
}
}
}
创建一个UserService ,模拟getById()和save()这2个方法
public class UserService {
//根据Id获取User
public User getById(Integer id) throws Exception {
System.out.println("线程:" + Thread.currentThread().getName() + " getById(" + id + ")");
if (id == 1){
return new User(1, "meizuna", 20);
} else if (id == 2){
return new User(2, "misa", 18);
} else {
throw new Exception("未能找到人员");
}
}
//保存User
public User save(User user){
System.out.println("线程:" + Thread.currentThread().getName() + " save()," + user.toString());
return user;
}
}
1. supplyAsync、runAsync
CompletableFuture创建线程有2种方式:supplyAsync(有返回值)和:runAsync(无返回值)
1.1 supplyAsync(有返回值)
supplyAsync有2种,第二个需要多传1个线程池的实现:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public class Thread01_SupplyAsync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
CompletableFuture<Dept> deptCompletableFuture = CompletableFuture.supplyAsync(() -> {
Dept dept = deptService.getById(1);
return dept;
});
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + deptCompletableFuture.get());
}
}
//线程:ForkJoinPool.commonPool-worker-1 getById(1)
//线程:main 结果:Dept{id=1, name='研发一部'}
1.2 runAsync(无返回值)
unAsync适用无返回值的情况
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
测试代码
public class Thread02_RunAsync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
deptService.getById(1);
});
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + voidCompletableFuture.get());
}
}
//线程:ForkJoinPool.commonPool-worker-1 getById(1)
//线程:main 结果:null
2. thenApply、thenAccept、thenRun
2.1 thenApply 转换结果
thenApply是同步的
thenApplyAsync是异步的
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
Function<? super T, ? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
测试用例
public class Thread03_SupplyAsync_ThenApply {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
UserService userService = new UserService();
User user = new User(1, "meizuna", 20);
CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
Dept dept = deptService.getById(1);
return dept;
}).thenApplyAsync(dept -> {
//注意这里用到了上个线程的返回值dept
user.setDeptId(dept.getId());
user.setDeptName(dept.getName());
return userService.save(user);
});
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + userCompletableFuture.get().toString());
}
}
2.2 thenAccept 消费结果
thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是回调方法无返回值
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
测试用例
public class Thread04_SupplyAsync_ThenAccept {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
Dept dept = deptService.getById(1);
return dept;
})
.thenAcceptAsync(dept -> {
//注意这里用到了上个线程的返回值dept
System.out.println("线程:" + Thread.currentThread().getName() +
"假设把dept作为日志记录发给Kafka: " + dept.toString());
//thenAccept是没有返回值的
});
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + voidCompletableFuture.get());
}
}
2.3 thenRun 任务完成后触发的回调
thenRun 是上一个任务完成后触发的回调,没有入参和返回值
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
测试用例
public class Thread05_SupplyAsync_ThenRun {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
Dept dept = deptService.getById(1);
return dept;
})
.thenRun(() -> {//注意没有入参
System.out.println("线程:" + Thread.currentThread().getName() + " do something");
//thenRun注意没有入参,也没有返回值
});
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + voidCompletableFuture.get());
}
}
3. exceptionally
在用CompletableFuture编写多线程时,如果需要处理异常,可以用exceptionally,它的作用相当于catch
exceptionally的特点:
当出现异常时,会触发回调方法exceptionally
exceptionally中可指定默认返回结果,如果出现异常,则返回默认的返回结果
测试用例
public class Thread01_Exceptionally {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("抛出异常");
}
System.out.println("正常结束");
return 1.1;
})
.thenApply(result -> {
System.out.println("thenApply接收到的参数 = " + result);
return result;
})
.exceptionally(new Function<Throwable, Double>() {
@Override
public Double apply(Throwable throwable) {
System.out.println("异常:" + throwable.getMessage());
return 0.0;
}
});
System.out.println("最终返回的结果 = " + future.get());
}
}
4. whenComplete
当CompletableFuture的任务不论是正常完成还是出现异常都会调用whenComplete
正常完成:whenComplete返回结果和上级任务一致,异常为null;
出现异常:whenComplete返回结果为null,异常为上级任务的异常;
即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要处理该异常
public class Thread02_WhenComplete {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("出错了");
}
System.out.println("正常结束");
return 0.11;
}).whenComplete(new BiConsumer<Double, Throwable>() {
@Override
public void accept(Double aDouble, Throwable throwable) {
if (aDouble == null) {
System.out.println("whenComplete aDouble is null");
} else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null) {
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
}
});
System.out.println("最终返回的结果 = " + future.get());
}
}
4.1 whenComplete + exceptionally
public class Thread03_WhenComplete_Exceptionally {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("出错了");
}
System.out.println("正常结束");
return 0.11;
}).whenComplete(new BiConsumer<Double, Throwable>() {
@Override
public void accept(Double aDouble, Throwable throwable) {
if (aDouble == null){
System.out.println("whenComplete aDouble is null");
} else {
System.out.println("whenComplete aDouble is " + aDouble);
}
if (throwable == null){
System.out.println("whenComplete throwable is null");
} else {
System.out.println("whenComplete throwable is " + throwable.getMessage());
}
}
}).exceptionally(new Function<Throwable, Double>() {
@Override
public Double apply(Throwable throwable) {
System.out.println("exceptionally中异常:" + throwable.getMessage());
return 0.0;
}
});
System.out.println("最终返回的结果 = " + future.get());
}
}
5. handle()
不论正常返回还是出异常都会进入handle,类似whenComplete
handle()一般接收new BiFunction
T:就是任务传入的对象类型
Throwable:就是任务传入的异常
R:就是handle自己返回的对象类型
5.1 handle和thenApply的区别
thenApply:任务出现异常就不会进入thenApply
handle:任务出现异常也会进入handle,可对异常处理
5.2 handle和whenComplete的区别
handle对传入值进行转换,并产生自己的返回结果,T -> R
whenComplete的返回值和上级任务传入的结果一致,不能对其转换
测试用例
public class Thread04_Handle {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
UserService userService = new UserService();
CompletableFuture<User> future = CompletableFuture
.supplyAsync(() -> {
//int a = 1 / 0;//如果出现异常,那么thenApply则不执行
return deptService.getById(1);
}
)
.handle(new BiFunction<Dept, Throwable, User>() {
@Override
public User apply(Dept dept, Throwable throwable) {
if (throwable != null){
System.out.println(throwable.getMessage());
return null;
} else {
User user = new User(1, "winter", 32, dept.getId(), dept.getName());
return userService.save(user);
}
}
}
);
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + future.get());
}
}
//正常
//线程:ForkJoinPool.commonPool-worker-1 Dept.getById(1)
//线程:main User.save(),User(id=1, name=winter, age=32, deptId=1, deptName=研发一部)
//线程:main 结果:User(id=1, name=winter, age=32, deptId=1, deptName=研发一部)
//异常
//java.lang.ArithmeticException: / by zero
//线程:main 结果:null
6. thenCompose、thenCombine
6.1 thenCompose
thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
6.2 thenApply和thenCompose的区别
thenApply转换的是泛型中的类型,是同一个CompletableFuture,相当于将CompletableFuture
thenCompose用来连接两个CompletableFuture,是生成一个新的CompletableFuture
测试用例
public class Thread06_SupplyAsync_ThenCompose {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
UserService userService = new UserService();
User user = new User(1, "meizuna", 20);
CompletableFuture<User> userCompletableFuture = CompletableFuture.supplyAsync(() -> {
Dept dept = deptService.getById(1);
return dept;
})
.thenCompose(dept -> CompletableFuture.supplyAsync(() -> {
//注意这里用到了上个线程的返回值dept
user.setDeptId(dept.getId());
user.setDeptName(dept.getName());
return userService.save(user);
}));
System.out.println("线程:" + Thread.currentThread().getName() +
" 结果:" + userCompletableFuture.get().toString());
}
}
6.3 thenCombine
thenCombine会在两个任务都执行完成后,把两个任务的结果合并
注意:
两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果
两个任务是并行执行的,它们之间并没有先后依赖顺序
测试用例
public class Thread10_ThenCombine {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeptService deptService = new DeptService();
UserService userService = new UserService();
//第1个任务:获取id=1的部门
CompletableFuture<Dept> deptFuture = CompletableFuture
.supplyAsync(() -> {
return deptService.getById(1);
}
);
//第2个任务:获取id=1的人员
CompletableFuture<User> userFuture = CompletableFuture
.supplyAsync(() -> {
try {
//int a = 1 / 0;//出了异常就报错
return userService.getById(1);
} catch (Exception e) {
e.printStackTrace();
}
return null;
});
//将上面2个任务的返回结果dept和user合并,返回新的user
CompletableFuture<User> resultFuture = deptFuture
.thenCombine(userFuture,
new BiFunction<Dept, User, User>() {
@Override
public User apply(Dept dept, User user) {
user.setDeptId(dept.getId());
user.setDeptName(dept.getName());
return userService.save(user);
}
}
);
System.out.println("线程:" + Thread.currentThread().getName() + " 结果:" + resultFuture.get());
}
}
7. allOf
场景:当有一批任务交给线程池执行,我们需要获取所有线程的返回结果
- Future的get()时阻塞的,如果循环get()每一个线程的结果,一个线程会卡住后面所有线程
- CompletionService的take().get()虽然不会因为某个线程阻塞后面的线程,但是功能不丰富
- CompletableFuture提供的功能丰富,使用简单,代码优雅
- Java9中CompletableFuture还添加了completeOnTimeout、orTimeout,方便对超时任务的处理
测试用例
- 一串数字1, 2, 3, 4, 5, 6, 7, 8, 9, 10
- 开启线程,执行乘以2的计算
- 其中任务2会抛出异常
- 要获取所有线程的返回结果和异常结果
这里用handle方法来处理线程的结果
public class Thread17_AllOf {
public static void main(String[] args) {
System.out.println("==========begin==========");
//记录开始时间
Long start = System.currentTimeMillis();
//定长10线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
//任务
final List<Integer> taskList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<String> resultList = new ArrayList<>();
Map<String, String> errorList = new HashMap<>();
Stream<CompletableFuture<String>> completableFutureStream = taskList.stream()
.map(num -> CompletableFuture
.supplyAsync(() -> getDouble(num), executor)
.handle((s, throwable) -> {
if (throwable == null) {
System.out.println("任务" + num + "完成! result=" + s + ", " + new Date());
resultList.add(s.toString());
} else {
System.out.println("任务" + num + "异常! e=" + throwable + ", " + new Date());
errorList.put(num.toString(), throwable.getMessage());
}
return "";
})
);
CompletableFuture[] completableFutures = completableFutureStream.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(completableFutures)
.whenComplete((v, th) -> {
System.out.println("所有任务执行完成触发\n resultList=" + resultList + "\n errorList=" + errorList + "\n耗时=" + (System.currentTimeMillis() - start));
}).join();
System.out.println("==========end==========");
//根据数字判断线程休眠的时间
public static Integer getDouble(Integer i) {
try {
if (i == 1) {
//任务1耗时3秒
Thread.sleep(3000);
} else if (i == 2) {
//任务2耗时1秒,还出错
Thread.sleep(1000);
throw new RuntimeException("出异常了");
} else {
//其它任务耗时1秒
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2 * i;
}
}