在Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。
1、Handler处理流程
整个IO操作环节大致包括从通道读取数据包(Netty底层负责),数据包解码,业务处理,目标数据编码,把数据包写到通道,然后由通道发到对端(Netty底层负责)。
前文提到Netty的Handler分为两类,ChannelInboundHandler 入站处理器,ChannelOutboundHandler 出站处理器
数据包解码,业务处理 两个环节属于入站处理器工作
目标数据编码,把数据包写到通道 两个环节属于出站处理器工作
2、入站处理器
当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler 所对应的入站API,进行入站操作处理
ChannelInboundHandler核心方法:
方法 | 作用 |
---|---|
channelRegistered(ChannelHandlerContext var1) | |
channelUnregistered(ChannelHandlerContext var1) | |
channelActive(ChannelHandlerContext var1) | |
channelInactive(ChannelHandlerContext var1) | |
channelRead(ChannelHandlerContext var1, Object var2) | |
channelReadComplete(ChannelHandlerContext var1) | |
userEventTriggered(ChannelHandlerContext var1, Object var2) | |
channelWritabilityChanged(ChannelHandlerContext var1) | |
exceptionCaught(ChannelHandlerContext var1, Throwable var2) |
3、出站处理器
当业务处理完成后,需要操作java NIO通道时,通过一系列的ChannelOutboundHandler出站处理器完成Netty通道到底层通道的操作,比如建立底层连接,断开底层连接,写入底层Java NIO通道等。
ChannelOutboundHandler核心方法
方法 | 作用 |
---|---|
bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) | 监听地址IP +端口绑定,如果为TCP协议,该方法适用于服务端 |
connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) | 连接服务端,如果为TCP协议,该方法适用于客户端 |
disconnect(ChannelHandlerContext var1, ChannelPromise var2) | |
close(ChannelHandlerContext var1, ChannelPromise var2) | 主动关闭底层通道 |
deregister(ChannelHandlerContext var1, ChannelPromise var2) | |
read(ChannelHandlerContext var1) | 完成Netty从Java IO通道的数据读取 |
write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) | 触发Netty通道向底层java IO通道的数据写入操作 |
flush(ChannelHandlerContext var1) | 将底层缓冲区数据刷出,立即写到对端 |
4、ChannelInitalizer
每个Netty通道拥有一条Handler业务处理流水线,负责装配自己的Handler业务处理器。那么如何向流水线装配业务处理器呢?这就需要借助通道的初始化处理器
ChannelInitalizer。
在通道初始化时,会调用提前注册的初始化处理器的initChannel()
方法
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
//有连接到达时会创建一个通道的子通道
protected void initChannel(NioSocketChannel ch) throws Exception {
//向子通道的流水线添加Handler业务处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
//将消息传递到流水线的下一个Handler
ctx.fireChannelRead(msg);
}
}).addLast(longTimeHandleGroup,"longTimeHandler",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("longTimeHandler" + byteBuf.toString(Charset.defaultCharset()));
}
});
}
})
5、ChannelInboundHandler生命周期
方法类型 | 方法 | 说明 |
---|---|---|
生命周期方法 | handlerAdded() | 当业务处理器被添加到流水线后,此方法被回调 |
channelRegistered() | 当通道成功绑定一个EventLoop后,此方法被回调 | |
channelActive() | 当通道激活后,此方法被回调。通道激活是指所有业务处理器添加注册的异步任务完成,并且与EventLoop反应器绑定的异步任务完成 | |
channelInactive() | 当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,回调所有业务处理器的channelInactive方法 | |
channelUnregistered() | 通道和EventLoop解除绑定,移除掉对这条通道的事件处理后,回调所有业务处理器的channelUnregistered方法 | |
handlerRemoved() | netty会移除通道上的所有的业务处理器,并回调所有业务处理器的handlerRemoved方法 | |
数据入站回调方法 | channelRead() | 有数据包入站,通道可读。流水线启动入站处理流程,从前向后,入站处理器的channelRead方法会被依次回调 |
channelReadComplete() | 流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete方法,表示数据读取完毕 |
执行顺序:
- 通道创建和绑定时: handlerAdded() -> channelRegistered() -> channelActive()
- 数据传输时: channelRead() -> channelReadComplete()
- 通道关闭时:channelInactive() -> channelUnregistered() -> handlerRemoved()
@Slf4j
public class TestHandlerLife {
public static void main(String[] args) {
new ServerBootstrap()
//boos只负责NioServerSocketChannel的accept worker负责NioSocketChannel上的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("调用: channelRegistered");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.info("调用: channelUnregistered");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("调用: channelActive");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("调用: channelInactive");
super.channelInactive(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("调用: channelReadComplete");
super.channelReadComplete(ctx);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
log.info("调用: channelWritabilityChanged");
super.channelWritabilityChanged(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("调用: handlerAdded");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("调用: handlerRemoved");
super.handlerRemoved(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("调用: channelRead");
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
//将消息传递到流水线的下一个Handler
ctx.fireChannelRead(msg);
}
});
}
})
.bind(8989);
}
}
public class BootStrapClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后调用
@Override
protected void initChannel(NioSocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8989))
//阻塞方法,直至连接建立
.sync()
//代表连接对象
.channel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String str = scanner.next();
if(!"quit".equals(str)){
channel.writeAndFlush(str);
}else {
channel.close().sync();
log.info("客户端断开");
}
}
}
}