直接继承Thread或者实现Runnable接口都可以创建线程,都有一个问题就是:没有返回值,也就是不能获取执行完的结果。
java1.5就提供了Callable接口,而Future和FutureTask就可以和Callable接口配合起来使用
Runnable缺陷:
- 不能返回一个返回值
- 不能抛出 checked Exception
Callable扩展
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果
一、Callable&Future&FutureTask
Callable执行原理
FutureTask底层重写了run方法,线程启动后,正常调用run方法,run方法内部调用call方法。Callable在获取结果前,是通过park方法阻塞的,
Future 的主要功能
Future就是对具体的Runnable或者Callable任务,执行任务取消、查询是否完成、结果获取。必要时可以通过get方法获取执行结果,get()会阻塞直到任务返回结果。
- boolean cancel (boolean mayInterruptIfRunning)** 取消任务**的执行。参数指定是否立即中断任务执行,或者等等任务结束
- boolean isCancelled () 任务**是否已经取消**,任务正常完成前将其取消,则返回 true
- boolean isDone () 任务**是否已经完成**。需要注意的是如果任务正常终止、异常或取消,都将返回true
- V get () throws InterruptedException, ExecutionException **等待**任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
- V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException
FutureTask
Future的实现类,FutureTask是消费者和生产者的桥梁,消费者通过FutureTask存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
具体使用
FutureTask,将生产者和消费者隔离;生产者仅仅需要讲任务封装成futuretask,提交给线程池执行;消费者通过futuretask对象,异步获取执行结果
//1.把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,
//2.然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,
//3.最后还可以通过 FutureTask 获取任务执行的结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 普通线程使用futureTask
*/
FutureTask futureTask = new FutureTask(new Callable() {
@Override
public Object call() throws Exception {
return "测试";
}
});
new Thread(futureTask).start();
System.out.println(futureTask.get());
/**
* 线程池使用futureTask
*/
Future<?> submit1 = Executors.newCachedThreadPool().submit(futureTask);
System.out.println(submit1.get());
System.out.println(futureTask.get());
/**
* 线程池使用Callable
*/
Future<String> submit = Executors.newCachedThreadPool().submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "线程池测试";
}
});
System.out.println(submit.get());
}
执行结果:
测试
null
测试
线程池测试
由结果可以看出,线程池使用futureTask,返回值无意义
将串行操作,改造成并行,时间消耗执行时间最长的任务
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft1 = new FutureTask<>(new T1Task());
FutureTask<String> ft2 = new FutureTask<>(new T2Task());
FutureTask<String> ft3 = new FutureTask<>(new T3Task());
FutureTask<String> ft4 = new FutureTask<>(new T4Task());
FutureTask<String> ft5 = new FutureTask<>(new T5Task());
//构建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(ft1);
executorService.submit(ft2);
executorService.submit(ft3);
executorService.submit(ft4);
executorService.submit(ft5);
//获取执行结果
System.out.println(ft1.get());
System.out.println(ft2.get());
System.out.println(ft3.get());
System.out.println(ft4.get());
System.out.println(ft5.get());
executorService.shutdown();
}
static class T1Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T1:查询商品基本信息...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品基本信息查询成功";
}
}
static class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2:查询商品价格...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品价格查询成功";
}
}
static class T3Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T3:查询商品库存...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品库存查询成功";
}
}
static class T4Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T4:查询商品图片...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品图片查询成功";
}
}
static class T5Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T5:查询商品销售状态...");
TimeUnit.MILLISECONDS.sleep(50);
return "商品销售状态查询成功";
}
}
Future 注意事项
- 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
- Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
Future的局限性
从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:
- **并发执行多任务**:Future只提供了get()方法来必须按照提交任务顺序,获取结果,并且是阻塞的。即:获取第一个结果后,才能获取第二个结果;
- **无法对多个任务进行链式调用**:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- **无法组合多个任务**:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;(可使用CountDownLatch或者CyaclicBarrier)
- **没有异常处理**:Future接口中没有关于异常处理的方法;
二、CompletionService使用
CompletionService解决了Callable+Future情况下需要按照执行顺序获取结果问题。
CompletionService主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序
CompletionService原理
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果
应用场景总结
- 当需要批量提交异步任务的时,建议使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
- CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
- 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
三、CompletableFuture使用
CompletableFuture是Future接口的扩展和增强,CompletableFuture实现了对任务的编排能力。可以轻松地组织不同任务的运行顺序、规则以及方式。
CompletionStage接口: 执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()
应用场景
依赖关系:
- thenApply() 把前面异步任务的结果,交给后面的Function
- thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
and聚合关系:
- thenCombine:任务合并,有返回值
- thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
- runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。
or聚合关系:
- applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
- acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
- runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
并行执行:
- CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行