当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
ChannelFuture 是一个接口 : public interface ChannelFuture extends Future
。我们可以添加监听器,当监听的事件发生时,就会通知到监听器。
Netty 的异步模型是建立在 future 和 callback 的之上的。其中,Future的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程。Future这里用来表示异步执行的结果,可以通过它提供的方法来检测执行是否完成。
使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。Netty 框架的目标就是让业务逻辑从网络基础应用编码中分离出来、解脱出来。
Future-Listener机制
**
当 Future
对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture
来获取操作执行的状态,注册监听函数来执行完成后的操作。
常用方法如下:
方法名称 | 方法作用 |
---|---|
isDone() | 判断当前操作是否完成 |
isSuccess() | 判断已完成的当前操作是否成功 |
getCause() | 获取已完成当前操作失败的原因 |
isCancelled() | 判断已完成的当前操作是否被取消 |
addListener() | 注册监听器,当前操作(Future)已完成,将会通知指定的监听器 |
例如,绑定端口操作时异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑:
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err.println("端口["+ port + "]绑定失败!");
}
});
下面通过喝茶的案例来说明异步回调的不同实现方式,喝茶可分为三步:
- 烧水
- 准备茶叶
- 泡茶喝
阻塞式异步回调
**
根据喝茶的三个执行步骤,我们需要使用三个线程来分别表示,其中喝茶线程需要等待前两个线程执行完毕之后,它才能开始执行。如果采用阻塞式实现异步回调,可以使用Thread类的join方法,它会等待join的线程执行完毕之后才继续往下执行。
/**
* @Author dyliang
* @Date 2020/11/3 9:55
* @Version 1.0
*/
public class BlockingTea {
public static void main(String[] args) {
System.out.println("喝茶线程: " + Thread.currentThread().getName() + " Thread");
Thread water = new Thread(()->{
System.out.println("烧水线程 :" + Thread.currentThread().getName());
try {
System.out.println("烧水中...");
Thread.sleep(5000);
System.out.println("水烧好了~");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "water Thread");
Thread leaf = new Thread(()->{
System.out.println("准备茶叶线程 :" + Thread.currentThread().getName());
try {
System.out.println("选茶中...");
Thread.sleep(5000);
System.out.println("选好了~");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "leaf Thread");
water.start();
leaf.start();
System.out.println("等待泡茶中...");
try {
water.join();
leaf.join();
System.out.println("喝茶啦~");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
程序输出:
D:\Java\jdk1.8.0_221\bin\java.exe "-javaagent:D:\IntelliJ IDEA 2020.1.2\lib\idea_rt.jar=1430:D:\IntelliJ IDEA 2020.1.2\bin" -Dfile.encoding=GBK -classpath D:\Java\jdk1.8.0_221\jre\lib\charsets.jar;D:\Java\jdk1.8.0_221\jre\lib\deploy.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\access-bridge-64.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\cldrdata.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\dnsns.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\jaccess.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\jfxrt.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\localedata.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\nashorn.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\sunec.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\sunjce_provider.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\sunmscapi.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\sunpkcs11.jar;D:\Java\jdk1.8.0_221\jre\lib\ext\zipfs.jar;D:\Java\jdk1.8.0_221\jre\lib\javaws.jar;D:\Java\jdk1.8.0_221\jre\lib\jce.jar;D:\Java\jdk1.8.0_221\jre\lib\jfr.jar;D:\Java\jdk1.8.0_221\jre\lib\jfxswt.jar;D:\Java\jdk1.8.0_221\jre\lib\jsse.jar;D:\Java\jdk1.8.0_221\jre\lib\management-agent.jar;D:\Java\jdk1.8.0_221\jre\lib\plugin.jar;D:\Java\jdk1.8.0_221\jre\lib\resources.jar;D:\Java\jdk1.8.0_221\jre\lib\rt.jar;D:\data\Code\Java_code\out\production\Java_code;D:\Java\Maven\repository\junit\junit\4.12\junit-4.12.jar;D:\Java\Maven\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\project\work\Java\设计模式\源码笔记课件\代码\DesignPattern\src\com\atguigu\proxy\cglib\cglib-2.2.jar;D:\project\work\Java\设计模式\源码笔记课件\代码\DesignPattern\src\com\atguigu\proxy\cglib\asm.jar;D:\project\work\Java\设计模式\源码笔记课件\代码\DesignPattern\src\com\atguigu\proxy\cglib\asm-commons.jar;D:\project\work\Java\设计模式\源码笔记课件\代码\DesignPattern\src\com\atguigu\proxy\cglib\asm-tree.jar NIO.AIO.BlockingTea
喝茶线程: main Thread
等待泡茶中...
烧水线程 :water Thread
烧水中...
准备茶叶线程 :leaf Thread
选茶中...
选好了~
水烧好了~
喝茶啦~
这里使用的join方法是持续阻塞版本,其他的实现方式有:
- join(long mils):等待线程最多mils毫秒,不管等待的线程是否执行结束,改线程都重新恢复执行
- join(long mils, int nanos):等待线程最多mils毫秒加nanos纳秒,不管等待的线程是否执行结束,改线程都重新恢复执行
join方法调用时,调用方法的线程会被阻塞,直到等待的线程执行结束或者等待超时才恢复运行。join方法是没有返回值的,调用join方法的线程无法知道等待线程的执行结果。如果想要知道等待线程的执行结果,可以使用Java所提供的FutureTask。
FutureTask异步回调
**
如果想要使用FutureTask来实现有结果的异步回调,那么实现Runnable接口创建线程的方式就不行了,需要使用实现Callable接口并重写call方法的方式完成。
Callable接口的实现类似于Runnable接口,它也是作为中线程的实现方式存在,但它是有返回值的。定义如下:
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
需要注意的是:Callable接口的实现是不能直接作为Thread构造方法的参数来创建线程的,从Thread类的构造方法中可以很明显的看出。如果想要这种方式创建线程,我们需要一个桥梁FutureTask。FutureTask表示一个未来执行的任务,表示新线程所执行的操作,定义如下:
public class FutureTask<V> implements RunnableFuture<V> {
// ...
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// ...
}
它实现的顶层接口为Future,接口中定义对并发任务的执行及获取结构的相关操作,如下所示:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); // 取消并发任务的执行
boolean isCancelled(); // 获取任务的取消状态
boolean isDone(); // 获取任务的完成状态
V get() throws InterruptedException, ExecutionException; // 获取结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException; // 超时获取结果
}
因此,如果想要获取FutureTask中包含的线程执行结果,可以调用get方法,get方法获取结果的过程也是阻塞的,它最终调用report方法来返回结果。
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
结果保存在FutureTask的outcome字段。
下面使用FutureTask来重现实现上述的喝茶过程,如下所示:
package NIO.AIO;
import com.sun.org.apache.xpath.internal.operations.Bool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @Author dyliang
* @Date 2020/11/3 10:26
* @Version 1.0
*/
public class FutureTaskTea {
static class Water implements Callable{
@Override
public Object call() throws Exception {
System.out.println("烧水线程 :" + Thread.currentThread().getName());
try {
System.out.println("烧水中...");
Thread.sleep(5000);
System.out.println("水烧好了~");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
static class Leaf implements Callable{
@Override
public Object call() throws Exception {
System.out.println("准备茶叶线程 :" + Thread.currentThread().getName());
try {
System.out.println("选茶中...");
Thread.sleep(5000);
System.out.println("选好了~");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
public static void main(String[] args) {
System.out.println("喝茶线程: " + Thread.currentThread().getName() + " Thread");
Water water = new Water();
Leaf leaf = new Leaf();
FutureTask waterFT = new FutureTask<>(water);
FutureTask leafFT = new FutureTask<>(leaf);
Thread waterThread = new Thread(waterFT, "water Thread");
Thread leafThread = new Thread(leafFT, "leaf Thread");
waterThread.start();
leafThread.start();
try {
Boolean leafOK = (Boolean)leafFT.get();
Boolean waterOK = (Boolean)waterFT.get();
if(leafOK && waterOK){
System.out.println("喝茶啦~");
} else {
System.out.println("请再等一会儿~");
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
虽然FutureTask相比于第一种实现可以获取等待线程执行的结果,但是它在调用get方法时仍然是阻塞,此时主线程是不能做其他操作的,它们都属于异步阻塞实现。如果想要实现异步非阻塞的获取异步线程执行结果,可以使用下面Google Guava Cache提供的异步回调实现。
Guave 异步回调
**
Guava提供了对于Future接口的增强实现ListenableFuture接口实现,允许以异步非阻塞的方式来获取等待线程的执行结果。类似于操作系统中的AIO,如果想要异步非阻塞的获取结果,需要回调机制的实现。回调机制允许等待线程执行结束后调用回调方法告知主线程执行结束,主线程就不必一直阻塞等待。Guava中也提供了相应的实现,即FutureCallable接口,它可以根据异步结果来完成不同的回调,并且可以处理异步结果。
下面看一下这两个接口的定义,首先创建一个Spring Boot项目,并引入Guava的相关依赖:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
FutureCallback接口的定义如下,它用来填写异步任务执行完后的监听逻辑:
@GwtCompatible
public interface FutureCallback<V> {
void onSuccess(@Nullable V var1);
void onFailure(Throwable var1);
}
其中:
- onSuccess:异步任务执行成功后被回调
- onFailure:异步任务执行结束后被回调
再来看一下ListenableFuture接口的定义,它实现了异步任务Callable和FutureCallback结果回调之间的监控关系,如下所示:
public interface ListenableFuture<T> extends Future<T> {
/**
* Register the given {@code ListenableFutureCallback}.
* @param callback the callback to register
*/
void addCallback(ListenableFutureCallback<? super T> callback);
/**
* Java 8 lambda-friendly alternative with success and failure callbacks.
* @param successCallback the success callback
* @param failureCallback the failure callback
* @since 4.1
*/
void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback);
/**
* Expose this {@link ListenableFuture} as a JDK {@link CompletableFuture}.
* @since 5.0
*/
default CompletableFuture<T> completable() {
CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this);
addCallback(completable::complete, completable::completeExceptionally);
return completable;
}
}
如果想要获取ListenableFuture异步任务实例,需要按照下面的方式通过向Guava的线程池来提交Callable任务来获取。
//创建java 线程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
//包装java线程池,构造guava 线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
下面使用Guava的异步回调来实现上面的案例,如下所示:
/**
* @Author dyliang
* @Date 2020/11/3 15:14
* @Version 1.0
*/
public class GuavaTea {
// 烧水异步执行逻辑
static class Water implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
System.out.println("烧水线程 :" + Thread.currentThread().getName());
try {
System.out.println("烧水中...");
Thread.sleep(1000);
System.out.println("水烧好了~");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
// 选茶异步执行逻辑
static class Leaf implements Callable<Boolean>{
@Override
public Boolean call(){
System.out.println("准备茶叶线程 :" + Thread.currentThread().getName());
try {
System.out.println("选茶中...");
Thread.sleep(1000);
System.out.println("选好了~");
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
}
// 泡茶
static class Tea implements Runnable{
boolean waterOK = false;
boolean leafOK = false;
@Override
public void run(){
while(true){
try {
Thread.sleep(1000);
System.out.println("等待中");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果水烧好、茶选好就喝茶
if(waterOK && leafOK){
drink(waterOK, leafOK);
}
}
}
public void drink(Boolean waterOK, Boolean leafOK){
if(waterOK && leafOK){
System.out.println("喝茶啦~");
} else {
System.out.println("请再等一会儿~");
}
}
}
public static void main(String[] args) {
// 喝茶线程
Tea tea = new Tea();
Thread teaThread = new Thread(tea, "Tea Thread");
teaThread.start();
Callable<Boolean> water = new Water();
Callable<Boolean> leaf = new Leaf();
//创建java 线程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
//包装java线程池,构造guava 线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
ListenableFuture leafFuture = gPool.submit(leaf);
Futures.addCallback(leafFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean aBoolean) {
if(aBoolean){
tea.leafOK = true;
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("没水了...");
}
});
ListenableFuture waterFuture = gPool.submit(water);
Futures.addCallback(waterFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean aBoolean) {
if(aBoolean){
tea.waterOK = true;
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("没有茶叶了...");
}
});
}
}
程序输出:
准备茶叶线程 :pool-1-thread-1
选茶中...
烧水线程 :pool-1-thread-2
烧水中...
等待中
水烧好了~
选好了~
等待中
喝茶啦~
如果使用使用Guava异步回调,流程如下:
- 实现Java的Runnable或是Callable接口,创建异步执行逻辑
- 创建Guava线程池
- 将前面创建的异步执行逻辑实例通过submit提交到线程池,获取ListenableFuture异步任务实例
- 创建FutureCallback回调实例,通过Futures.setCallback将回调实例绑定到ListenableFuture异步任务上
- 当异步任务执行完毕后,就会回调FutureCallback的onSuccess或是onFailure方法
Netty异步回调
**
Netty也提供了一套类似于Guava的异步回调机制,整体上如下所示:
- 对Java的Future接口增强实现的异步任务接口,使得Netty任务可以以非阻塞的方式处理异步回调的记过
- GenericFutureListener接口用于表示异步执行完成后的监听器,接口中的operationComplate方法用于异步任务执行完成后进行回调
异步回调在Netty中十分重要,后续在Netty的介绍中再深入探讨。