在进行异步方式处理任务时,会用到 Future 与 Promise 接口,Netty的Future继承自JDK的Future,而Promise 又对 Netty Future 进行了扩展

  1. JDK Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  2. Netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  3. Netty Promise 不仅有Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

基本介绍:

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
  2. Netty 中的I/0操作是异步的,包括Bind、Write、 Connect 等操作会简单的返回一个ChannelFuture
  3. 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果
  4. Netty 的异步模型是建立在 Future 和 Callback 之上的。核心思想是假设一个方法计算过程可能非常耗时,等待返回显然不合适。那么可以在调用的时候,立马返回一个 Future,后续可以通过 Future 去监控方法的处理过程 ( 即 Future-Listener 机制 )

常用方法:

方法 Future Promise
getNow 获取任务结果,非阻塞,未有结果时返回Null 与Future一致
await 等待任务结束,如果任务失败不会抛出异常,需要通过isSuccess判断任务是否失败 与Future一致
sync 异步阻塞方式等待任务结束,如果任务失败将抛出异常 与Future一致
isSuccess 判断任务是否成功 与Future一致
cause 获取失败信息,非阻塞,如果没有失败返回Null 与Future一致
addListener 添加回调,异步接收结果 与Future一致
setSuccess / 设置成功结果
ssetFailure / 设置失败结果

Future:

普通回调:

get 方法与 JDK 的 Future get 方法含义一致,属于阻塞获取方法,该案例为普通回调方法演示

  1. package channelfuture;
  2. import io.netty.channel.EventLoop;
  3. import io.netty.channel.EventLoopGroup;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.util.concurrent.Future;
  6. import io.netty.util.concurrent.GenericFutureListener;
  7. import java.util.concurrent.Callable;
  8. import java.util.concurrent.TimeUnit;
  9. /* Netty的Future*/
  10. public class NettyFuture {
  11. public static void main(String[] args) throws Exception {
  12. EventLoopGroup workGroup = new NioEventLoopGroup();
  13. EventLoop nextLoop = workGroup.next();
  14. Future<Integer> future = nextLoop.submit(new Callable<Integer>() {
  15. @Override
  16. public Integer call() throws Exception {
  17. System.out.println("异步任务开始执行,当前是: "+Thread.currentThread().getName());
  18. TimeUnit.SECONDS.sleep(20);
  19. System.out.println("异步任务执行完成,当前是: "+Thread.currentThread().getName());
  20. return 70;
  21. }
  22. });
  23. //使用回调函数进行结果接收,任务未完成之前不会触发
  24. future.addListener(new GenericFutureListener<Future<? super Integer>>() {
  25. @Override
  26. public void operationComplete(Future<? super Integer> future) throws Exception {
  27. Object now = future.get();
  28. System.out.println("回调函数已获取到值,是: "+now+",当前线程是: "+Thread.currentThread().getName());
  29. }
  30. });
  31. TimeUnit.SECONDS.sleep(5);
  32. System.out.println("主线程继续其他操作...");
  33. }
  34. }

image.png
异步回调成功

通道回调:

当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作

  1. isDone 判断当前操作是否完成
  2. isSuccess 判断当前操作是否成功
  3. getCause 获取已完成的当前操作失败的原因
  4. isCancelled 判断已完成的当前操作是否被取消
  5. addListener 注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器,如果Future对象已完成,则通知指定的监听器

案例:
修改组件讲解与入门案例中案例演示三中的服务端代码,为绑定端口事件添加监听器

  1. package simple;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.nio.NioEventLoopGroup;
  5. import io.netty.channel.socket.SocketChannel;
  6. import io.netty.channel.socket.nio.NioServerSocketChannel;
  7. /* 服务端代码 */
  8. public class NettyServer {
  9. public static void main(String[] args) throws InterruptedException {
  10. /* 创建 BossGroup 和 WorkerGroup 线程组
  11. * BossGroup 只处理连接请求,真正的和客户端业务处理会交给 WorkGrouop 完成
  12. * bossGroup 和 workerGroup 含有的子线程的个数默认为 CPU最大线程数 X 2,可以手动指定
  13. */
  14. EventLoopGroup bossGroup = new NioEventLoopGroup(1); //创建子线程的个数为1的 bossGroup
  15. EventLoopGroup workerGroup = new NioEventLoopGroup(8); //创建子线程的个数为8的 workerGroup
  16. try {
  17. //创建服务器端的启动对象,配置参数
  18. ServerBootstrap bootstrap = new ServerBootstrap();
  19. //使用链式参数配置启动参数
  20. bootstrap.group(bossGroup,workerGroup) //设置两个线程组
  21. .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
  22. .option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数
  23. .childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
  24. .childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
  25. //给PipeLine设置处理器
  26. @Override
  27. protected void initChannel(SocketChannel ch) throws Exception {
  28. ch.pipeline().addLast(new NettyServerHandler()); //将自定义处理器加入到PipeLine
  29. }
  30. }); //给 WorkerGroup 的 EventLoop 对应的管道设置处理器
  31. System.out.println("服务器 is ready .....");
  32. //启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
  33. ChannelFuture cf = bootstrap.bind(6666).sync();
  34. //给CF注册监听器,监控关心的事件
  35. cf.addListener(new ChannelFutureListener() {
  36. @Override
  37. public void operationComplete(ChannelFuture channelFuture) throws Exception {
  38. if(cf.isSuccess()){
  39. System.out.println("监听端口成功");
  40. }else{
  41. System.out.println("监听端口失败");
  42. }
  43. }
  44. });
  45. //对关闭通道进行监听(当有关闭通道的消息时才进行监听)
  46. cf.channel().closeFuture().sync();
  47. } finally {
  48. bossGroup.shutdownGracefully(); //关闭资源
  49. workerGroup.shutdownGracefully(); //关闭
  50. }
  51. }
  52. }

image.png
运行结果

Promise:

相比Future可以自定义成功与失败的期望结果,通常用于RPC框架的编写

  1. package channelfuture;
  2. import io.netty.channel.EventLoop;
  3. import io.netty.channel.nio.NioEventLoopGroup;
  4. import io.netty.util.concurrent.DefaultPromise;
  5. import java.util.concurrent.ExecutionException;
  6. import java.util.concurrent.TimeUnit;
  7. /* Netty的Promise案例*/
  8. public class NettyPromise {
  9. public static void main(String[] args) throws ExecutionException, InterruptedException {
  10. //创建EventLoop对象
  11. EventLoop eventLoop = new NioEventLoopGroup().next();
  12. //可以主动创建Promise,是结果容器
  13. DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
  14. new Thread(() -> {
  15. //线程开始执行计算
  16. System.out.println("开始计算...");
  17. try {
  18. TimeUnit.SECONDS.sleep(2);
  19. int a = 1/0;
  20. promise.setSuccess(80); //添加计算方法结束后想返回的正确结果
  21. } catch (Exception e) {
  22. promise.setSuccess(90); //添加计算方法遇到异常结束后想返回的失败结果
  23. }
  24. }).start();
  25. System.out.println("主线程等待结果.....");
  26. System.out.println("异步任务执行完毕,结果是: "+promise.get());
  27. }
  28. }

image.png
返回设定的失败结果