1. Netty's Future has the same name as the JDK's Future, but they are two different interfaces: Netty's Future extends the JDK Future, and Promise in turn extends Netty's Future.
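  • A JDK Future can only deliver its result by blocking until the task has finished (whether it succeeded or failed).
  • A Netty Future can deliver its result either by blocking or asynchronously through a listener, but in both cases only once the task has finished.
  • A Netty Promise has everything a Netty Future has, and it also exists independently of any task: it serves purely as a container for passing a result between two threads.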
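
To make the first bullet concrete, here is a minimal sketch that uses only the JDK. The class name `JdkFutureDemo` and the helper `waitFor` are made up for this illustration. It shows that the only way to obtain the result is to block in `get()`, and that a failed task surfaces as an `ExecutionException` wrapping the original cause.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original notes.
class JdkFutureDemo {
    // Submits a task, blocks on get(), and describes the outcome.
    static String waitFor(boolean fail) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = pool.submit(() -> {
                Thread.sleep(100);
                if (fail) {
                    throw new IllegalStateException("error...");
                }
                return 10;
            });
            try {
                // get() blocks the calling thread until the task has finished
                return "result " + future.get();
            } catch (ExecutionException e) {
                // A failed task is reported as ExecutionException wrapping the cause
                return "failed " + e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waitFor(false));
        System.out.println(waitFor(true));
    }
}
```

The JDK interface offers no callback hook, which is the gap that Netty's `addListener` fills.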
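| Method      | JDK Future                                                          | Netty Future                                                                         | Promise                |
|-------------|---------------------------------------------------------------------|--------------------------------------------------------------------------------------|------------------------|
| cancel      | Cancels the task                                                    | inherited                                                                            | inherited              |
| isCancelled | Whether the task was cancelled                                      | inherited                                                                            | inherited              |
| isDone      | Whether the task has completed; cannot tell success from failure    | inherited                                                                            | inherited              |
| get         | Blocks until the result is available; throws if the task failed     | inherited                                                                            | inherited              |
| getNow      | -                                                                   | Returns the result without blocking; returns null if there is no result yet          | inherited              |
| await       | -                                                                   | Waits for the task to finish; does not throw on failure, check isSuccess instead     | inherited              |
| sync        | -                                                                   | Waits for the task to finish; throws if the task failed                              | inherited              |
| isSuccess   | -                                                                   | Whether the task succeeded                                                           | inherited              |
| cause       | -                                                                   | Returns the failure cause without blocking; returns null if the task has not failed  | inherited              |
| addListener | -                                                                   | Adds a callback that receives the result asynchronously                              | inherited              |
| setSuccess  | -                                                                   | -                                                                                    | Sets the success result |
| setFailure  | -                                                                   | -                                                                                    | Sets the failure result |

A "-" means the type does not declare the method; "inherited" means it behaves as described in the column to its left.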
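
The "container for passing a result between two threads" idea can be sketched without Netty. The `MiniPromise` class below is a hypothetical, heavily simplified stand-in written for this note. It is not how `DefaultPromise` is implemented (for one thing, it runs listeners on the completing thread rather than on an executor), but it mirrors the method names from the table so their semantics are easier to see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical simplified promise; NOT Netty's DefaultPromise.
class MiniPromise<V> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final List<Consumer<MiniPromise<V>>> listeners = new ArrayList<>();
    private V result;
    private Throwable cause;
    private boolean completed;

    void setSuccess(V value) { complete(value, null); }

    void setFailure(Throwable t) { complete(null, t); }

    private void complete(V value, Throwable t) {
        List<Consumer<MiniPromise<V>>> toNotify;
        synchronized (this) {
            if (completed) {
                throw new IllegalStateException("already completed");
            }
            completed = true;
            result = value;
            cause = t;
            toNotify = new ArrayList<>(listeners);
        }
        done.countDown();                        // wake up threads blocked in await()
        toNotify.forEach(l -> l.accept(this));   // callbacks run on the completing thread
    }

    synchronized boolean isSuccess() { return completed && cause == null; }

    synchronized Throwable cause() { return cause; }

    // Non-blocking: null until a success result has been set
    synchronized V getNow() { return result; }

    // Blocks until completion; never throws the task's failure
    MiniPromise<V> await() throws InterruptedException {
        done.await();
        return this;
    }

    void addListener(Consumer<MiniPromise<V>> listener) {
        synchronized (this) {
            if (!completed) {
                listeners.add(listener);
                return;
            }
        }
        listener.accept(this);                   // already completed: notify right away
    }
}

// Hypothetical demo: one thread produces the result, another consumes it.
class MiniPromiseDemo {
    static String run(boolean fail) throws InterruptedException {
        MiniPromise<Integer> promise = new MiniPromise<>();
        Integer before = promise.getNow();       // null: nothing has been set yet
        new Thread(() -> {
            if (fail) {
                promise.setFailure(new RuntimeException("error..."));
            } else {
                promise.setSuccess(10);
            }
        }).start();
        promise.await();
        Object outcome = promise.isSuccess() ? promise.getNow() : promise.cause();
        return before + " -> " + outcome;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Note that the promise is created on its own, with no task attached; whichever thread holds a reference can complete it.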

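Example 1

Synchronous handling of a successful task

    // Imports: io.netty.channel.DefaultEventLoop, io.netty.util.concurrent.DefaultPromise
    // Assumes `log` is an SLF4J logger and the enclosing method declares `throws Exception`.
    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    // Set the success result from the event loop thread after one second
    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("set success, {}", 10);
        promise.setSuccess(10);
    });
    log.debug("start...");
    log.debug("{}", promise.getNow()); // no result yet
    log.debug("{}", promise.get());    // blocks until the result is set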

Output

    11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
    11:51:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10
    11:51:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - 10

Example 2

Asynchronous handling of a successful task

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    // Register a callback to receive the result asynchronously
    promise.addListener(future -> {
        // `future` here is the promise created above
        log.debug("{}", future.getNow());
    });
    // Set the success result after waiting 1000 ms
    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("set success, {}", 10);
        promise.setSuccess(10);
    });
    log.debug("start...");

Output

    11:49:30 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10
    11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - 10

Example 3

Synchronous handling of a failed task - sync & get

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    // Set the failure result from the event loop thread after one second
    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RuntimeException e = new RuntimeException("error...");
        log.debug("set failure, {}", e.toString());
        promise.setFailure(e);
    });
    log.debug("start...");
    log.debug("{}", promise.getNow());
    // sync() would throw as well; the difference is that get() wraps the
    // failure in an extra ExecutionException
    promise.get();

Output

    12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
    12:11:08 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
    Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: error...
        at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
        at com.itcast.oio.DefaultPromiseTest2.main(DefaultPromiseTest2.java:34)
    Caused by: java.lang.RuntimeException: error...
        at com.itcast.oio.DefaultPromiseTest2.lambda$main$0(DefaultPromiseTest2.java:27)
        at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)

Example 4

Synchronous handling of a failed task - await

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    // Set the failure result from the event loop thread after one second
    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RuntimeException e = new RuntimeException("error...");
        log.debug("set failure, {}", e.toString());
        promise.setFailure(e);
    });
    log.debug("start...");
    log.debug("{}", promise.getNow());
    // Unlike sync() and get(), await() does not throw when the task failed
    promise.await();
    log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());

Output

    12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
    12:18:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
    12:18:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...

Example 5

Asynchronous handling of a failed task

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    // The callback runs once the promise completes, whether it succeeded or failed
    promise.addListener(future -> {
        log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());
    });
    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RuntimeException e = new RuntimeException("error...");
        log.debug("set failure, {}", e.toString());
        promise.setFailure(e);
    });
    log.debug("start...");

Output

    12:04:57 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
    12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...

Example 6

Deadlock check in await

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);
    eventExecutors.submit(() -> {
        System.out.println("1");
        try {
            promise.await();
            // Do not catch only InterruptedException here. Otherwise the
            // BlockingOperationException thrown by the deadlock check keeps
            // propagating upward. The submitted task is wrapped in a PromiseTask,
            // whose run method catches every exception and stores it as the
            // failure result of its own promise instead of rethrowing it,
            // so the exception would go unnoticed.
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("2");
    });
    eventExecutors.submit(() -> {
        System.out.println("3");
        try {
            promise.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("4");
    });

Output (the stack traces are written to stderr, which is presumably why they show up after the stdout lines even though each exception is raised before "2" and "4" are printed)

    1
    2
    3
    4
    io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)
        at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
        at com.itcast.oio.DefaultPromiseTest.lambda$main$0(DefaultPromiseTest.java:27)
        at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
        at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
    io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)
        at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
        at com.itcast.oio.DefaultPromiseTest.lambda$main$1(DefaultPromiseTest.java:36)
        at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
        at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
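
The stack trace shows the exception coming from `DefaultPromise.checkDeadLock`, called by `await`. The idea behind such a check can be sketched with the JDK alone: if the thread asking to block is the very thread that would have to complete the result, blocking can never end, so the call is rejected up front. Everything below (`DeadlockCheckDemo`, `GuardedLatch`, `bindOwner`) is hypothetical and only illustrates the principle; it throws `IllegalStateException` where Netty throws `BlockingOperationException`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of a deadlock guard; not Netty code.
class DeadlockCheckDemo {
    static final class GuardedLatch {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        // Remember which thread is responsible for completing the latch
        void bindOwner(Thread t) { owner.set(t); }

        void complete() { latch.countDown(); }

        void await() throws InterruptedException {
            // Blocking on the owning thread could never finish, so fail fast
            if (Thread.currentThread() == owner.get()) {
                throw new IllegalStateException("await() on the owning thread would deadlock");
            }
            latch.await();
        }
    }

    static String run() throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        GuardedLatch guarded = new GuardedLatch();
        try {
            // Bind the single worker thread as the owner
            loop.submit(() -> guarded.bindOwner(Thread.currentThread())).get();

            // Calling await() on the worker thread itself is rejected
            Future<String> inside = loop.submit(() -> {
                try {
                    guarded.await();
                    return "blocked";
                } catch (IllegalStateException e) {
                    return "rejected";
                }
            });
            String fromLoop = inside.get();

            // Calling await() from another thread (here: the caller) is allowed
            guarded.complete();
            guarded.await();
            return fromLoop + ", main ok";
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In Example 6 both tasks run on the event loop thread that belongs to the promise, so both `await()` calls are rejected and the tasks continue to print "2" and "4".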