一,Guava的异步回调模式

Guava是Google提供的Java扩展包,它提供了一种异步回调的解决方案。Guava中与异步回调相关的源码处于com.google.commonutilconcurrent包中。包中的很多类都用于对
javautilconcurrent的能力扩展和能力增强。比如,Guava的异步任务接口ListenableFuture扩展了Java的Future接口,实现了异步回调的能力。

1.FutureCallback

总体来说,Guava主要增强了Java而不是另起炉灶。为了实现异步回调方式获取异步线程的结果,Guava做了以下增强:

  • 引入了一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务在Guava中能被监控和非阻塞获取异步结果。
  • 引入了一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的是在异步任务执行完成后,根据异步结果完成不同的回调处理,并且可以处理异步结果。

FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。FutureCallback拥有两个日调方法:

  • onSuccess()方法,在异步任务执行成功后被回调。调用时,异步任务的执行结果作为onSuccess方法的参数被传入。
  • onFailure0方法,在异步任务执行过程中抛出异常时被回调。调用时,异步任务所抛出的异常作为onFailure方法的参数被传入。

Guava的FutureCallback与Java的Callable名字相近,实质不同,存在本质的区别:

(1)Java的Callable接口代表的是异步执行的逻辑。

(2)Guava的FutureCallback接口代表的是Callable异步逻辑执行完成之后,根据成功或者异常两种情形所需要执行的善后工作。

Guava是对JavaFuture异步回调的增强,使用Guava异步回调也需要用到Java的Callable接口。简单地说,只有在Java的Callable任务执行结果出来后,才可能执行Guava中的FutureCallback结果回调。 Guava如何实现异步任务Callable和结果回调FutureCallback之间的监控关系呢?Guava引入了一个新接口ListenableFuture,它继承了Java的 Future接口,增强了被监控的能力。

2.ListenableFuture

Guava的ListenableFuture接口是对Java的Future接口的扩展,可以理解为异步任务实例:

  1. public interface ListenableFuture<V> extends Future<V>{
  2. //此方法由guava内部调用
  3. void addListener(Runnable r , Executor e)
  4. }

ListenableFuture仅仅增加了一个addListener方法。它的作用就是将FutureCallback善后回调逻辑封装成一个内部的Runnable异步口调任务,在Callable异步任务完成后回调FutureCallback善后逻辑。

注意,此addListener(方法只在Guava内部调用,在实际编程中,addListener()不会使用到。

在实际编程中,如何将FutureCallback回调逻辑绑定到异步的 ListenableFuture任务呢?可以使用Guava的Futures工具类,它有一个 addCallback0静态方法,可以将FutureCallback的回调实例绑定到 ListenableFuture异步任务。下面是一个简单的绑定实例:

  1. Futures.addCallback(listenableFuture,new FutureCallback<Boolean>){
  2. public void onSuccess(Boolean r){
  3. // listenableFuture内的Callable 成功时回调此方法
  4. }
  5. public void onFailure(Throwable t){
  6. // listenableFuture内的Callable 异常时回调此方法
  7. }
  8. }

Guava的ListenableFuture接口是对Java的Future接口的扩展,都表示异步任务,那么Guava的异步任务实例从何而来?

3.ListenableFuture异步任务

如果要获取Guava的ListenableFuture异步任务实例,主要通过向线程池(ThreadPool)提交Callable任务的方式获取。不过,这里所说的线程池不是Java的线程池,而是经过Guava自己定制过的Guava线程池。

Guava线程池是对Java线程池的一种装饰。创建Guava线程池的方法如下:

  1. //java线程池
  2. ExecutorService jPool = Executors.newFixedThreadPool(10);
  3. //Guava线程池
  4. ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);

首先创建Java线程池,然后以其作为Guava线程池的参数再构造一个Guava线程池。有了Guava的线程池之后,就可以通过submit()方法来提交任务了,任务提交之后的返回结果就是我们所要的ListenableFuture异步任务实例。

简单来说,获取异步任务实例的方式是通过向线程池提交Callable业务逻辑来实现,代码如下:

  1. //submit()用来提交任务,返回异步任务实例
  2. ListenableFuture<Boolean> hFuture = gPool.submit(hJob);
  3. //绑定回调实例
  4. Futures.addCallback(listenableFuture, new FutureCallback<Boolean>(){
  5. //有两种实现回调的方法
  6. });

取到了ListenableFuture实例后,通过Futures.addCallback0方法将 FutureCallback回调逻辑的实例绑定到ListenableFuture异步任务实例,实现异步执行完成后的回调。

总结一下,Guava异步回调的流程如下:

  1. 实现Java的Callable接口,创建异步执行逻辑。还有一种情况,如果不需要返回值,异步执行逻辑也可以实现Runnable接口。
  2. 创建Guava线程池。
  3. 将(1)创建的Callable/Runnable异步执行逻辑的实例提交到 Guava线程池,从而获取ListenableFuture异步任务实例。
  4. 创建FutureCallback回调实例,通过FuturesaddCallback将回调实例绑定到ListenableFuture异步任务上。

完成以上4步,当Callable/Runnable异步执行逻辑完成后,就会回调 FutureCallback实例的回调方法onSuccess()/onFailure()。

4.使用Guava实现泡茶喝的实例

1.png

  1. public class GuavaFutureDemo {
  2. public static final int SLEEP_GAP=3;
  3. static class HotWaterJob implements Callable<Boolean>{
  4. @Override
  5. public Boolean call() throws Exception {
  6. try {
  7. System.out.println("洗好水壶");
  8. System.out.println("烧开水");
  9. TimeUnit.SECONDS.sleep(SLEEP_GAP);
  10. System.out.println("水开了");
  11. }catch (Exception e){
  12. e.printStackTrace();
  13. return false;
  14. }
  15. return true;
  16. }
  17. }
  18. static class WashJob implements Callable<Boolean>{
  19. @Override
  20. public Boolean call() throws Exception {
  21. try{
  22. System.out.println("洗茶杯");
  23. TimeUnit.SECONDS.sleep(SLEEP_GAP);
  24. System.out.println("洗完了");
  25. }catch (Exception e){
  26. System.out.println("清洗工作发生异常!");
  27. return false;
  28. }
  29. System.out.println("清洗工作运行结束!");
  30. return true;
  31. }
  32. }
  33. static class DrinkJob{
  34. boolean waterOk =false;
  35. boolean cupOk =false;
  36. public void drinkTea(){
  37. if (waterOk&&cupOk){
  38. System.out.println("泡茶喝,茶喝完!");
  39. this.waterOk=false;
  40. }
  41. }
  42. }
  43. public static void main(String[] args)throws Exception {
  44. Thread.currentThread().setName("泡茶喝线程");
  45. //新启动一个线程,作为泡茶主线程
  46. DrinkJob drinkJob = new DrinkJob();
  47. //烧水的业务逻辑
  48. Callable<Boolean> hotJob = new HotWaterJob();
  49. //清晰的业务逻辑
  50. Callable<Boolean> washJob = new HotWaterJob();
  51. //创建java线程池
  52. ExecutorService jPool = Executors.newFixedThreadPool(10);
  53. //包装java线程池,构造guava线程池
  54. ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
  55. //烧水的回调
  56. FutureCallback<Boolean> hotWaterHook = new FutureCallback<Boolean>() {
  57. @Override
  58. public void onSuccess(Boolean r) {
  59. if (r){
  60. drinkJob.waterOk=true;
  61. drinkJob.drinkTea();
  62. }
  63. }
  64. @Override
  65. public void onFailure(Throwable throwable) {
  66. System.out.println("喝nm!");
  67. }
  68. };
  69. //启动烧水线程
  70. ListenableFuture<Boolean> hotFuture = gPool.submit(hotJob);
  71. //设置烧水任务的回调钩子
  72. Futures.addCallback(hotFuture, hotWaterHook);
  73. //启动清洗线程
  74. ListenableFuture<Boolean> washFuture = gPool.submit(washJob);
  75. //使用匿名实例,作为清洗之后的回调钩子
  76. Futures.addCallback(washFuture, new FutureCallback<Boolean>() {
  77. @Override
  78. public void onSuccess(Boolean r) {
  79. if (r){
  80. drinkJob.cupOk=true;
  81. //执行回调
  82. drinkJob.drinkTea();
  83. }
  84. }
  85. @Override
  86. public void onFailure(Throwable throwable) {
  87. System.out.println("喝nm!");
  88. }
  89. });
  90. System.out.println("干点其他事!");
  91. TimeUnit.SECONDS.sleep(1);
  92. System.out.println("执行完成!");
  93. }
  94. }

以上结果,烧水线程为pool-1-thread-1,清洗线程为pool-1-thread-2,在二者完成之前,泡茶喝线程已经执行完了。泡茶喝的工作在异步回调方法drinkTea(中执行,执行的线程并不是“泡茶喝”线程,而是烧水线程和清洗线程。

5.Guava异步回调和Java异步回调的区别

总结一下Guava异步回调和Java的FutureTask异步调用的区别,具体如下:

  1. FutureTask是主动调用的模式,“调用线程”主动获得异步结果,在获取异步结果时处于阻塞状态,并且会一直阻塞,直到拿到异步线程的结果。
  2. Guava是异步回调模式,“调用线程”不会主动获得异步结果,而是准备好回调函数,并设置好回调钩子,执行回调函数的并不是“调用线程”自身,回调承数的执行者是“被调用线程”,“调用线程”在执行完自己的业务逻辑后就已经结束了,当回调采数被执行时,“调用线程”可能已经结束很久了。

二,Netty的异步回调模式

Netty官方文档说明Netty的网络操作都是异步的。Netty源码中大量使用了异步回调处理模式。在Netty的业务开发层面,处于Netty应用的 Handler处理程序中的业务处理代码也都是异步执行的。所以,了解 Netty的异步回调,无论是Netty应用开发还是源码级开发都是十分重要的。

Netty和Guava一样,实现了自己的异步回调体系:Netty继承和扩展了JDKFuture系列异步回调的API,定义了自身的Future系列接口和类,实现了异步任务的监控、异步执行结果的获取。

总体来说,Netty对JavaFuture异步任务的扩展如下:

继承Java的Future接口得到了一个新的属于Netty自己的Future异步任务接口,该接口对原有的接口进行了增强,使得Netty异步任务能够非阻塞地处理回调结果。注意,Netty没有修改Future的名称,只是调整了所在的包名,Netty的Future类的包名和Java的Future接口的包不同。

引入了一个新接口—GenericFutureListener,用于表示异步执行完成的监听器。这个接口和Guava的FutureCallback回调接口不同。Nettv使用了监听器的模式,异步任务执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。

总体来说,在异步非阻塞回调的设计思路上,Netty和Guava是一致的。对应关系为:

  1. Netty的Future接口可以对应到Guava的ListenableFuture接口。

  2. Netty的GenericFutureListener接口可以对应到Guava的 FutureCallback接口。

1.GenericFutureListener

前面提到,和Guava的FutureCallback一样,Netty新增了一个接口,用来封装异步非阻塞回调的逻辑,那就是GenericFutureListener接口。

GenericFutureListener位于io.netty.util.concurrent包中,源码如下:

  1. package io.netty.util.concurrent; import java.util.EventListener
  2. public interface GenericFutureListener<F extends Future<?>> extends Eventlistener{
  3. //监听器的回调方法
  4. void operationComplete(F var1) throws Exception;
  5. }

GenericFutureListener拥有一个回调方法operationCompleteO,表示异步任务操作完成。在Future异步任务执行完成后将回调此方法。大多数情况下,Netty的异步回调代码编写在GenericFutureListener接口的实现类的operationComplete方法中。

说明一下,GenericFutureListener的父接口EventListener是一个空接口,没有任何抽象方法,是一个仅仅具有标识作用的接口。

2.Netty的Future接口

Netty也对Java的Future接口进行了扩展,并且名称没有变,还是叫作Future接口,实现在io.nettyutil.concurrent包中。

和Guava的ListenableFuture一样,Netty的Future接口扩展了一系列方法,对执行的过程进行监控,对异步回调完成事件进行Listen监听并且回调。

Netty的Future接口一般不会直接使用,使用过程中会使用它的子接口。Netty有一系列子接口,代表不同类型的异步任务,如ChannelFuture接口。

ChannelFuture子接口表示Channel通道I/0操作的异步任务,如果在 Channel的异步I/0操作完成后需要执行回调操作,就需要使用到 ChannelFuture接口。

3.ChannelFuture

在Netty网络编程中,网络连接通道的输入、输出处理都是异步进行的,都会返回一个ChannelFuture接口的实例。通过返回的异步任务实例可以为其增加异步回调的监听器。在异步任务真正完成后,回调执行。

Netty的网络连接的异步回调实例代码如下:

  1. //connect是异步的,仅仅是提交异步任务
  2. ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com,80));
  3. //connect的异步任务真正执行完成后,future回调监听器会执行
  4. future.addListener(new ChannelFutureListener(){
  5. @Override
  6. public void operationComplete(ChannelFuture channelFuture) throws Exception {
  7. if(channelFuture.isSuccess))
  8. System.out.println("Connection established");
  9. else {
  10. System.err.println("Connection attempt failed");
  11. channelFuture.cause().printStackTrace);
  12. }
  13. }
  14. }

GenericFutureListener接口在Netty中是一个基础类型接口。在网络编程的异步回调中,一般使用Netty中提供的某个子接口,如ChannelFutureListener接口。在上面的代码中,使用到的是这个子接口。

4.Netty的出站和入站异步回调

Netty的出站和入站操作都是异步的。异步回调的方法和前面Netty建立的异步回调是一样的。

下面以经典的NIO出站操作write为例说明ChannelFuture的使用。

在write操作调用后,Netty并没有立即完成对JavaNIO底层连接的写入操作,底层的写入操作是异步执行的,代码如下:

  1. //write()输出方法,返回的是一个异步任务
  2. ChannelFuture future=ctx.channel0.write(msg);//为异步任务加上监听器
  3. future.addListener(
  4. new ChannelFutureListener(){
  5. @Override
  6. public void operationComplete(ChannelFuture future){
  7. // write操作完成后的回调代码
  8. }
  9. });

在write操作完成后立即返回,返回的是一个ChannelFuture接口的实例。通过这个实例可以绑定异步回调监听器,编写异步回调的逻辑。