FutureTask源码结构

image.png

  • 作为线程:实现Runnable接口
  • 异步处理:实现了Future接口
  • 有返回值:构造注入Callable 提供了Callable功能

    1. /**
    2. * Creates a {@code FutureTask} that will, upon running, execute the
    3. * given {@code Callable}.
    4. *
    5. * @param callable the callable task
    6. * @throws NullPointerException if the callable is null
    7. */
    8. public FutureTask(Callable<V> callable) {
    9. if (callable == null)
    10. throw new NullPointerException();
    11. this.callable = callable;
    12. this.state = NEW; // ensure visibility of callable
    13. }

    FutureTask 实例用法

    FutureTask 往往搭配Callable接口使用:

    1. @Test
    2. void contextLoads() throws ExecutionException, InterruptedException {
    3. // 匿名内部类的方式,实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
    4. FutureTask<String> futureTask = new FutureTask<>(() -> {
    5. System.out.println(Thread.currentThread().getName() + " -----come in");
    6. TimeUnit.SECONDS.sleep(5);
    7. return "task over";
    8. });
    9. Thread t1 = new Thread(futureTask, "t1");
    10. t1.start();
    11. System.out.println(Thread.currentThread().getName()+" 忙其他任务");
    12. // 阻塞等待整个线程完成,获取返回结果
    13. System.out.println(futureTask.get());
    14. }

    get()缺点

  • futureTask.get() 调用get()方法求结果,如果计算机没有完成容易导致程序阻塞

    实际开发这并不这样写,这样的写发并不优雅,调get()容易造成阻塞,可以写成,多线程任务什么时候完成了isDone(),什么时候去调用

  1. @Test
  2. void contextLoads() throws ExecutionException, InterruptedException {
  3. // 匿名内部类的方式,实现Callable接口 + FutureTask (可以拿到返回结果,可以处理异常)
  4. FutureTask<String> futureTask = new FutureTask<>(() -> {
  5. System.out.println(Thread.currentThread().getName() + " -----come in");
  6. TimeUnit.SECONDS.sleep(5);
  7. return "task over";
  8. });
  9. Thread t1 = new Thread(futureTask, "t1");
  10. t1.start();
  11. System.out.println(Thread.currentThread().getName()+" 忙其他任务");
  12. // 阻塞等待整个线程完成,获取返回结果
  13. // System.out.println(futureTask.get());
  14. // 每500 毫秒判断是否完成
  15. while (true){
  16. if (futureTask.isDone()){
  17. System.out.println(futureTask.get());
  18. break;
  19. }else {
  20. TimeUnit.MILLISECONDS.sleep(500);
  21. System.out.println("正在处理中");
  22. }
  23. }
  24. }

isDone()缺点

  • 轮询的方式会耗费无谓的cpu资源,而且也不见得能及时得到计算结果,如果想要异步获取结果,通常都会以轮询的方式获取结果尽量不要阻塞

    Future对于结果的获取不是很友好,只能通过阻塞或轮询方式得到任务结果

CompletableFuture

get()方法在Future计算完成之前会一直处于阻塞状态下,
isDone()方法容易消耗CPU资源
对于真正的异步处理,希望是可以通过传入回调函数,在Future结束时自动调用该回调函数,这样我们就不用等待结果
阻塞的方式和异步编程的设计理念相违背,而轮询的方式耗费无谓的CPU资源。因此
JDK8设计CompaletableFuture。
CompltetableFuture提供了一种观模式类似的机制,可以让任务执行完成后通知监听的一方。

类架构说明

image.png

  1. public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成后可能会触发另外一个阶段
  • 一个阶段的计算执行是一个Function,Consumer或者Runnable。比如:stage.thenApply(
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

代表异步计算过成中的某一个阶段,一个阶段完成后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符穿参数。

介绍

在java8中,CompletabeleFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。
它可能代表一个明确完成的Future,也可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
它实现了Future和CompletionStage接口

创建方式

没有指定Executor的方法,直接用默认的ForkJoinPool.commonPool()作为它的线程池执行异步代码。
如果指定线程池,则使用自定义或者特别指定的线程池执行异步代码

runAsync 无返回值

  • public static CompletableFuture runAsync(Runnable runnable)
  • public static CompletableFuture runAsync(Runnable runnable,Executor executor)
    1. @Test
    2. public void test1() throws ExecutionException, InterruptedException {
    3. CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
    4. System.out.println(Thread.currentThread().getName());
    5. try {
    6. TimeUnit.SECONDS.sleep(1);
    7. } catch (InterruptedException e) {
    8. throw new RuntimeException(e);
    9. }
    10. });
    11. System.out.println(voidCompletableFuture.get());
    12. }

supplyAsync 有返回值

public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)

CompletableFuture 回调方法

whenComplete

    1. public static void main(String[] args) {
    2. ExecutorService threadPool = Executors.newFixedThreadPool(3);
    3. try {
    4. CompletableFuture.supplyAsync(() -> {
    5. System.out.println(Thread.currentThread().getName() + "---come in");
    6. int i = ThreadLocalRandom.current().nextInt(10);
    7. try {
    8. TimeUnit.SECONDS.sleep(1);
    9. } catch (InterruptedException e) {
    10. throw new RuntimeException(e);
    11. }
    12. System.out.println("---- 1秒后出结果" + i);
    13. return i;
    14. }, threadPool).whenComplete((v, e) -> {
    15. if (e == null) {
    16. System.out.println("----计算完成,更新系统:" + v);
    17. }
    18. }).exceptionally(e -> {
    19. e.printStackTrace();
    20. System.out.println("一场情况" + e.getCause() + "\t" + e.getMessage());
    21. return null;
    22. });
    23. System.out.println(Thread.currentThread().getName() + "线程去忙其他任务了");
    24. } catch (Exception e) {
    25. e.printStackTrace();
    26. } finally {
    27. threadPool.shutdown();
    28. }
    29. }