一.使用
/**
 * Demo of {@code CompletableFuture.supplyAsync(...).thenAccept(...)} on an explicit executor.
 *
 * <p>The supplier sleeps for two seconds on a pool thread, so {@code thenAccept} is registered
 * while the source future is still incomplete. The consumer therefore runs later, on the same
 * pool thread that completes the supplier (see the expected output below the class).
 */
public class Test {

    /**
     * Runs the demo pipeline, waits for it to finish and then releases the pool.
     *
     * @param args unused
     * @throws InterruptedException kept for signature compatibility; the body no longer sleeps
     *     on the main thread
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> done = CompletableFuture.supplyAsync(() -> {
                try {
                    // Keeps the source future incomplete while thenAccept is being registered.
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
                System.out.println("AAA " + Thread.currentThread().getName());
                return "hello ";
            }, executorService).thenAccept(s -> {
                System.out.println(s + "world");
                System.out.println("BBB " + Thread.currentThread().getName());
            });

            System.out.println(Thread.currentThread().getName());

            // Wait for the pipeline itself rather than sleeping for an arbitrary 40 seconds.
            done.join();
        } finally {
            // The pool's worker threads are non-daemon; without shutdown the JVM never exits.
            executorService.shutdown();
        }
    }
}
/*
 * Output:
 * main
 * AAA pool-1-thread-1
 * hello world
 * BBB pool-1-thread-1
 */
二.源码
/**
 * Entry point used by the demo: passes the executor through screenExecutor and delegates to
 * asyncSupplyStage.
 */
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

/**
 * Creates a new, not yet completed future d, hands an AsyncSupply task wrapping (d, f) to the
 * executor and returns d.
 *
 * @throws NullPointerException if f is null
 */
static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

/**
 * Task submitted by asyncSupplyStage: runs the supplier and completes the dependent future
 * with its value, or with the Throwable it raised.
 */
static final class AsyncSupply<T> extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
    CompletableFuture<T> dep; Supplier<T> fn;
    AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
        this.dep = dep; this.fn = fn;
    }

    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        if ((d = dep) != null && (f = fn) != null) {
            // Copy both fields to locals and clear them before doing any work.
            dep = null; fn = null;
            if (d.result == null) {
                try {
                    // Demo walkthrough: the pool thread is currently inside f.get(), blocked in
                    // the supplier's Thread.sleep(2000), so d.result is still null.
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            // After completion, run whatever completions were registered on d (see postComplete).
            d.postComplete();
        }
    }
}

/** Registers a consumer stage; delegates to uniAcceptStage with a null executor. */
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

/**
 * Creates the dependent future d for a consumer stage. When no executor is given, it first
 * tries to run the consumer right away via d.uniAccept(this, f, null); if an executor is given
 * or that attempt returns false, it wraps the stage in a UniAccept, pushes it and calls
 * tryFire(SYNC) on it.
 *
 * @throws NullPointerException if f is null
 */
private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    // Demo walkthrough: e is null and the source has no result yet, so uniAccept returns false
    // and the branch below is taken.
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        // Push onto the completion stack (per the author's note, the field
        // "volatile Completion stack"; push itself is not shown in this excerpt).
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

/**
 * First look at uniAccept, truncated by the author to the early-exit check only; the full
 * method body appears at the end of this excerpt.
 */
final <S> boolean uniAccept(CompletableFuture<S> a,
                            Consumer<? super S> f, UniAccept<S> c) {
    Object r; Throwable x;
    // Demo walkthrough: a.result == null at this point, so the method returns false.
    if (a == null || (r = a.result) == null || f == null)
        return false;
}

/** Completion object that holds the consumer for a thenAccept stage. */
static final class UniAccept<T> extends UniCompletion<T,Void> {
    Consumer<? super T> fn;
    UniAccept(Executor executor, CompletableFuture<Void> dep,
              CompletableFuture<T> src, Consumer<? super T> fn) {
        super(executor, dep, src); this.fn = fn;
    }

    /**
     * Attempts to run the consumer through d.uniAccept. Returns null when dep is already
     * cleared or uniAccept returns false; otherwise clears dep/src/fn and returns
     * d.postFire(a, mode). This completion is passed along to uniAccept unless mode > 0, in
     * which case null is passed instead.
     */
    final CompletableFuture<Void> tryFire(int mode) {
        CompletableFuture<Void> d; CompletableFuture<T> a;
        if ((d = dep) == null ||
            !d.uniAccept(a = src, fn, mode > 0 ? null : this))
            // Demo walkthrough: execution ends up here on the first call, because the source
            // future has no result yet.
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}

/**
 * Called by AsyncSupply.run after the future has been completed: repeatedly takes the head
 * completion off a stack and fires it with tryFire(NESTED). When tryFire returns a future,
 * that future's stack is processed next; otherwise processing continues with this future.
 */
final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    // Loop while f.stack != null; when f is some other future whose stack is empty, fall back
    // to this future's stack.
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // Author's note: stack = h.next, i.e. the head h is replaced by its successor
        // (casStack is not shown; presumably a compare-and-set that can fail and be retried).
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    // h came from another future and has successors: hand it to pushStack
                    // (not shown) and go around the loop again instead of firing it here.
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            // Fire the completion; continue with the future it returns, or with this if null.
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

/**
 * Second look at UniAccept.tryFire (same code as inside the class above), this time as invoked
 * from postComplete with mode NESTED after the source future has a result.
 */
final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniAccept(a = src, fn, mode > 0 ? null : this))
        return null;
    dep = null; src = null; fn = null;
    return d.postFire(a, mode);
}

/**
 * Full body of uniAccept: runs consumer f with the result of source future a and completes
 * this (dependent) future.
 *
 * @return false if a is null, a has no result yet, f is null, or c is non-null and c.claim()
 *     returns false; true otherwise
 */
final <S> boolean uniAccept(CompletableFuture<S> a,
                            Consumer<? super S> f, UniAccept<S> c) {
    Object r; Throwable x;
    // Demo walkthrough: a.result now holds the supplier's return value ("hello ").
    if (a == null || (r = a.result) == null || f == null)
        return false;
    // Only act if this dependent future has not been completed already.
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                // The source carries an exception: pass it on and skip the consumer.
                completeThrowable(x, r);
                break tryComplete;
            }
            // An AltResult without an exception is handed to the consumer as null.
            r = null;
        }
        try {
            // With a completion object present, continue only if claim() succeeds
            // (claim is not shown in this excerpt).
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            // Demo walkthrough: s is the supplier's "hello " string, so the demo's consumer
            // runs here.
            f.accept(s);
            completeNull();
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}