1.先给你一张凭据

假设有个任务需要执行比较长的的时间,通常需要等待任务执行结束或者出错才能返回结果, 在此期间调用者只能陷入阻塞苦苦等待, 对此, Future设计模式提供了一种凭据式的解决方案。在我们日常生活中,关于凭据的使用非常多见,比如你去某西服手工作坊想订做一身合体修身的西服,西服的制作过程比较漫长,少则一个礼拜,多则一个月,你不可能一直待在原地等待, 一般来说作坊会为你开一个凭据, 此凭据就是Future, 在接下来的任意日子里你可以凭借此凭据到作坊获取西服。在本章中,我们将通过程序的方式实现Future设计模式, 让读者体会这种设计的好处。

2.Future设计模式实现

Future设计模式所涉及的关键接口和它们之间的关系UML图
image.png

2.1接口定义

1.Future接口设计

Future提供了获取计算结果和判断任务是否完成的两个接口, 其中获取计算结果将会导致调用阻塞(在任务还未完成的情况下),相关代码如所示

  1. public interface Future <T>{
  2. // 返回计算后的结果,该方法会陷入阻塞状态
  3. T get() throws InterruptedException;
  4. // 判断任务是否已经执行完成
  5. boolean done();
  6. }

2.FutureService接口设计

FutureService主要用于提交任务, 提交的任务主要有两种, 第一种不需要返回值, 第二种则需要获得最终的计算结果。FutureService接口中提供了对FutureServiceImpl构建的工厂方法, JDK8中不仅支持default方法还支持静态方法, JDK 9甚至还支持接口私有方法。FutureService接口的设计代码如所示。

  1. public interface FutureService<IN, OUT> {
  2. // 提交不需要返回值的任务,Future.get方法返回的将会是null
  3. Future<?> submit(Runnable runnable);
  4. // 提交需要返回值的任务,其中Task接口代替了Runnable接口
  5. Future<OUT> submit(Task<IN,OUT> task, IN input);
  6. // 使用静态方法创建一个FutureService的实现
  7. static <T,R> FutureService<T,R> newService() {
  8. return new FutureServiceImpl<>();
  9. }
  10. }

3.Task接口设计

Task接口主要是提供给调用者实现计算逻辑之用的, 可以接受一个参数并且返回最终的计算结果, 这一点非常类似于JDK 1.5中的Callable接口, Task接口的设计代码如所示。

  1. @FunctionalInterface
  2. public interface Task<IN,OUT> {
  3. // 给定一个参数,经过计算返回结果
  4. OUT get(IN input);
  5. }

2.2 程序实现

1.FutureTask

FutureTask是Future的一个实现,除了实现Future中定义的get() 以及done() 方法, 还额外增加了protected方法finish, 该方法主要用于接收任务被完成的通知, FutureTask接口的设计代码如所示。

  1. public class FutureTask <T> implements Future<T> {
  2. // 计算结果
  3. private T result;
  4. // 任务是否完成
  5. private boolean isDone = false;
  6. // 定义对象锁
  7. private final Object LOCK = new Object();
  8. @Override
  9. public T get() throws InterruptedException {
  10. synchronized (LOCK) {
  11. // 当任务还没有完成时,调用get方法会被挂起而进入堵塞
  12. while(!isDone) {
  13. LOCK.wait();
  14. }
  15. }
  16. // 返回最终计算结果
  17. return result;
  18. }
  19. protected void finish(T result) {
  20. synchronized (LOCK) {
  21. // balking设计模式
  22. if ( isDone )
  23. return;
  24. // 计算完成,为result指定结果,并且将isDone设为true,同事唤醒阻塞中的线程
  25. this.result = result;
  26. this.isDone = true;
  27. LOCK.notifyAll();
  28. }
  29. }
  30. // 返回当前任务是否已经完成
  31. @Override
  32. public boolean done() {
  33. return isDone;
  34. }
  35. }

FutureTask中充分利用了线程间的通信wait和notifyAll, 当任务没有被完成之前通过get方法获取结果, 调用者会进入阻塞, 直到任务完成并接收到其他线程的唤醒信号, finish方法接收到了任务完成通知, 唤醒了因调用get而进入阻塞的线程。

2.FutureServiceImpl

  1. import java.util.concurrent.atomic.AtomicInteger;
  2. /*
  3. FutureServiceImpl的主要作用在于当提交任务时创建一个新的线程来受理该任务,进而达到任务异步执行的效果
  4. */
  5. public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT>{
  6. // 为执行的线程指定名字前缀(再三强调,为线程起一个特殊的名字是一个非常好的编程习惯)
  7. private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
  8. private final AtomicInteger nextCounter = new AtomicInteger(0);
  9. private String getNextName() {
  10. return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
  11. }
  12. @Override
  13. public Future<?> submit(Runnable runnable) {
  14. final FutureTask<Void> future = new FutureTask<>();
  15. new Thread(
  16. ()->{
  17. runnable.run();
  18. // 任务执行结束之后将null作为结果传给future
  19. future.finish(null);
  20. },getNextName()
  21. ).start();
  22. return future;
  23. }
  24. @Override
  25. public Future<OUT> submit(Task<IN, OUT> task, IN input) {
  26. final FutureTask<OUT> future = new FutureTask<>();
  27. new Thread(
  28. () -> {
  29. OUT result = task.get(input);
  30. // 任务执行结束之后,将真实的结果通过finish方法传递给future
  31. future.finish(result);
  32. }, getNextName()
  33. ).start();
  34. return future;
  35. }
  36. }

3.Future的使用以及技巧总结

Future直译是“未来”的意思, 主要是将一些耗时的操作交给一个线程去执行, 从而达到异步的目的,提交线程在提交任务和获得计算结果的过程中可以进行其他的任务执行,而不至于傻傻等待结果的返回。
我们提供了两种任务的提交(无返回值和有返回值)方式,在这里分别对其进行测试。无返回值的任务提交测试如下:

  1. import java.util.concurrent.TimeUnit;
  2. public class FutureServiceClient {
  3. public static void main(String[] args) throws InterruptedException {
  4. // 定义不需要返回值的FutureService
  5. FutureService<Void, Void> service = FutureService.newService();
  6. // submit方法为立即返回的方法
  7. Future<?> future = service.submit(
  8. ()-> {
  9. try {
  10. TimeUnit.SECONDS.sleep(10);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println("I am finish done.");
  15. }
  16. );
  17. // get方法会使当前线程进入阻塞
  18. future.get();
  19. }
  20. }

测试有返回值的

  1. import java.util.concurrent.TimeUnit;
  2. public class FutureServiceClient1 {
  3. public static void main(String[] args) throws InterruptedException {
  4. // 定义有返回值的FutureService
  5. FutureService<String,Integer> service = FutureService.newService();
  6. // submit方法会立即返回
  7. Future future = service.submit( input -> {
  8. try {
  9. TimeUnit.SECONDS.sleep(10);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. return input.length();
  14. }, "Hello"
  15. );
  16. // get方法使当前线程进入阻塞,最周会返回计算的结果
  17. System.out.println(future.get());
  18. }
  19. }

4.增强FutureService使其支持回调

使用任务完成时回调的机制可以让调用者不再进行显式地通过get的方式获得数据而导致进入阻塞, 可在提交任务的时候将回调接口一并注入, 在这里对FutureService接口稍作修改,修改代码如所示。

  1. // 增加回调接口Callback,当任务执行结束之后,Callback会得到执行
  2. @Override
  3. public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) {
  4. final FutureTask<OUT> future = new FutureTask<>();
  5. new Thread(
  6. () -> {
  7. OUT result = task.get(input);
  8. future.finish(result);
  9. // 执行回调
  10. if(null != callback)
  11. callback.call(result);
  12. }
  13. , getNextName()).start();
  14. return future;
  15. }

修改后的submit方法, 增加了一个Callback参数, 主要用来接受并处理任务的计算结果, 当提交的任务执行完成之后, 会将结果传递给Callback接口进行进一步的执行, 这样在提交任务之后不再会因为通过get方法获得结果而陷入阻塞。
Callback接口非常简单, 非常类似于JDK 8中的Consumer函数式接口, Callback接口的代码如所示。

  1. @FunctionalInterface
  2. public interface Callback <T>{
  3. // 任务完成后会调用该方法,其中T为任务执行后的结果
  4. void call(T t);
  5. }

测试代码:

  1. import java.util.concurrent.TimeUnit;
  2. public class FutureServiceClient2 {
  3. public static void main(String[] args) throws InterruptedException {
  4. // 定义不需要返回值的FutureService
  5. FutureService<String, Integer> service = FutureService.newService();
  6. // submit方法为立即返回的方法
  7. Future<?> future = service.submit(
  8. input-> {
  9. try {
  10. TimeUnit.SECONDS.sleep(10);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. return input.length();
  15. },"HelloWorld", System.out::println
  16. );
  17. // get方法会使当前线程进入阻塞
  18. future.get();
  19. }
  20. }