概述

在实际开发中Runnable等等线程的run方法是没有返回值结果的,如果我需要拿到线程执行完的结果,就需要Callable方法了.

Callable概念

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call(),这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

  1. @FunctionalInterface
  2. public interface Callable<V> {
  3. /**
  4. * Computes a result, or throws an exception if unable to do so.
  5. *
  6. * @return computed result
  7. * @throws Exception if unable to compute a result
  8. */
  9. V call() throws Exception;
  10. }

Callable一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本.

  1. ExecutorServicesubmit的三个重载方法:
  2. <T> Future<T> submit(Callable<T> task);
  3. submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future
  4. <T> Future<T> submit(Runnable task, T result);
  5. submit提交一个实现Runnable接口的任务,并且指定了在调用Futureget方法时返回的result对象。
  6. Future<?> submit(Runnable task);
  7. submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future

因此只需要创建好实现Callable接口或者Runnable接口,通过ExecutorService的submit方法提交给线程池执行即可.
实现Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。

Future概念

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

  1. Futureget方法
  2. V get() throws InterruptedException, ExecutionException;
  3. V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

它声明这样的五个方法:

cancel方法用来取消任务,当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。

isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone方法表示任务是否已经完成,若任务完成,则返回true;
get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。同时会抛异常出来.

也就是说Future提供了三种功能:
判断任务是否完成;
能够中断任务;
能够获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

FutureTask概念

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

Future是一个接口

  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2. /**
  3. * Sets this Future to the result of its computation
  4. * unless it has been cancelled.
  5. */
  6. void run();
  7. }

FutureTask实现了RunnableFuture,而RunnableFuture继承了 Future

  1. public class FutureTask<V> implements RunnableFuture<V> {

FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
Callable、Future和FutureTask - 图1
因此我们通过一个线程运行Callable,但是Thread不支持构造方法中传递Callable的实例,所以我们需要通过FutureTask把一个Callable包装成Runnable,然后再通过这个FutureTask拿到Callable运行后的返回值。
要new一个FutureTask的实例,有两种方法
Callable、Future和FutureTask - 图2

  1. /* Task实现了Callable接口*/
  2. FutureTask futureTask = new FutureTask<>(new Task());
  3. new Thread(futureTask).start();

FutureTask三种状态

1)未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
2)已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
3)已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完
成状态时,执行FutureTask.cancel(…)方法将返回false。

三者关系总结

线程池有sublime方法,可以传入Callable类型的值,会返回一个Future的子类FutureTask, 也就是这个关系.

实现Runnable接口和实现Callable接口的区别

1、Runnable是自从java1.1就有了,而Callable是1.5之后才加上去的。

2、Callable规定的方法是call(),Runnable规定的方法是run()。

3、Callable的任务执行后可返回值,而Runnable的任务是不能返回值(是void)。

4、call方法可以抛出异常,run方法不可以。

5、运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。

6、加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用submit方法。

代码演示

获取子线程返回的结果

UseCallable

  1. import java.util.concurrent.Callable;
  2. public class UseCallable implements Callable<Integer> {
  3. private int sum;
  4. @Override
  5. public Integer call() throws Exception {
  6. System.out.println("Callable子线程开始计算了");
  7. for (int i = 0; i < 5000; i++) {
  8. sum = sum + i;
  9. }
  10. System.out.println("Callable子线程计算结束!结果为 : " + sum);
  11. return sum;
  12. }
  13. }

Demo01

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Demo01 {

    public static void main(String[] args) {
        FutureTask<Integer> integerFutureTask = new FutureTask<>(new UseCallable());

        new Thread(integerFutureTask).start();
        try {
            /*当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;
            当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
            */
            Integer integer = integerFutureTask.get();
            System.out.println("任务是否完成 : " + integerFutureTask.isDone());
            System.out.println("子线程返回过来的结果是 = " + integer);


        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}

控制台打印:

Callable子线程开始计算了
Callable子线程计算结束!结果为 : 12497500
任务是否完成 : true
子线程返回过来的结果是 = 12497500

获取子线程返回的实体类

Entity


public class Entity {
    private String name;
    private int sum;

    //**********************************


    @Override
    public String toString() {
        return "Entity{" +
                "name='" + name + '\'' +
                ", sum=" + sum +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }
}

TestEntityCallable


import entity.Entity;

import java.util.concurrent.Callable;

public class TestEntityCallable implements Callable<Entity> {
    @Override
    public Entity call()  {
        Entity entity = new Entity();
        entity.setName("这个是实体的名字");
        entity.setSum(1111111);
        return entity;
    }
}

Demo02

import  entity.Entity;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class Demo02 {
    public static void main(String[] args) {

        TestEntityCallable testEntityCallable = new TestEntityCallable();

        FutureTask<Entity> entityFutureTask = new FutureTask<>(testEntityCallable);
        new Thread(entityFutureTask).start();
        try {
            Entity entity = entityFutureTask.get();

            System.out.println("子线程返回的结果是 = " + entity);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }
}

控制台打印结果:

子线程返回的结果是 = Entity{name='这个是实体的名字', sum=1111111}

终止任务

TaskCallable


public class TaskCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        while (true) {
            System.out.println("子线程一直在执行\n");
            Thread.sleep(100);

            if (Thread.currentThread().isInterrupted()) {
                //在while 死循环 没有执行到这里
                System.out.println("主线程终止了我");
                return null;

            }

        }

    }

}

Demo01



import utils.SleepTools;

import java.util.concurrent.*;

/**
 * 使用Future的cancel()方法来取消已经提交给执行者的任务
 */
public class Demo01 {

    public static void main(String[] args) {
        /*Task实现了Callable接口*/
        FutureTask futureTask = new FutureTask<>(new TaskCallable());
        new Thread(futureTask).start();
        /*
         * 那么cancel是如何工作的呢?
         *
         * 当你想要取消你已提交给执行者的任务,使用Future接口的cancel()方法。
         * 根据cancel()方法参数和任务的状态不同,这个方法的行为将不同:
         *      1、如果这个任务已经完成或之前的已经被取消或由于其他原因不能被取消,
         *          那么这个方法将会返回false并且这个任务不会被取消。
         *      2、如果这个任务正在等待执行者获取执行它的线程,那么这个任务将被取消而且不会开始他的执行。
         *          如果这个任务已经正在运行,则视方法的参数情况而定。
         *          cancel()方法接收一个Boolean值参数。
         *          如果参数为true并且任务正在运行,那么这个任务将被取消。
         *          如果参数为false并且任务正在运行,那么这个任务将不会被取消。
         */
        SleepTools.second(3);//睡眠三秒
        boolean cancel = futureTask.cancel(true);//终止任务

        boolean cancelled = futureTask.isCancelled();
        System.out.printf("Main取消:%s\n", cancelled);
        boolean done = futureTask.isDone();

        System.out.printf("任务是否完成了:%s\n", done);


    }
}

控制台输出:


子线程一直在执行

子线程一直在执行

子线程一直在执行

子线程一直在执行

子线程一直在执行

Main取消:true
任务是否完成了:true

Process finished with exit code 0

一个线程获取另外一个线程的结果

RecommendTask 推荐接口


import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @类说明:   调用推荐接口获取数据
 * 张俊杰 2020年10月21日 20:53
 */
public class RecommendTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println("RecommendTask: 调用推荐接口获取数据...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("RecommendTask: 得到推荐接口数据...");
        TimeUnit.SECONDS.sleep(10);
        return " [RecommendTask 板块数据] ";
    }
}

SearchTask 搜索Task


import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/**
 * @类说明:  SearchTask 调用搜索接口数据,同时需要t1推荐接口数据
 * 张俊杰 2020年10月21日 20:54
 */
public class SearchTask implements Callable<String> {
    FutureTask<String> ft1;

    // SearchTask 任务需要 RecommendTask 任务的 FutureTask 返回结果去重
    SearchTask(FutureTask<String> ft1) {
        this.ft1 = ft1;
    }

    @Override
    public String call() throws Exception {
        System.out.println("SearchTask: 调用搜索接口获取数据...");
        TimeUnit.SECONDS.sleep(1);

        System.out.println("SearchTask: 得到搜索接口的数据...");
        TimeUnit.SECONDS.sleep(5);
        // 获取 T2 线程的数据
        System.out.println("SearchTask: 调用 RecommendTask.get() 接口获取推荐数据");
        String tf1 = ft1.get();
        System.out.println("SearchTask: 获取到推荐接口数据:" + tf1);

        System.out.println("SearchTask: 将 RecommendTask 与 SearchTask 板块数据做去重处理");
        return "[RecommendTask 和 SearchTask 板块数据聚合结果]";
    }
}

Demo1


import java.util.concurrent.FutureTask;

public class Demo01 {
    public static void main(String[] args) throws Exception {
        FutureTask<String> stringFutureTask = new FutureTask<>(new RecommendTask());
        Thread T1 = new Thread(stringFutureTask);
        T1.start();


        FutureTask<String> ft2 = new FutureTask<>(new SearchTask(stringFutureTask));
        Thread T2 = new Thread(ft2);
        T2.start();
        System.out.println("主线程获取结果"+ft2.get());
    }


}

搜索Task需要获取推荐Task的结果的值拼接在一起, 最后主线程获取搜索Task的接口

控制台打印:

RecommendTask: 调用推荐接口获取数据...
SearchTask: 调用搜索接口获取数据...
RecommendTask: 得到推荐接口数据...
SearchTask: 得到搜索接口的数据...
SearchTask: 调用 RecommendTask.get() 接口获取推荐数据
SearchTask: 获取到推荐接口数据: [RecommendTask 板块数据] 
SearchTask: 将 RecommendTask 与 SearchTask 板块数据做去重处理
主线程获取结果[RecommendTask 和 SearchTask 板块数据聚合结果]