ListenableFuture
ListenableFuture
ListenableFuture
void addListener(Runnable listener, Executor executor);
该方法主要用于给Future任务添加监听任务,需要主要的是:
- listener任务必须指定执行该任务的线程池executor(该executor一定不要与执行Future任务的线程池是同一个,否则会出现死锁情况)
- 监听任务执行的时机为Future任务执行完成(包括正确执行完成和任务抛异常终止)或者被取消。
AbstractFuture内部类Sync
AbstractFuture的实现与FutureTask的实现非常相似,FutureTask使用unsafe包的CAS实现,而AbstractFuture使用的是AQS(AbstractQueuedSynchronizer,jdk锁实现的模板类,本文不做讲解),AbstractFuture将99%的操作全部都交于内部类Sync实现,下面讲解一下Sync类的实现
- 任务的状态信息
Sync将任务状态分为5种,分别为running、completing、completed、cancelled、interrupted。其与FutureTask任务的状态对比如下图所示:
FutureTask-AbstractFuture状态对比图.png
从图中可以看出,AbstractFure相比与FutureTask少了两种状态Exceptional和interrupting状态,其中Exceptional状态,在AbstractFure中通过添加一个Throwable类型的结果来实现(如果Throwable对象的值不为null,则说明是exceptional),而interrupting状态再FutureTask中的作用就不是很大,在AstractFurue中并没有设计该状态。
- 成员变量
与FutureTask不同,FutureTask中只有一个Object对象用来存放Future执行的结果,可以是正常结果,也可以是异常。在AbstractFuture中,将两种结果分开,正常结果放在value中,异常结果放在exception中,源码如下:
private V value;
private Throwable exception;
这也是为什么不设置exceptional状态就能区分正常和异常结果的原因。
- 锁方法的重写
该锁使用的是共享锁来实现,主要涉及两个方法:
tryAcquireShared(int ignored) 该方法是在获取锁时调用,多个线程可以同时获取锁
tryReleaseShared(int finalState) 该方法是在释放锁时调用
在锁的时候时,我们一般都是先尝试获取锁,然后处理临界资源,处理完成后释放锁。而在AbstractFuture中并不是这种常规的使用方式,其实现是必须有一个线程先调用releaseShared(int arg)释放锁,其他线程才能调用acquireShared(int arg)获取锁,否则,所有的获取锁线程都将会堵塞
源码如下:
protected int tryAcquireShared(int ignored) {
if (isDone()) {
return 1;
}
return -1;
}
@Override
protected boolean tryReleaseShared(int finalState) {
setState(finalState);
return true;
}
从源码中可以看出,调用tryAcquireShared()方法就是判断任务有没有完成(isDone()方法),任务完成成功获取锁,任务没有完成则等待。
那么问题就是任务什么时候完成,任务完成都会调用tryReleaseShared()方法,该方法用于任务完成或者取消时设置最终状态。
- 成员方法
一. 阻塞与非阻塞的get()方法
get()方法或间接调用获取锁操作,如果成功获取锁,则返回对应的结果,如果获取不到锁,则返回对应的异常
V get(long nanos) throws TimeoutException, CancellationException,
ExecutionException, InterruptedException {
//间接调用tryAcquireShared(arg)方法,支持中断,最长等待nanos时间
if (!tryAcquireSharedNanos(-1, nanos)) {
throw new TimeoutException("Timeout waiting for task.");
}
return getValue();
}
V get() throws CancellationException, ExecutionException,
InterruptedException {
//间接调用tryAcquireShared(arg)方法,支持中断,直到获取锁返回
acquireSharedInterruptibly(-1);
return getValue();
}
从源码中可以看出,两种get()方法都会间接的调用tryAcquireShared(arg)方法,且都支持中断。在成功获取锁之后,两者都会调用getValue()方法,其源码如下:
private V getValue() throws CancellationException, ExecutionException {
int state = getState();
switch (state) {
case COMPLETED:
if (exception != null) {
throw new ExecutionException(exception);
} else {
return value;
}
case CANCELLED:
case INTERRUPTED:
throw cancellationExceptionWithCause(
"Task was cancelled.", exception);
default:
throw new IllegalStateException(
"Error, synchronizer in invalid state: " + state);
}
}
该方法主要针对三种终止状态做处理:
- completed状态,该状态又分为正常结束和异常终止,通过判断exception是否为null进行区分,如果异常,则抛出异常,否则返回正常结束的结果
- cancelled和interrupted状态,统一处理抛出取消异常
- 其他情况,讲道理不会出现的状态,如果出现了抛出非法状态异常
二. 设值方法
该类方法都会间接调用 tryReleaseShared(int finalState)方法,使锁处于可获取状态,表示任务执行完成。其中包括set(@Nullable V v)设值正常值、setException(Throwable t)设值异常、cancel(boolean interrupt)取消任务,期源码如下:
boolean set(@Nullable V v) {
return complete(v, null, COMPLETED);
}
boolean setException(Throwable t) {
return complete(null, t, COMPLETED);
}
boolean cancel(boolean interrupt) {
return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
}
从源码中可以看出三者都调用了complete(@Nullable V v, @Nullable Throwable t, int finalState)方法,下面对该方法进行详细的讲解,其核心逻辑如下:
- 将任务状态由running修改为completing状态
- 如果状态修改成功,则对结果value和异常exception赋值,异常的赋值主要看方法参数finalState,改状态如果为cancelled或者interrupted则为异常赋值,否则赋值为参数t的值(t可能为null),然后调用releaseShared(finalState)(该方法间接调用tryReleaseShared(arg))方法更新任务状态为最终状态。
- 如果状态修改失败,判断状态是否为completing状态,如果是则说明任务已经执行完成,只在赋值阶段,执行acquireShared(-1)获取锁操作,使自己阻塞至任务完成
源码如下:
private boolean complete(@Nullable V v, @Nullable Throwable t,
int finalState) {
boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
if (doCompletion) {
// If this thread successfully transitioned to COMPLETING, set the value
// and exception and then release to the final state.
this.value = v;
// Don't actually construct a CancellationException until necessary.
this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
? new CancellationException("Future.cancel() was called.") : t;
releaseShared(finalState);
} else if (getState() == COMPLETING) {
// If some other thread is currently completing the future, block until
// they are done so we can guarantee completion.
acquireShared(-1);
}
return doCompletion;
}
AbstractFuture的状态转换全部都在该方法中了。
三. 状态判断方法
主要判断任务状态state的值,state的值存放在AQS中,再次不过多讲解,源码如下:
boolean isDone() {
return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
}
boolean isCancelled() {
return (getState() & (CANCELLED | INTERRUPTED)) != 0;
}
boolean wasInterrupted() {
return getState() == INTERRUPTED;
}
AbstractFuture
成员变量
private final Sync<V> sync = new Sync<V>();
private final ExecutionList executionList = new ExecutionList();
其中,sync已经讲解,ExecutionList主要用于执行listener,其实用可以参考我的另一篇博客Guava ListenableFutureTask中有关于ExecutionList的详细讲解
成员方法
大部分的成员方法都是直接调用sync的对应方法,并没有做过多的操作,只是简单的将方法暴露给外部使用而已。其中,get(long timeout, TimeUnit unit)、get()、isDone()、isCancelled()、wasInterrupted()都是直接调用的sync的方法。
除了上述方法外,还有赋值操作和取消操作的方法,由于该类方法设计到任务完成回调listener方法的关系,因此不是简单的调用sync的方法,其实现如下所示:
protected boolean set(@Nullable V value) {
boolean result = sync.set(value);
if (result) {
executionList.execute();
}
return result;
}
如果设置成功,表示任务结束,则执行listener方法(executionList.execute())
setException(Throwable throwable)方法与set(@Nullable V value)操作一直,多了一个非空判断而已,不再讲解。
cancel() 如果取消成功,则执行listener回调,如果参数为真,则interruptTask();该方法为抽象方法,子类可以实现。cancel()的源码如下:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!sync.cancel(mayInterruptIfRunning)) {
return false;
}
executionList.execute();
if (mayInterruptIfRunning) {
interruptTask();
}
return true;
}
AbstractFuture与FutureTask的区别
- AbstractFuture通过AQS实现,FutureTask通过unsafe CAS实现,本质是一样的
- AbstractFuture 有五种状态,两种任务结果value和exception,而FutureTask有七种状态,任务执行结果只有一个outcome
- AbstractFuture没有实现Runnable接口,不能作为任务放到线程池中执行,而FutureTask可以
- AbstractFuture有接口回调,FutureTask没有,但是留下了回调的接口,可以重写done()方法
AbstractFuture的使用场景
AbstractFuture是一个抽象类,我们需要自定义子类来使用AbstractFuture,又因此AbstractFuture并没有实现Runnable接口,因此其不适合和线程池配合使用(子类同时实现Runnable接口也是可以的)。它经常用来与AsyncHttpClient配合使用,使用异步HttpClient发起请求,在请求的回调中根据请求的返回结果执行AbstractFuture对象set()、setException()、cancel()操作。