在进行异步方式处理任务时,会用到 Future 与 Promise 接口,Netty的Future继承自JDK的Future,而Promise 又对 Netty Future 进行了扩展
JDK Future 只能同步等待任务结束(或成功、或失败)才能得到结果
Netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
Netty Promise 不仅有Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
基本介绍:
- 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
- Netty 中的I/0操作是异步的,包括Bind、Write、 Connect 等操作会简单的返回一个ChannelFuture
- 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果
- Netty 的异步模型是建立在 Future 和 Callback 之上的。核心思想是假设一个方法计算过程可能非常耗时,等待返回显然不合适。那么可以在调用的时候,立马返回一个 Future,后续可以通过 Future 去监控方法的处理过程 ( 即 Future-Listener 机制 )
常用方法:
方法 | Future | Promise |
---|---|---|
getNow | 获取任务结果,非阻塞,未有结果时返回Null | 与Future一致 |
await | 等待任务结束,如果任务失败不会抛出异常,需要通过isSuccess判断任务是否失败 | 与Future一致 |
sync | 异步阻塞方式等待任务结束,如果任务失败将抛出异常 | 与Future一致 |
isSuccess | 判断任务是否成功 | 与Future一致 |
cause | 获取失败信息,非阻塞,如果没有失败返回Null | 与Future一致 |
addListener | 添加回调,异步接收结果 | 与Future一致 |
setSuccess | / | 设置成功结果 |
ssetFailure | / | 设置失败结果 |
Future:
普通回调:
get 方法与 JDK 的 Future get 方法含义一致,属于阻塞获取方法,该案例为普通回调方法演示
package channelfuture;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/* Netty的Future*/
public class NettyFuture {
public static void main(String[] args) throws Exception {
EventLoopGroup workGroup = new NioEventLoopGroup();
EventLoop nextLoop = workGroup.next();
Future<Integer> future = nextLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("异步任务开始执行,当前是: "+Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(20);
System.out.println("异步任务执行完成,当前是: "+Thread.currentThread().getName());
return 70;
}
});
//使用回调函数进行结果接收,任务未完成之前不会触发
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
Object now = future.get();
System.out.println("回调函数已获取到值,是: "+now+",当前线程是: "+Thread.currentThread().getName());
}
});
TimeUnit.SECONDS.sleep(5);
System.out.println("主线程继续其他操作...");
}
}
异步回调成功
通道回调:
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作
isDone 判断当前操作是否完成
isSuccess 判断当前操作是否成功
getCause 获取已完成的当前操作失败的原因
isCancelled 判断已完成的当前操作是否被取消
addListener 注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器,如果Future对象已完成,则通知指定的监听器
案例:
修改组件讲解与入门案例中案例演示三中的服务端代码,为绑定端口事件添加监听器
package simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/* 服务端代码 */
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
/* 创建 BossGroup 和 WorkerGroup 线程组
* BossGroup 只处理连接请求,真正的和客户端业务处理会交给 WorkGrouop 完成
* bossGroup 和 workerGroup 含有的子线程的个数默认为 CPU最大线程数 X 2,可以手动指定
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //创建子线程的个数为1的 bossGroup
EventLoopGroup workerGroup = new NioEventLoopGroup(8); //创建子线程的个数为8的 workerGroup
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式参数配置启动参数
bootstrap.group(bossGroup,workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作为服务器通道实现
.option(ChannelOption.SO_BACKLOG,120) //设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE,true) //保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象
//给PipeLine设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler()); //将自定义处理器加入到PipeLine
}
}); //给 WorkerGroup 的 EventLoop 对应的管道设置处理器
System.out.println("服务器 is ready .....");
//启动服务器绑定一个端口并且同步,生成一个 ChannelFuture 对象
ChannelFuture cf = bootstrap.bind(6666).sync();
//给CF注册监听器,监控关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(cf.isSuccess()){
System.out.println("监听端口成功");
}else{
System.out.println("监听端口失败");
}
}
});
//对关闭通道进行监听(当有关闭通道的消息时才进行监听)
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully(); //关闭资源
workerGroup.shutdownGracefully(); //关闭
}
}
}
运行结果
Promise:
相比Future可以自定义成功与失败的期望结果,通常用于RPC框架的编写
package channelfuture;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/* Netty的Promise案例*/
public class NettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();
//可以主动创建Promise,是结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
//线程开始执行计算
System.out.println("开始计算...");
try {
TimeUnit.SECONDS.sleep(2);
int a = 1/0;
promise.setSuccess(80); //添加计算方法结束后想返回的正确结果
} catch (Exception e) {
promise.setSuccess(90); //添加计算方法遇到异常结束后想返回的失败结果
}
}).start();
System.out.println("主线程等待结果.....");
System.out.println("异步任务执行完毕,结果是: "+promise.get());
}
}
返回设定的失败结果