在Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。

1、Handler处理流程

整个IO操作环节大致包括从通道读取数据包(Netty底层负责),数据包解码,业务处理,目标数据编码,把数据包写到通道,然后由通道发到对端(Netty底层负责)。

前文提到Netty的Handler分为两类,ChannelInboundHandler 入站处理器,ChannelOutboundHandler 出站处理器
数据包解码,业务处理 两个环节属于入站处理器工作
目标数据编码,把数据包写到通道 两个环节属于出站处理器工作

2、入站处理器

当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler 所对应的入站API,进行入站操作处理

ChannelInboundHandler核心方法:

方法 作用
channelRegistered(ChannelHandlerContext var1)
channelUnregistered(ChannelHandlerContext var1)
channelActive(ChannelHandlerContext var1)
channelInactive(ChannelHandlerContext var1)
channelRead(ChannelHandlerContext var1, Object var2)
channelReadComplete(ChannelHandlerContext var1)
userEventTriggered(ChannelHandlerContext var1, Object var2)
channelWritabilityChanged(ChannelHandlerContext var1)
exceptionCaught(ChannelHandlerContext var1, Throwable var2)

3、出站处理器

当业务处理完成后,需要操作java NIO通道时,通过一系列的ChannelOutboundHandler出站处理器完成Netty通道到底层通道的操作,比如建立底层连接,断开底层连接,写入底层Java NIO通道等。

ChannelOutboundHandler核心方法

方法 作用
bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) 监听地址IP +端口绑定,如果为TCP协议,该方法适用于服务端
connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) 连接服务端,如果为TCP协议,该方法适用于客户端
disconnect(ChannelHandlerContext var1, ChannelPromise var2)
close(ChannelHandlerContext var1, ChannelPromise var2) 主动关闭底层通道
deregister(ChannelHandlerContext var1, ChannelPromise var2)
read(ChannelHandlerContext var1) 完成Netty从Java IO通道的数据读取
write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) 触发Netty通道向底层java IO通道的数据写入操作
flush(ChannelHandlerContext var1) 将底层缓冲区数据刷出,立即写到对端

4、ChannelInitalizer

每个Netty通道拥有一条Handler业务处理流水线,负责装配自己的Handler业务处理器。那么如何向流水线装配业务处理器呢?这就需要借助通道的初始化处理器
ChannelInitalizer。

在通道初始化时,会调用提前注册的初始化处理器的initChannel()方法

  1. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  2. @Override
  3. //有连接到达时会创建一个通道的子通道
  4. protected void initChannel(NioSocketChannel ch) throws Exception {
  5. //向子通道的流水线添加Handler业务处理器
  6. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  9. ByteBuf byteBuf = (ByteBuf) msg;
  10. log.info(byteBuf.toString(Charset.defaultCharset()));
  11. //将消息传递到流水线的下一个Handler
  12. ctx.fireChannelRead(msg);
  13. }
  14. }).addLast(longTimeHandleGroup,"longTimeHandler",new ChannelInboundHandlerAdapter(){
  15. @Override
  16. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  17. ByteBuf byteBuf = (ByteBuf) msg;
  18. log.info("longTimeHandler" + byteBuf.toString(Charset.defaultCharset()));
  19. }
  20. });
  21. }
  22. })

5、ChannelInboundHandler生命周期

方法类型 方法 说明
生命周期方法 handlerAdded() 当业务处理器被添加到流水线后,此方法被回调
channelRegistered() 当通道成功绑定一个EventLoop后,此方法被回调
channelActive() 当通道激活后,此方法被回调。通道激活是指所有业务处理器添加注册的异步任务完成,并且与EventLoop反应器绑定的异步任务完成
channelInactive() 当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,回调所有业务处理器的channelInactive方法
channelUnregistered() 通道和EventLoop解除绑定,移除掉对这条通道的事件处理后,回调所有业务处理器的channelUnregistered方法
handlerRemoved() netty会移除通道上的所有的业务处理器,并回调所有业务处理器的handlerRemoved方法
数据入站回调方法 channelRead() 有数据包入站,通道可读。流水线启动入站处理流程,从前向后,入站处理器的channelRead方法会被依次回调
channelReadComplete() 流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete方法,表示数据读取完毕

执行顺序:

  • 通道创建和绑定时: handlerAdded() -> channelRegistered() -> channelActive()
  • 数据传输时: channelRead() -> channelReadComplete()
  • 通道关闭时:channelInactive() -> channelUnregistered() -> handlerRemoved()
  1. @Slf4j
  2. public class TestHandlerLife {
  3. public static void main(String[] args) {
  4. new ServerBootstrap()
  5. //boos只负责NioServerSocketChannelaccept worker负责NioSocketChannel上的读写
  6. .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
  7. .channel(NioServerSocketChannel.class)
  8. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  9. @Override
  10. protected void initChannel(NioSocketChannel ch) throws Exception {
  11. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
  12. @Override
  13. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  14. log.info("调用: channelRegistered");
  15. super.channelRegistered(ctx);
  16. }
  17. @Override
  18. public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
  19. log.info("调用: channelUnregistered");
  20. super.channelUnregistered(ctx);
  21. }
  22. @Override
  23. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  24. log.info("调用: channelActive");
  25. super.channelActive(ctx);
  26. }
  27. @Override
  28. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  29. log.info("调用: channelInactive");
  30. super.channelInactive(ctx);
  31. }
  32. @Override
  33. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
  34. log.info("调用: channelReadComplete");
  35. super.channelReadComplete(ctx);
  36. }
  37. @Override
  38. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
  39. log.info("调用: channelWritabilityChanged");
  40. super.channelWritabilityChanged(ctx);
  41. }
  42. @Override
  43. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  44. log.info("调用: handlerAdded");
  45. super.handlerAdded(ctx);
  46. }
  47. @Override
  48. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  49. log.info("调用: handlerRemoved");
  50. super.handlerRemoved(ctx);
  51. }
  52. @Override
  53. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  54. log.info("调用: channelRead");
  55. ByteBuf byteBuf = (ByteBuf) msg;
  56. log.info(byteBuf.toString(Charset.defaultCharset()));
  57. //将消息传递到流水线的下一个Handler
  58. ctx.fireChannelRead(msg);
  59. }
  60. });
  61. }
  62. })
  63. .bind(8989);
  64. }
  65. }
  1. public class BootStrapClient {
  2. public static void main(String[] args) throws InterruptedException {
  3. Channel channel = new Bootstrap()
  4. .group(new NioEventLoopGroup())
  5. .channel(NioSocketChannel.class)
  6. .handler(new ChannelInitializer<NioSocketChannel>() {
  7. //连接建立后调用
  8. @Override
  9. protected void initChannel(NioSocketChannel sc) throws Exception {
  10. sc.pipeline().addLast(new StringEncoder());
  11. }
  12. })
  13. .connect(new InetSocketAddress("localhost", 8989))
  14. //阻塞方法,直至连接建立
  15. .sync()
  16. //代表连接对象
  17. .channel();
  18. Scanner scanner = new Scanner(System.in);
  19. while (scanner.hasNext()) {
  20. String str = scanner.next();
  21. if(!"quit".equals(str)){
  22. channel.writeAndFlush(str);
  23. }else {
  24. channel.close().sync();
  25. log.info("客户端断开");
  26. }
  27. }
  28. }
  29. }