一.使用
public class Test {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("AAA " + Thread.currentThread().getName());
return "hello ";
}, executorService).thenAccept(s -> {
System.out.println(s + "world");
System.out.println("BBB " + Thread.currentThread().getName());
});
System.out.println(Thread.currentThread().getName());
Thread.sleep(40000);
}
}
输出:
main
AAA pool-1-thread-1
hello world
BBB pool-1-thread-1
二.源码
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
// 执行到这Thread.sleep(2000);
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
// 入栈volatile Completion stack;
push(c);
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniAccept(CompletableFuture<S> a,
Consumer<? super S> f, UniAccept<S> c) {
Object r; Throwable x;
// a.result == null
if (a == null || (r = a.result) == null || f == null)
return false;
}
static final class UniAccept<T> extends UniCompletion<T,Void> {
Consumer<? super T> fn;
UniAccept(Executor executor, CompletableFuture<Void> dep,
CompletableFuture<T> src, Consumer<? super T> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniAccept(a = src, fn, mode > 0 ? null : this))
// 执行到这
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
// f.stack != null
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
// stack = h.next
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniAccept(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
final <S> boolean uniAccept(CompletableFuture<S> a,
Consumer<? super S> f, UniAccept<S> c) {
Object r; Throwable x;
// a.result:hello
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
// s:hello
f.accept(s);
completeNull();
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}