在Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。
1、Handler处理流程
整个IO操作环节大致包括从通道读取数据包(Netty底层负责),数据包解码,业务处理,目标数据编码,把数据包写到通道,然后由通道发到对端(Netty底层负责)。
前文提到Netty的Handler分为两类,ChannelInboundHandler 入站处理器,ChannelOutboundHandler 出站处理器
数据包解码,业务处理 两个环节属于入站处理器工作
目标数据编码,把数据包写到通道 两个环节属于出站处理器工作
2、入站处理器
当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler 所对应的入站API,进行入站操作处理
ChannelInboundHandler核心方法:
| 方法 | 作用 |
|---|---|
| channelRegistered(ChannelHandlerContext var1) | |
| channelUnregistered(ChannelHandlerContext var1) | |
| channelActive(ChannelHandlerContext var1) | |
| channelInactive(ChannelHandlerContext var1) | |
| channelRead(ChannelHandlerContext var1, Object var2) | |
| channelReadComplete(ChannelHandlerContext var1) | |
| userEventTriggered(ChannelHandlerContext var1, Object var2) | |
| channelWritabilityChanged(ChannelHandlerContext var1) | |
| exceptionCaught(ChannelHandlerContext var1, Throwable var2) |
3、出站处理器
当业务处理完成后,需要操作java NIO通道时,通过一系列的ChannelOutboundHandler出站处理器完成Netty通道到底层通道的操作,比如建立底层连接,断开底层连接,写入底层Java NIO通道等。
ChannelOutboundHandler核心方法
| 方法 | 作用 |
|---|---|
| bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) | 监听地址IP +端口绑定,如果为TCP协议,该方法适用于服务端 |
| connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) | 连接服务端,如果为TCP协议,该方法适用于客户端 |
| disconnect(ChannelHandlerContext var1, ChannelPromise var2) | |
| close(ChannelHandlerContext var1, ChannelPromise var2) | 主动关闭底层通道 |
| deregister(ChannelHandlerContext var1, ChannelPromise var2) | |
| read(ChannelHandlerContext var1) | 完成Netty从Java IO通道的数据读取 |
| write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) | 触发Netty通道向底层java IO通道的数据写入操作 |
| flush(ChannelHandlerContext var1) | 将底层缓冲区数据刷出,立即写到对端 |
4、ChannelInitalizer
每个Netty通道拥有一条Handler业务处理流水线,负责装配自己的Handler业务处理器。那么如何向流水线装配业务处理器呢?这就需要借助通道的初始化处理器
ChannelInitalizer。
在通道初始化时,会调用提前注册的初始化处理器的initChannel()方法
.childHandler(new ChannelInitializer<NioSocketChannel>() {@Override//有连接到达时会创建一个通道的子通道protected void initChannel(NioSocketChannel ch) throws Exception {//向子通道的流水线添加Handler业务处理器ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info(byteBuf.toString(Charset.defaultCharset()));//将消息传递到流水线的下一个Handlerctx.fireChannelRead(msg);}}).addLast(longTimeHandleGroup,"longTimeHandler",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info("longTimeHandler" + byteBuf.toString(Charset.defaultCharset()));}});}})
5、ChannelInboundHandler生命周期
| 方法类型 | 方法 | 说明 |
|---|---|---|
| 生命周期方法 | handlerAdded() | 当业务处理器被添加到流水线后,此方法被回调 |
| channelRegistered() | 当通道成功绑定一个EventLoop后,此方法被回调 | |
| channelActive() | 当通道激活后,此方法被回调。通道激活是指所有业务处理器添加注册的异步任务完成,并且与EventLoop反应器绑定的异步任务完成 | |
| channelInactive() | 当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,回调所有业务处理器的channelInactive方法 | |
| channelUnregistered() | 通道和EventLoop解除绑定,移除掉对这条通道的事件处理后,回调所有业务处理器的channelUnregistered方法 | |
| handlerRemoved() | netty会移除通道上的所有的业务处理器,并回调所有业务处理器的handlerRemoved方法 | |
| 数据入站回调方法 | channelRead() | 有数据包入站,通道可读。流水线启动入站处理流程,从前向后,入站处理器的channelRead方法会被依次回调 |
| channelReadComplete() | 流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete方法,表示数据读取完毕 |
执行顺序:
- 通道创建和绑定时: handlerAdded() -> channelRegistered() -> channelActive()
- 数据传输时: channelRead() -> channelReadComplete()
- 通道关闭时:channelInactive() -> channelUnregistered() -> handlerRemoved()
@Slf4jpublic class TestHandlerLife {public static void main(String[] args) {new ServerBootstrap()//boos只负责NioServerSocketChannel的accept worker负责NioSocketChannel上的读写.group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info("调用: channelRegistered");super.channelRegistered(ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info("调用: channelUnregistered");super.channelUnregistered(ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("调用: channelActive");super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("调用: channelInactive");super.channelInactive(ctx);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {log.info("调用: channelReadComplete");super.channelReadComplete(ctx);}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {log.info("调用: channelWritabilityChanged");super.channelWritabilityChanged(ctx);}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("调用: handlerAdded");super.handlerAdded(ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("调用: handlerRemoved");super.handlerRemoved(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("调用: channelRead");ByteBuf byteBuf = (ByteBuf) msg;log.info(byteBuf.toString(Charset.defaultCharset()));//将消息传递到流水线的下一个Handlerctx.fireChannelRead(msg);}});}}).bind(8989);}}
public class BootStrapClient {public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {//连接建立后调用@Overrideprotected void initChannel(NioSocketChannel sc) throws Exception {sc.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress("localhost", 8989))//阻塞方法,直至连接建立.sync()//代表连接对象.channel();Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String str = scanner.next();if(!"quit".equals(str)){channel.writeAndFlush(str);}else {channel.close().sync();log.info("客户端断开");}}}}
