ListenableFuture

ListenableFuture接口继承Future接口。Future接口是JDK定义的接口,其定义了与任务操作相关的方法,例如任务的取消:cancel(),判断任务的状态:isCancelled()、isDone(),获取任务结果:get()等
ListenableFuture是Guava(google提供的一个java开发包)定义的接口,其在Future接口的基础上添加了addListener()方法,源码如下:

  1. void addListener(Runnable listener, Executor executor);

该方法主要用于给Future任务添加监听任务,需要主要的是:

  • listener任务必须指定执行该任务的线程池executor(该executor一定不要与执行Future任务的线程池是同一个,否则会出现死锁情况)
  • 监听任务执行的时机为Future任务执行完成(包括正确执行完成和任务抛异常终止)或者被取消。

AbstractFuture内部类Sync

AbstractFuture的实现与FutureTask的实现非常相似,FutureTask使用unsafe包的CAS实现,而AbstractFuture使用的是AQS(AbstractQueuedSynchronizer,jdk锁实现的模板类,本文不做讲解),AbstractFuture将99%的操作全部都交于内部类Sync实现,下面讲解一下Sync类的实现

  • 任务的状态信息

Sync将任务状态分为5种,分别为running、completing、completed、cancelled、interrupted。其与FutureTask任务的状态对比如下图所示:
image.png
FutureTask-AbstractFuture状态对比图.png

从图中可以看出,AbstractFure相比与FutureTask少了两种状态Exceptional和interrupting状态,其中Exceptional状态,在AbstractFure中通过添加一个Throwable类型的结果来实现(如果Throwable对象的值不为null,则说明是exceptional),而interrupting状态再FutureTask中的作用就不是很大,在AstractFurue中并没有设计该状态。

  • 成员变量

与FutureTask不同,FutureTask中只有一个Object对象用来存放Future执行的结果,可以是正常结果,也可以是异常。在AbstractFuture中,将两种结果分开,正常结果放在value中,异常结果放在exception中,源码如下:

  1. private V value;
  2. private Throwable exception;

这也是为什么不设置exceptional状态就能区分正常和异常结果的原因。

  • 锁方法的重写

该锁使用的是共享锁来实现,主要涉及两个方法:

  1. tryAcquireShared(int ignored) 该方法是在获取锁时调用,多个线程可以同时获取锁
  2. tryReleaseShared(int finalState) 该方法是在释放锁时调用

在锁的时候时,我们一般都是先尝试获取锁,然后处理临界资源,处理完成后释放锁。而在AbstractFuture中并不是这种常规的使用方式,其实现是必须有一个线程先调用releaseShared(int arg)释放锁,其他线程才能调用acquireShared(int arg)获取锁,否则,所有的获取锁线程都将会堵塞
源码如下:

  1. protected int tryAcquireShared(int ignored) {
  2. if (isDone()) {
  3. return 1;
  4. }
  5. return -1;
  6. }
  7. @Override
  8. protected boolean tryReleaseShared(int finalState) {
  9. setState(finalState);
  10. return true;
  11. }

从源码中可以看出,调用tryAcquireShared()方法就是判断任务有没有完成(isDone()方法),任务完成成功获取锁,任务没有完成则等待。
那么问题就是任务什么时候完成,任务完成都会调用tryReleaseShared()方法,该方法用于任务完成或者取消时设置最终状态。

  • 成员方法

一. 阻塞与非阻塞的get()方法
get()方法或间接调用获取锁操作,如果成功获取锁,则返回对应的结果,如果获取不到锁,则返回对应的异常

  1. V get(long nanos) throws TimeoutException, CancellationException,
  2. ExecutionException, InterruptedException {
  3. //间接调用tryAcquireShared(arg)方法,支持中断,最长等待nanos时间
  4. if (!tryAcquireSharedNanos(-1, nanos)) {
  5. throw new TimeoutException("Timeout waiting for task.");
  6. }
  7. return getValue();
  8. }
  9. V get() throws CancellationException, ExecutionException,
  10. InterruptedException {
  11. //间接调用tryAcquireShared(arg)方法,支持中断,直到获取锁返回
  12. acquireSharedInterruptibly(-1);
  13. return getValue();
  14. }

从源码中可以看出,两种get()方法都会间接的调用tryAcquireShared(arg)方法,且都支持中断。在成功获取锁之后,两者都会调用getValue()方法,其源码如下:

  1. private V getValue() throws CancellationException, ExecutionException {
  2. int state = getState();
  3. switch (state) {
  4. case COMPLETED:
  5. if (exception != null) {
  6. throw new ExecutionException(exception);
  7. } else {
  8. return value;
  9. }
  10. case CANCELLED:
  11. case INTERRUPTED:
  12. throw cancellationExceptionWithCause(
  13. "Task was cancelled.", exception);
  14. default:
  15. throw new IllegalStateException(
  16. "Error, synchronizer in invalid state: " + state);
  17. }
  18. }

该方法主要针对三种终止状态做处理:

  1. completed状态,该状态又分为正常结束和异常终止,通过判断exception是否为null进行区分,如果异常,则抛出异常,否则返回正常结束的结果
  2. cancelled和interrupted状态,统一处理抛出取消异常
  3. 其他情况,讲道理不会出现的状态,如果出现了抛出非法状态异常

二. 设值方法
该类方法都会间接调用 tryReleaseShared(int finalState)方法,使锁处于可获取状态,表示任务执行完成。其中包括set(@Nullable V v)设值正常值、setException(Throwable t)设值异常、cancel(boolean interrupt)取消任务,期源码如下:

  1. boolean set(@Nullable V v) {
  2. return complete(v, null, COMPLETED);
  3. }
  4. boolean setException(Throwable t) {
  5. return complete(null, t, COMPLETED);
  6. }
  7. boolean cancel(boolean interrupt) {
  8. return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
  9. }

从源码中可以看出三者都调用了complete(@Nullable V v, @Nullable Throwable t, int finalState)方法,下面对该方法进行详细的讲解,其核心逻辑如下:

  1. 将任务状态由running修改为completing状态
  2. 如果状态修改成功,则对结果value和异常exception赋值,异常的赋值主要看方法参数finalState,改状态如果为cancelled或者interrupted则为异常赋值,否则赋值为参数t的值(t可能为null),然后调用releaseShared(finalState)(该方法间接调用tryReleaseShared(arg))方法更新任务状态为最终状态。
  3. 如果状态修改失败,判断状态是否为completing状态,如果是则说明任务已经执行完成,只在赋值阶段,执行acquireShared(-1)获取锁操作,使自己阻塞至任务完成

源码如下:

  1. private boolean complete(@Nullable V v, @Nullable Throwable t,
  2. int finalState) {
  3. boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
  4. if (doCompletion) {
  5. // If this thread successfully transitioned to COMPLETING, set the value
  6. // and exception and then release to the final state.
  7. this.value = v;
  8. // Don't actually construct a CancellationException until necessary.
  9. this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
  10. ? new CancellationException("Future.cancel() was called.") : t;
  11. releaseShared(finalState);
  12. } else if (getState() == COMPLETING) {
  13. // If some other thread is currently completing the future, block until
  14. // they are done so we can guarantee completion.
  15. acquireShared(-1);
  16. }
  17. return doCompletion;
  18. }

AbstractFuture的状态转换全部都在该方法中了。
三. 状态判断方法
主要判断任务状态state的值,state的值存放在AQS中,再次不过多讲解,源码如下:

  1. boolean isDone() {
  2. return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
  3. }
  4. boolean isCancelled() {
  5. return (getState() & (CANCELLED | INTERRUPTED)) != 0;
  6. }
  7. boolean wasInterrupted() {
  8. return getState() == INTERRUPTED;
  9. }

AbstractFuture

  • 成员变量

    1. private final Sync<V> sync = new Sync<V>();
    2. private final ExecutionList executionList = new ExecutionList();

    其中,sync已经讲解,ExecutionList主要用于执行listener,其实用可以参考我的另一篇博客Guava ListenableFutureTask中有关于ExecutionList的详细讲解

  • 成员方法

大部分的成员方法都是直接调用sync的对应方法,并没有做过多的操作,只是简单的将方法暴露给外部使用而已。其中,get(long timeout, TimeUnit unit)、get()、isDone()、isCancelled()、wasInterrupted()都是直接调用的sync的方法。
除了上述方法外,还有赋值操作和取消操作的方法,由于该类方法设计到任务完成回调listener方法的关系,因此不是简单的调用sync的方法,其实现如下所示:

  1. protected boolean set(@Nullable V value) {
  2. boolean result = sync.set(value);
  3. if (result) {
  4. executionList.execute();
  5. }
  6. return result;
  7. }

如果设置成功,表示任务结束,则执行listener方法(executionList.execute())
setException(Throwable throwable)方法与set(@Nullable V value)操作一直,多了一个非空判断而已,不再讲解。
cancel() 如果取消成功,则执行listener回调,如果参数为真,则interruptTask();该方法为抽象方法,子类可以实现。cancel()的源码如下:

  1. public boolean cancel(boolean mayInterruptIfRunning) {
  2. if (!sync.cancel(mayInterruptIfRunning)) {
  3. return false;
  4. }
  5. executionList.execute();
  6. if (mayInterruptIfRunning) {
  7. interruptTask();
  8. }
  9. return true;
  10. }

AbstractFuture与FutureTask的区别

  1. AbstractFuture通过AQS实现,FutureTask通过unsafe CAS实现,本质是一样的
  2. AbstractFuture 有五种状态,两种任务结果value和exception,而FutureTask有七种状态,任务执行结果只有一个outcome
  3. AbstractFuture没有实现Runnable接口,不能作为任务放到线程池中执行,而FutureTask可以
  4. AbstractFuture有接口回调,FutureTask没有,但是留下了回调的接口,可以重写done()方法

    AbstractFuture的使用场景

    AbstractFuture是一个抽象类,我们需要自定义子类来使用AbstractFuture,又因此AbstractFuture并没有实现Runnable接口,因此其不适合和线程池配合使用(子类同时实现Runnable接口也是可以的)。它经常用来与AsyncHttpClient配合使用,使用异步HttpClient发起请求,在请求的回调中根据请求的返回结果执行AbstractFuture对象set()、setException()、cancel()操作。