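Introduction and Usage
What is CompletableFuture?
- A Future that can be completed explicitly: callers can set its value and status.
- It implements the CompletionStage interface, which supports dependencies between tasks, so it can be used for asynchronous programming in Java.
Basic usage: https://www.callicoder.com/java-8-completablefuture-tutorial/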
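To make the two points above concrete, here is a minimal sketch that uses only the public API. The class and method names (ExplicitCompletionDemo and so on) are made up for illustration. It shows a future being completed by hand with a value, completed by hand with a failure, and used as the source of a dependent stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExplicitCompletionDemo {
    // Completes a future by hand with a value and returns what join() sees.
    static String completeWithValue() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("done");      // sets the value
        future.complete("ignored");   // no effect: the future is already complete
        return future.join();
    }

    // Completes a future by hand with a failure and returns the cause seen by join().
    static Throwable completeWithFailure() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();      // join() wraps the original exception
        }
    }

    // thenApply registers a dependent stage that runs once the source completes.
    static int dependentStage() {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Integer> doubled = source.thenApply(x -> x * 2);
        source.complete(21);
        return doubled.join();
    }

    public static void main(String[] args) {
        System.out.println(completeWithValue());
        System.out.println(completeWithFailure());
        System.out.println(dependentStage());
    }
}
```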
CompletableFuture Source Code Analysis
CompletableFuture member fields:
// A non-null result means the future is complete; AltResult wraps a null result or a captured exception
volatile Object result; // Either the result or boxed AltResult
// Lock-free concurrent stack, implemented with CAS, that holds the dependent tasks
volatile Completion stack; // Top of Treiber stack of dependent actions
final boolean internalComplete(Object r) { // CAS from null to r
    return RESULT.compareAndSet(this, null, r);
}
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
    Completion h = stack;
    // Set the current stack top as c's next; NEXT is the VarHandle for the Completion.next field
    NEXT.set(c, h); // CAS piggyback
    // CAS the stack top from h to c
    return STACK.compareAndSet(this, h, c);
}
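The push in tryPushStack is the classic Treiber stack pattern. The sketch below is not the JDK code: it is a standalone, simplified stack built on AtomicReference instead of VarHandle, and the class name TreiberStackSketch is an assumption for illustration. It shows the same idea of linking the new node to the current top and then publishing it with a single CAS.

```java
import java.util.concurrent.atomic.AtomicReference;

class TreiberStackSketch<T> {
    private static final class Node<T> {
        final T value;
        Node<T> next;
        Node(T value) { this.value = value; }
    }

    // Top of the stack; every update goes through compareAndSet.
    private final AtomicReference<Node<T>> top = new AtomicReference<>();

    void push(T value) {
        Node<T> node = new Node<>(value);
        Node<T> head;
        do {
            head = top.get();
            node.next = head;                      // link to the current top first
        } while (!top.compareAndSet(head, node));  // retry if another thread won the race
    }

    // Returns the most recently pushed value, or null if the stack is empty.
    T pop() {
        Node<T> head;
        do {
            head = top.get();
            if (head == null) {
                return null;
            }
        } while (!top.compareAndSet(head, head.next));
        return head.value;
    }

    public static void main(String[] args) {
        TreiberStackSketch<String> stack = new TreiberStackSketch<>();
        stack.push("a");
        stack.push("b");
        System.out.println(stack.pop() + " " + stack.pop());
    }
}
```

Unlike this sketch, tryPushStack makes a single attempt and leaves the retry loop to its caller.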
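The stack node class Completion and its subclasses: CompletableFuture uses ForkJoinPool.commonPool() by default (unless the common pool's parallelism is below two, in which case each async task gets its own new thread). Internally, a dependency between tasks is represented by a Completion. Completion extends ForkJoinTask, so it can be executed asynchronously, and it has many subclasses for different scenarios.
Completion categories: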
- single-input (UniCompletion),
- two-input (BiCompletion),
- projected (BiCompletions using exactly one of two inputs),
- shared (CoCompletion, used by the second of two sources),
- zero-input source actions,
- Signallers that unblock waiters.
The JDK source comments explain the details fairly clearly.
Below we walk through the source using a short demo.
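As a rough orientation, the first three categories can be reached from the public API as sketched below. The mapping in the comments (thenApply to a single-input completion, thenCombine to a two-input one, applyToEither to a projected one) is my reading of the JDK source rather than documented behavior, and the class name CompletionKindsDemo is made up.

```java
import java.util.concurrent.CompletableFuture;

class CompletionKindsDemo {
    // single-input: one source feeds the dependent stage.
    static int singleInput() {
        return CompletableFuture.completedFuture(2).thenApply(x -> x + 1).join();
    }

    // two-input: the dependent stage needs both sources.
    static int twoInput() {
        CompletableFuture<Integer> a = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> b = CompletableFuture.completedFuture(3);
        return a.thenCombine(b, Integer::sum).join();
    }

    // projected: two sources are registered, but one result is enough.
    static String projected() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> ready = CompletableFuture.completedFuture("ready");
        return pending.applyToEither(ready, s -> s).join();
    }

    public static void main(String[] args) {
        System.out.println(singleInput() + " " + twoInput() + " " + projected());
    }
}
```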
CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("Some Result");
    return "Some Result";
}).thenApplyAsync(result -> {
    System.out.println("Processed Result" + result);
    return "Processed Result" + result;
});
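If the demo is pasted into a main method as is, the JVM may exit before anything is printed, because the common pool's worker threads are daemon threads. A minimal runnable variant, assuming a wrapper class named SupplyThenApplyDemo (hypothetical), a shorter sleep, and a ": " separator that the original does not have, keeps a reference to the last stage and waits for it with join():

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class SupplyThenApplyDemo {
    static String run() {
        CompletableFuture<String> processed = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // shortened from 1 second
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Some Result";
        }).thenApplyAsync(result -> "Processed Result: " + result);
        // Block until both stages have finished.
        return processed.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```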
From the task-definition perspective:
supplyAsync -> asyncSupplyStage: creates an AsyncSupply and hands it to the thread pool for execution.
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(ASYNC_POOL, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                 Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    // Create an AsyncSupply and submit it for execution right away
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<? extends T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
        this.dep = dep; this.fn = fn;
    }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return false; }
    public void run() {
        CompletableFuture<T> d; Supplier<? extends T> f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            if (d.result == null) {
                // Set the value, or record the exception thrown by the supplier
                try {
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            // Trigger the dependent tasks
            d.postComplete();
        }
    }
}
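The catch branch in run() is what turns an exception thrown by the supplier into an exceptional completion. A small sketch of how that looks from the caller's side, using only public API (the class name AsyncSupplyFailureDemo is an assumption for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AsyncSupplyFailureDemo {
    // Returns the exception that the supplier threw, as observed through join().
    static Throwable failureCause() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new IllegalArgumentException("bad input");
        });
        try {
            future.join();
            return null; // not reached: the supplier always throws
        } catch (CompletionException e) {
            return e.getCause(); // the supplier's exception, wrapped by the future
        }
    }

    public static void main(String[] args) {
        System.out.println(failureCause());
    }
}
```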
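thenApplyAsync -> uniApplyStage: if the source future is already complete, the function is applied right away through uniApplyNow; otherwise a new CompletableFuture is created and a UniApply completion is pushed onto the source future's stack.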
public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
    return uniApplyStage(defaultExecutor(), fn);
}
private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    Object r;
    if ((r = result) != null)
        return uniApplyNow(r, e, f);
    // Create a new CompletableFuture for the dependent stage
    CompletableFuture<V> d = newIncompleteFuture();
    // Wrap everything in a UniApply completion and push it onto the stack
    unipush(new UniApply<T,V>(e, d, this, f));
    return d;
}
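The two branches of uniApplyStage can be observed from outside. The sketch below deliberately uses thenApply (no executor) instead of thenApplyAsync so that the result is deterministic; the class name AlreadyCompletedDemo is hypothetical. With an already completed source the function runs on the calling thread during registration; with a pending source the dependent stays incomplete until the source completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AlreadyCompletedDemo {
    // Source already complete: the function runs immediately on the caller's thread.
    static boolean ranOnCallerThread() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("x");
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        done.thenApply(s -> {
            ranOn.set(Thread.currentThread());
            return s;
        });
        return ranOn.get() == Thread.currentThread();
    }

    // Source pending: the dependent is only registered, then fired by complete().
    static boolean pendingUntilSourceCompletes() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApply(s -> s + "!");
        boolean doneBefore = dependent.isDone();
        source.complete("x");
        return !doneBefore && dependent.isDone() && dependent.join().equals("x!");
    }

    public static void main(String[] args) {
        System.out.println(ranOnCallerThread());
        System.out.println(pendingUntilSourceCompletes());
    }
}
```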
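From the task-execution perspective:
The execution logic of the AsyncSupply task is simple, as shown above. Here we mainly look at postComplete, whose job is to trigger the dependent tasks, including nested ones, without unbounded recursion.
Comments 1 to 5 in the code mark the main flow.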
final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run. It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null || // 1. h is the top of the current future's stack
           (f != this && (h = (f = this).stack) != null)) { // 5. once f's stack is empty, fall back to this; the loop exits when this.stack is empty too
        CompletableFuture<?> d; Completion t;
        if (STACK.compareAndSet(f, h, t = h.next)) { // 2. pop h: CAS the stack top from h to h.next
            if (t != null) {
                if (f != this) { // move a nested dependent onto this future's own stack
                    pushStack(h);
                    continue;
                }
                NEXT.compareAndSet(h, t, null); // try to detach  3. CAS h.next from t to null
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d; // 4. fire h; an async completion is handed to its executor and runs later
        }
    }
}
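One visible consequence of this loop is that a long chain of dependents can be completed without deep recursion. The sketch below assumes the postComplete behavior shown above; the class name DeepChainDemo and the chain length are arbitrary choices for illustration. It links many thenApply stages and then completes the head once.

```java
import java.util.concurrent.CompletableFuture;

class DeepChainDemo {
    // Builds head -> s1 -> s2 -> ... and completes the head; returns the tail's value.
    static int completeChain(int length) {
        CompletableFuture<Integer> head = new CompletableFuture<>();
        CompletableFuture<Integer> tail = head;
        for (int i = 0; i < length; i++) {
            tail = tail.thenApply(x -> x + 1); // each stage depends on the previous one
        }
        // The completing thread walks the whole chain inside postComplete's loop.
        head.complete(0);
        return tail.join();
    }

    public static void main(String[] args) {
        System.out.println(completeChain(100_000));
    }
}
```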
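The Completion.tryFire() method
The mode parameter:
static final int SYNC = 0;    // used while a task is being defined: if the preceding task has already completed, fire immediately
static final int ASYNC = 1;   // only passed from Completion.run() and exec(), i.e. when the task actually executes
static final int NESTED = -1; // only passed from postComplete(), to trigger dependent tasks
For an async completion, tryFire is called twice:
1. To trigger the task, which ends up calling UniCompletion.claim(); claim() submits the completion to its executor and returns false.
2. To execute the task, when the executor runs it through run() or exec().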
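The two calls can be separated in time with an executor that only queues tasks. The sketch below is a minimal illustration under that assumption: the manual executor and the class name TwoPhaseFireDemo are made up, and only public API is used. Completing the source hands the dependent to the executor (the trigger step), and the dependent finishes only when the queued task is run (the execute step).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class TwoPhaseFireDemo {
    static String run() {
        Queue<Runnable> pending = new ArrayDeque<>();
        Executor manual = pending::add; // queues tasks instead of running them

        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent = source.thenApplyAsync(s -> s + "!", manual);

        source.complete("hi");                        // trigger: the task lands in the queue
        int queued = pending.size();
        boolean doneBeforeRun = dependent.isDone();

        pending.remove().run();                       // execute: the function is applied now
        return queued + ":" + doneBeforeRun + ":" + dependent.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```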
abstract static class Completion extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    volatile Completion next; // Treiber stack link
    /**
     * Performs completion action if triggered, returning a
     * dependent that may need propagation, if one exists.
     *
     * @param mode SYNC, ASYNC, or NESTED
     */
    abstract CompletableFuture<?> tryFire(int mode);
    /** Returns true if possibly still triggerable. Used by cleanStack. */
    abstract boolean isLive();
    public final void run() { tryFire(ASYNC); }
    public final boolean exec() { tryFire(ASYNC); return false; }
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
}
abstract static class UniCompletion<T,V> extends Completion {
    Executor executor; // executor to use (null if none)
    CompletableFuture<V> dep; // the dependent to complete
    CompletableFuture<T> src; // source for action
    UniCompletion(Executor executor, CompletableFuture<V> dep,
                  CompletableFuture<T> src) {
        this.executor = executor; this.dep = dep; this.src = src;
    }
    /**
     * Returns true if action can be run. Call only when known to
     * be triggerable. Uses FJ tag bit to ensure that only one
     * thread claims ownership. If async, starts as task -- a
     * later call to tryFire will run action.
     */
    final boolean claim() {
        Executor e = executor;
        if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
            if (e == null)
                return true;
            executor = null; // disable
            e.execute(this);
        }
        return false;
    }
    final boolean isLive() { return dep != null; }
}
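The claim() logic can be modeled outside the JDK. The sketch below is a simplified stand-in, not the real implementation: an AtomicBoolean replaces the ForkJoinTask tag bit, and the names ClaimSketch, trigger and runs are assumptions. It shows that only the first trigger wins, and that the action either runs inline (no executor) or is handed off to the executor exactly once.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ClaimSketch implements Runnable {
    private final AtomicBoolean claimed = new AtomicBoolean(); // stands in for the FJ tag bit
    private Executor executor;                                 // null means "run on the triggering thread"
    final AtomicInteger runs = new AtomicInteger();            // counts how often the action ran

    ClaimSketch(Executor executor) {
        this.executor = executor;
    }

    // Returns true only if the caller should run the action itself.
    boolean claim() {
        Executor e = executor;
        if (claimed.compareAndSet(false, true)) {
            if (e == null) {
                return true;
            }
            executor = null;  // disable further hand-offs
            e.execute(this);  // the action runs later via run()
        }
        return false;
    }

    // Trigger path: run inline only when claim() allows it.
    void trigger() {
        if (claim()) {
            runs.incrementAndGet();
        }
    }

    // Execution path on the executor: ownership was already claimed.
    @Override
    public void run() {
        runs.incrementAndGet();
    }

    public static void main(String[] args) {
        ClaimSketch sync = new ClaimSketch(null);
        sync.trigger();
        sync.trigger();
        System.out.println(sync.runs.get());
    }
}
```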