在开始介绍AbstractFuture之前先让我们手动实现一个回调.

    回调
    手写Future类

    1. public class Future {
    2. private Consumer consumer;
    3. public void addListener(Consumer<T> consumer) {
    4. this.consumer = consumer;
    5. }
    6. public void set(T value) {
    7. consumer.accept(value);
    8. }
    9. }


    测试回调

    1. public static void main(String[] args) throws InterruptedException {
    2. Future future = new Future();
    3. //监听future设置了值
    4. future.addListener(new Consumer<String>() {
    5. @Override
    6. public void accept(String s) {
    7. System.out.println("---------"+s);
    8. }
    9. });
    10. TimeUnit.SECONDS.sleep(5);
    11. future.set("hh");
    12. TimeUnit.SECONDS.sleep(1);
    13. }


    执行mian方法,可以发现睡眠5秒后,future设置了值,这时addListener方法可以监听到future设置了值.
    以上回调有个问题,假设Consumer接口中的方法执行需要大量的时间,那这样future.set(“hh”)的时候就会出现阻塞,其实对于这种情况,我们完全可以启用一个线程去实现,但是AbstractFuture已经为我们很好的解决方法.

    AbstractFuture使用
    先通过一个例子,感受下AbstractFuture的好处

    1. static class AbstractFutureImpl extends AbstractFuture {
    2. public boolean set(T value) {
    3. return super.set(value);
    4. }
    5. }
    6. // 创建线程池
    7. final static ExecutorService executors = Executors.newCachedThreadPool();
    8. public static void main(String[] args) throws InterruptedException {
    9. AbstractFutureImpl<String> future = new AbstractFutureImpl();
    10. //监听future设置了值
    11. future.addListener(new Runnable() {
    12. @Override
    13. public void run() {
    14. try {
    15. System.out.println("进入回调函数");
    16. TimeUnit.SECONDS.sleep(3);
    17. System.out.println("收到set的值: " + future.get());
    18. } catch (Exception e) {
    19. e.printStackTrace();
    20. }
    21. }
    22. }, executors);
    23. TimeUnit.SECONDS.sleep(5);
    24. future.set("hh");
    25. System.out.println("set完值");
    26. TimeUnit.SECONDS.sleep(100);
    27. }
    28. /**
    29. set完值
    30. 进入回调函数
    31. 收到set的值: hh
    32. */


    由打印结果可以看出,future.set(“hh”)方法未阻塞,回调函数完全由传入线程池去执行. 当set完值时,就会主动回调addListener方法,通过future.get()(这时已经有值,故get()方法不会阻塞)获取到值.

    AbstractFuture解析
    AbstractFuture实现了ListenableFuture,Future接口,实现了接口中所有方法.无需我们重复造轮子.
    那addListener是如何实现的呢?具体实现细节看源码,这里说下大概思路.

    addListener核心源码:

    1. public void add(Runnable runnable, Executor executor) {
    2. // Fail fast on a null. We throw NPE here because the contract of
    3. // Executor states that it throws NPE on null listener, so we propagate
    4. // that contract up into the add method as well.
    5. Preconditions.checkNotNull(runnable, "Runnable was null.");
    6. Preconditions.checkNotNull(executor, "Executor was null.");
    7. // Lock while we check state. We must maintain the lock while adding the
    8. // new pair so that another thread can't run the list out from under us.
    9. // We only add to the list if we have not yet started execution.
    10. synchronized (this) {
    11. if (!executed) {
    12. runnables = new RunnableExecutorPair(runnable, executor, runnables);
    13. return;
    14. }
    15. }
    16. // Execute the runnable immediately. Because of scheduling this may end up
    17. // getting called before some of the previously added runnables, but we're
    18. // OK with that. If we want to change the contract to guarantee ordering
    19. // among runnables we'd have to modify the logic here to allow it.
    20. executeListener(runnable, executor);
    21. }


    源码里有个executed判断,如何走if()判断,会把runnable和executor存储起来,否则直接通过executor执行runnable方法.
    为什么会有executed呢? 看set(T value)方法源码就会知道,:

    1. 如果addListener方法比set方法先执行,这时executed就是false, 执行set方法时会主动通过executor执行runnable方法.
    2. 如果addListener方法比set方法后执行,这时executed就是ture.直接通过executor执行runnable方法.

    set核心源码:

    1. public void execute() {
    2. // Lock while we update our state so the add method above will finish adding
    3. // any listeners before we start to run them.
    4. RunnableExecutorPair list;
    5. synchronized (this) {
    6. if (executed) {
    7. return;
    8. }
    9. executed = true;
    10. list = runnables;
    11. runnables = null; // allow GC to free listeners even if this stays around for a while.
    12. }
    13. // If we succeeded then list holds all the runnables we to execute. The pairs in the stack are
    14. // in the opposite order from how they were added so we need to reverse the list to fulfill our
    15. // contract.
    16. // This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
    17. // could drop the contract on the method that enforces this queue like behavior since depending
    18. // on it is likely to be a bug anyway.
    19. // N.B. All writes to the list and the next pointers must have happened before the above
    20. // synchronized block, so we can iterate the list without the lock held here.
    21. RunnableExecutorPair reversedList = null;
    22. while (list != null) {
    23. RunnableExecutorPair tmp = list;
    24. list = list.next;
    25. tmp.next = reversedList;
    26. reversedList = tmp;
    27. }
    28. while (reversedList != null) {
    29. executeListener(reversedList.runnable, reversedList.executor);
    30. reversedList = reversedList.next;
    31. }
    32. }


    注: 思想是灵魂,实现是形式