netty's Future shares its name with the jdk's Future, but they are two different interfaces: netty's Future extends the jdk's Future, and Promise in turn extends netty's Future.
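- jdk Future: the result can only be obtained by waiting synchronously for the task to finish (whether it succeeds or fails).
- netty Future: the result can be obtained either by waiting synchronously or asynchronously through a listener, but in both cases only after the task has finished.
- netty Promise: has everything netty Future has and, in addition, exists independently of any task; it serves purely as a container for passing a result between two threads.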
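The first bullet can be illustrated with the jdk alone. The sketch below is only an illustration, not part of the original examples: the class name `JdkFutureDemo` and its two helper methods are made up for this purpose. It shows that `get()` blocks until the task finishes and that a failed task surfaces as an `ExecutionException`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; only jdk APIs are used.
class JdkFutureDemo {

    // Blocks in get() until the task has finished, then returns its result.
    static int blockingGet() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                Thread.sleep(100);
                return 10;
            };
            Future<Integer> future = pool.submit(task);
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    // A failed task is reported by get() as an ExecutionException wrapping the cause.
    static String failureMessage() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                throw new IllegalStateException("error...");
            };
            Future<Integer> future = pool.submit(task);
            try {
                future.get();
                return "no exception";
            } catch (ExecutionException e) {
                return e.getCause().getMessage();
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(blockingGet());
        System.out.println(failureMessage());
    }
}
```

There is no callback hook on the jdk interface itself, which is the gap netty's `addListener` fills.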
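
| Method | jdk Future | netty Future | Promise |
|---|---|---|---|
| cancel | Cancels the task | - | - |
| isCancelled | Whether the task was cancelled | - | - |
| isDone | Whether the task has completed; cannot distinguish success from failure | - | - |
| get | Returns the task result, blocking until it is available. Throws an exception if the task failed | - | - |
| getNow | - | Returns the task result without blocking; returns null if no result has been produced yet | - |
| await | - | Waits for the task to finish. Does not throw if the task failed; check isSuccess instead | - |
| sync | - | Waits for the task to finish. Throws an exception if the task failed | - |
| isSuccess | - | Whether the task succeeded | - |
| cause | - | Returns the failure cause without blocking; returns null if the task has not failed | - |
| addListener | - | Adds a callback that receives the result asynchronously | - |
| setSuccess | - | - | Sets the success result |
| setFailure | - | - | Sets the failure result |

Each method is described in the column of the type that introduces it; the types to its right inherit it.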
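The `isDone` and `isCancelled` rows can be checked with the jdk's `FutureTask`. This is a minimal sketch under that assumption; the class `JdkFutureStateDemo` and its helpers are hypothetical names chosen for illustration. Each helper returns the pair `{isDone, isCancelled}` for one outcome.

```java
import java.util.Arrays;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; each helper returns {isDone, isCancelled}.
class JdkFutureStateDemo {

    // Task ran to completion and produced a value.
    static boolean[] afterSuccess() {
        FutureTask<Integer> task = new FutureTask<>(() -> 10);
        task.run();
        return new boolean[] {task.isDone(), task.isCancelled()};
    }

    // Task ran and threw; FutureTask captures the exception instead of propagating it.
    static boolean[] afterFailure() {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            throw new IllegalStateException("error...");
        });
        task.run();
        return new boolean[] {task.isDone(), task.isCancelled()};
    }

    // Task was cancelled before it ever ran.
    static boolean[] afterCancel() {
        FutureTask<Integer> task = new FutureTask<>(() -> 10);
        task.cancel(false);
        return new boolean[] {task.isDone(), task.isCancelled()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(afterSuccess())); // [true, false]
        System.out.println(Arrays.toString(afterFailure())); // [true, false]
        System.out.println(Arrays.toString(afterCancel()));  // [true, true]
    }
}
```

Success and failure produce the same pair, which is why netty's Future adds `isSuccess` and `cause`.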

Example 1

Synchronous handling of a successful task

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("set success, {}", 10);
        promise.setSuccess(10);
    });

    log.debug("start...");
    log.debug("{}", promise.getNow()); // no result yet
    log.debug("{}", promise.get());

Output

    11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    11:51:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
    11:51:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10
    11:51:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - 10

Example 2

Asynchronous handling of a successful task

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

    // Register a callback to receive the result asynchronously
    promise.addListener(future -> {
        // The future here is the promise created above
        log.debug("{}", future.getNow());
    });

    // Set the success result after waiting 1000 ms
    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("set success, {}", 10);
        promise.setSuccess(10);
    });

    log.debug("start...");

Output

    11:49:30 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set success, 10
    11:49:31 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - 10

Example 3

Synchronous handling of a failed task: sync & get

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RuntimeException e = new RuntimeException("error...");
        log.debug("set failure, {}", e.toString());
        promise.setFailure(e);
    });

    log.debug("start...");
    log.debug("{}", promise.getNow());
    promise.get(); // sync() also throws; get() additionally wraps the exception in an ExecutionException

Output

    12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    12:11:07 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
    12:11:08 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
    Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: error...
        at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
        at com.itcast.oio.DefaultPromiseTest2.main(DefaultPromiseTest2.java:34)
    Caused by: java.lang.RuntimeException: error...
        at com.itcast.oio.DefaultPromiseTest2.lambda$main$0(DefaultPromiseTest2.java:27)
        at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
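The difference between `sync()` and `get()` on a failed promise can be observed side by side. The following is a rough sketch that assumes netty 4.1 is on the classpath; the class `SyncVersusGet` and the method `observe` are hypothetical names, and the logging from the example above is replaced by returned strings.

```java
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; assumes netty 4.1 on the classpath.
class SyncVersusGet {

    // Returns what sync() and get() each report for the same failed promise.
    static String[] observe() throws InterruptedException {
        DefaultEventLoop loop = new DefaultEventLoop();
        try {
            DefaultPromise<Integer> promise = new DefaultPromise<>(loop);
            loop.execute(() -> promise.setFailure(new RuntimeException("error...")));

            String fromSync;
            try {
                promise.sync(); // rethrows the failure cause itself
                fromSync = "no exception";
            } catch (RuntimeException e) {
                fromSync = e.getClass().getName();
            }

            String fromGet;
            try {
                promise.get(); // wraps the failure cause in an ExecutionException
                fromGet = "no exception";
            } catch (ExecutionException e) {
                fromGet = e.getClass().getName()
                        + " caused by " + e.getCause().getClass().getName();
            }
            return new String[] {fromSync, fromGet};
        } finally {
            loop.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : observe()) {
            System.out.println(line);
        }
    }
}
```

Both calls are made from the main thread here, so the deadlock check inside the promise does not interfere.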

Example 4

Synchronous handling of a failed task: await

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RuntimeException e = new RuntimeException("error...");
        log.debug("set failure, {}", e.toString());
        promise.setFailure(e);
    });

    log.debug("start...");
    log.debug("{}", promise.getNow());
    promise.await(); // unlike sync and get, await does not throw when the task fails
    log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());

Output

    12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    12:18:53 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - null
    12:18:54 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
    12:18:54 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...

Example 5

Asynchronous handling of a failed task

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

    promise.addListener(future -> {
        log.debug("result {}", (promise.isSuccess() ? promise.getNow() : promise.cause()).toString());
    });

    eventExecutors.execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        RuntimeException e = new RuntimeException("error...");
        log.debug("set failure, {}", e.toString());
        promise.setFailure(e);
    });

    log.debug("start...");

Output

    12:04:57 [DEBUG] [main] c.i.o.DefaultPromiseTest2 - start...
    12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - set failure, java.lang.RuntimeException: error...
    12:04:58 [DEBUG] [defaultEventLoop-1-1] c.i.o.DefaultPromiseTest2 - result java.lang.RuntimeException: error...

Example 6

await deadlock check

    DefaultEventLoop eventExecutors = new DefaultEventLoop();
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors);

    eventExecutors.submit(() -> {
        System.out.println("1");
        try {
            promise.await();
            // Do not catch only InterruptedException here. Otherwise the
            // BlockingOperationException thrown by the deadlock check keeps propagating
            // upward, and because a submitted task is wrapped in a PromiseTask whose run
            // method catches every exception and stores it as the failure result of its
            // Promise, the exception would never be reported.
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("2");
    });

    eventExecutors.submit(() -> {
        System.out.println("3");
        try {
            promise.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("4");
    });

Output

    1
    2
    3
    4
    io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)
        at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
        at com.itcast.oio.DefaultPromiseTest.lambda$main$0(DefaultPromiseTest.java:27)
        at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
        at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
    io.netty.util.concurrent.BlockingOperationException: DefaultPromise@47499c2a(incomplete)
        at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:384)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:212)
        at com.itcast.oio.DefaultPromiseTest.lambda$main$1(DefaultPromiseTest.java:36)
        at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
        at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
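The check exists because a task that blocks on the event loop's own thread would also block the only thread able to complete the promise. The sketch below, which assumes netty 4.1 on the classpath and uses the hypothetical names `AwaitDeadlockCheck` and `awaitInsideLoopIsRejected`, reports whether `await()` called from inside the loop is rejected with a `BlockingOperationException`.

```java
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.BlockingOperationException;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import java.util.concurrent.Callable;

// Hypothetical demo class; assumes netty 4.1 on the classpath.
class AwaitDeadlockCheck {

    // Returns true if await() on the promise's own event loop thread is rejected.
    static boolean awaitInsideLoopIsRejected() throws Exception {
        DefaultEventLoop loop = new DefaultEventLoop();
        try {
            DefaultPromise<Integer> promise = new DefaultPromise<>(loop);
            Callable<Boolean> probeTask = () -> {
                try {
                    promise.await(); // runs on the loop thread, promise is incomplete
                    return false;
                } catch (BlockingOperationException e) {
                    return true;
                }
            };
            Future<Boolean> probe = loop.submit(probeTask);
            // Waiting from the calling thread is allowed: it is not the loop thread.
            return probe.get();
        } finally {
            loop.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitInsideLoopIsRejected());
    }
}
```

Code that already runs on the event loop can register a callback with `addListener` instead of waiting, as in examples 2 and 5.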
