一,Guava的异步回调模式
Guava是Google提供的Java扩展包,它提供了一种异步回调的解决方案。Guava中与异步回调相关的源码处于com.google.commonutilconcurrent包中。包中的很多类都用于对
javautilconcurrent的能力扩展和能力增强。比如,Guava的异步任务接口ListenableFuture扩展了Java的Future接口,实现了异步回调的能力。
1.FutureCallback
总体来说,Guava主要增强了Java而不是另起炉灶。为了实现异步回调方式获取异步线程的结果,Guava做了以下增强:
- 引入了一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务在Guava中能被监控和非阻塞获取异步结果。
- 引入了一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的是在异步任务执行完成后,根据异步结果完成不同的回调处理,并且可以处理异步结果。
FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。FutureCallback拥有两个日调方法:
- onSuccess()方法,在异步任务执行成功后被回调。调用时,异步任务的执行结果作为onSuccess方法的参数被传入。
- onFailure0方法,在异步任务执行过程中抛出异常时被回调。调用时,异步任务所抛出的异常作为onFailure方法的参数被传入。
Guava的FutureCallback与Java的Callable名字相近,实质不同,存在本质的区别:
(1)Java的Callable接口代表的是异步执行的逻辑。
(2)Guava的FutureCallback接口代表的是Callable异步逻辑执行完成之后,根据成功或者异常两种情形所需要执行的善后工作。
Guava是对JavaFuture异步回调的增强,使用Guava异步回调也需要用到Java的Callable接口。简单地说,只有在Java的Callable任务执行结果出来后,才可能执行Guava中的FutureCallback结果回调。 Guava如何实现异步任务Callable和结果回调FutureCallback之间的监控关系呢?Guava引入了一个新接口ListenableFuture,它继承了Java的 Future接口,增强了被监控的能力。
2.ListenableFuture
Guava的ListenableFuture接口是对Java的Future接口的扩展,可以理解为异步任务实例:
public interface ListenableFuture<V> extends Future<V>{
//此方法由guava内部调用
void addListener(Runnable r , Executor e)
}
ListenableFuture仅仅增加了一个addListener方法。它的作用就是将FutureCallback善后回调逻辑封装成一个内部的Runnable异步口调任务,在Callable异步任务完成后回调FutureCallback善后逻辑。
注意,此addListener(方法只在Guava内部调用,在实际编程中,addListener()不会使用到。
在实际编程中,如何将FutureCallback回调逻辑绑定到异步的 ListenableFuture任务呢?可以使用Guava的Futures工具类,它有一个 addCallback0静态方法,可以将FutureCallback的回调实例绑定到 ListenableFuture异步任务。下面是一个简单的绑定实例:
Futures.addCallback(listenableFuture,new FutureCallback<Boolean>){
public void onSuccess(Boolean r){
// listenableFuture内的Callable 成功时回调此方法
}
public void onFailure(Throwable t){
// listenableFuture内的Callable 异常时回调此方法
}
}
Guava的ListenableFuture接口是对Java的Future接口的扩展,都表示异步任务,那么Guava的异步任务实例从何而来?
3.ListenableFuture异步任务
如果要获取Guava的ListenableFuture异步任务实例,主要通过向线程池(ThreadPool)提交Callable任务的方式获取。不过,这里所说的线程池不是Java的线程池,而是经过Guava自己定制过的Guava线程池。
Guava线程池是对Java线程池的一种装饰。创建Guava线程池的方法如下:
//java线程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
//Guava线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
首先创建Java线程池,然后以其作为Guava线程池的参数再构造一个Guava线程池。有了Guava的线程池之后,就可以通过submit()方法来提交任务了,任务提交之后的返回结果就是我们所要的ListenableFuture异步任务实例。
简单来说,获取异步任务实例的方式是通过向线程池提交Callable业务逻辑来实现,代码如下:
//submit()用来提交任务,返回异步任务实例
ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
//绑定回调实例
Futures.addCallback(listenableFuture, new FutureCallback<Boolean>(){
//有两种实现回调的方法
});
取到了ListenableFuture实例后,通过Futures.addCallback0方法将 FutureCallback回调逻辑的实例绑定到ListenableFuture异步任务实例,实现异步执行完成后的回调。
总结一下,Guava异步回调的流程如下:
- 实现Java的Callable接口,创建异步执行逻辑。还有一种情况,如果不需要返回值,异步执行逻辑也可以实现Runnable接口。
- 创建Guava线程池。
- 将(1)创建的Callable/Runnable异步执行逻辑的实例提交到 Guava线程池,从而获取ListenableFuture异步任务实例。
- 创建FutureCallback回调实例,通过FuturesaddCallback将回调实例绑定到ListenableFuture异步任务上。
完成以上4步,当Callable/Runnable异步执行逻辑完成后,就会回调 FutureCallback实例的回调方法onSuccess()/onFailure()。
4.使用Guava实现泡茶喝的实例
public class GuavaFutureDemo {
public static final int SLEEP_GAP=3;
static class HotWaterJob implements Callable<Boolean>{
@Override
public Boolean call() throws Exception {
try {
System.out.println("洗好水壶");
System.out.println("烧开水");
TimeUnit.SECONDS.sleep(SLEEP_GAP);
System.out.println("水开了");
}catch (Exception e){
e.printStackTrace();
return false;
}
return true;
}
}
static class WashJob implements Callable<Boolean>{
@Override
public Boolean call() throws Exception {
try{
System.out.println("洗茶杯");
TimeUnit.SECONDS.sleep(SLEEP_GAP);
System.out.println("洗完了");
}catch (Exception e){
System.out.println("清洗工作发生异常!");
return false;
}
System.out.println("清洗工作运行结束!");
return true;
}
}
static class DrinkJob{
boolean waterOk =false;
boolean cupOk =false;
public void drinkTea(){
if (waterOk&&cupOk){
System.out.println("泡茶喝,茶喝完!");
this.waterOk=false;
}
}
}
public static void main(String[] args)throws Exception {
Thread.currentThread().setName("泡茶喝线程");
//新启动一个线程,作为泡茶主线程
DrinkJob drinkJob = new DrinkJob();
//烧水的业务逻辑
Callable<Boolean> hotJob = new HotWaterJob();
//清晰的业务逻辑
Callable<Boolean> washJob = new HotWaterJob();
//创建java线程池
ExecutorService jPool = Executors.newFixedThreadPool(10);
//包装java线程池,构造guava线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
//烧水的回调
FutureCallback<Boolean> hotWaterHook = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean r) {
if (r){
drinkJob.waterOk=true;
drinkJob.drinkTea();
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("喝nm!");
}
};
//启动烧水线程
ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
//设置烧水任务的回调钩子
Futures.addCallback(hotFuture, hotWaterHook);
//启动清洗线程
ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
//使用匿名实例,作为清洗之后的回调钩子
Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean r) {
if (r){
drinkJob.cupOk=true;
//执行回调
drinkJob.drinkTea();
}
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("喝nm!");
}
});
System.out.println("干点其他事!");
TimeUnit.SECONDS.sleep(1);
System.out.println("执行完成!");
}
}
以上结果,烧水线程为pool-1-thread-1,清洗线程为pool-1-thread-2,在二者完成之前,泡茶喝线程已经执行完了。泡茶喝的工作在异步回调方法drinkTea(中执行,执行的线程并不是“泡茶喝”线程,而是烧水线程和清洗线程。
5.Guava异步回调和Java异步回调的区别
总结一下Guava异步回调和Java的FutureTask异步调用的区别,具体如下:
- FutureTask是主动调用的模式,“调用线程”主动获得异步结果,在获取异步结果时处于阻塞状态,并且会一直阻塞,直到拿到异步线程的结果。
- Guava是异步回调模式,“调用线程”不会主动获得异步结果,而是准备好回调函数,并设置好回调钩子,执行回调函数的并不是“调用线程”自身,回调承数的执行者是“被调用线程”,“调用线程”在执行完自己的业务逻辑后就已经结束了,当回调采数被执行时,“调用线程”可能已经结束很久了。
二,Netty的异步回调模式
Netty官方文档说明Netty的网络操作都是异步的。Netty源码中大量使用了异步回调处理模式。在Netty的业务开发层面,处于Netty应用的 Handler处理程序中的业务处理代码也都是异步执行的。所以,了解 Netty的异步回调,无论是Netty应用开发还是源码级开发都是十分重要的。
Netty和Guava一样,实现了自己的异步回调体系:Netty继承和扩展了JDKFuture系列异步回调的API,定义了自身的Future系列接口和类,实现了异步任务的监控、异步执行结果的获取。
总体来说,Netty对JavaFuture异步任务的扩展如下:
继承Java的Future接口得到了一个新的属于Netty自己的Future异步任务接口,该接口对原有的接口进行了增强,使得Netty异步任务能够非阻塞地处理回调结果。注意,Netty没有修改Future的名称,只是调整了所在的包名,Netty的Future类的包名和Java的Future接口的包不同。
引入了一个新接口—GenericFutureListener,用于表示异步执行完成的监听器。这个接口和Guava的FutureCallback回调接口不同。Nettv使用了监听器的模式,异步任务执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。
总体来说,在异步非阻塞回调的设计思路上,Netty和Guava是一致的。对应关系为:
Netty的Future接口可以对应到Guava的ListenableFuture接口。
Netty的GenericFutureListener接口可以对应到Guava的 FutureCallback接口。
1.GenericFutureListener
前面提到,和Guava的FutureCallback一样,Netty新增了一个接口,用来封装异步非阻塞回调的逻辑,那就是GenericFutureListener接口。
GenericFutureListener位于io.netty.util.concurrent包中,源码如下:
package io.netty.util.concurrent; import java.util.EventListener
public interface GenericFutureListener<F extends Future<?>> extends Eventlistener{
//监听器的回调方法
void operationComplete(F var1) throws Exception;
}
GenericFutureListener拥有一个回调方法operationCompleteO,表示异步任务操作完成。在Future异步任务执行完成后将回调此方法。大多数情况下,Netty的异步回调代码编写在GenericFutureListener接口的实现类的operationComplete方法中。
说明一下,GenericFutureListener的父接口EventListener是一个空接口,没有任何抽象方法,是一个仅仅具有标识作用的接口。
2.Netty的Future接口
Netty也对Java的Future接口进行了扩展,并且名称没有变,还是叫作Future接口,实现在io.nettyutil.concurrent包中。
和Guava的ListenableFuture一样,Netty的Future接口扩展了一系列方法,对执行的过程进行监控,对异步回调完成事件进行Listen监听并且回调。
Netty的Future接口一般不会直接使用,使用过程中会使用它的子接口。Netty有一系列子接口,代表不同类型的异步任务,如ChannelFuture接口。
ChannelFuture子接口表示Channel通道I/0操作的异步任务,如果在 Channel的异步I/0操作完成后需要执行回调操作,就需要使用到 ChannelFuture接口。
3.ChannelFuture
在Netty网络编程中,网络连接通道的输入、输出处理都是异步进行的,都会返回一个ChannelFuture接口的实例。通过返回的异步任务实例可以为其增加异步回调的监听器。在异步任务真正完成后,回调执行。
Netty的网络连接的异步回调实例代码如下:
//connect是异步的,仅仅是提交异步任务
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com,80));
//connect的异步任务真正执行完成后,future回调监听器会执行
future.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess))
System.out.println("Connection established");
else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace);
}
}
}
GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个子接口,如ChannelFutureListener接口。在上面的代码中,使用到的是这个子接口。
4.Netty的出站和入站异步回调
Netty的出站和入站操作都是异步的。异步回调的方法和前面Netty建立的异步回调是一样的。
下面以经典的NIO出站操作write为例说明ChannelFuture的使用。
在write操作调用后,Netty并没有立即完成对JavaNIO底层连接的写入操作,底层的写入操作是异步执行的,代码如下:
//write()输出方法,返回的是一个异步任务
ChannelFuture future=ctx.channel0.write(msg);//为异步任务加上监听器
future.addListener(
new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future){
// write操作完成后的回调代码
}
});
在write操作完成后立即返回,返回的是一个ChannelFuture接口的实例。通过这个实例可以绑定异步回调监听器,编写异步回调的逻辑。