应用程序开发人员的角度来看,Netty 的主要组件是ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的地方。Netty 以适配器类的形式提供了大量默认的ChannelHandler 实现,帮我们简化应用程序处理逻辑的开发过程。

ChannelHandler

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。
Netty 事件是按照它们与入站或出站数据流的相关性进行分类的。

  • ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
  • ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
  • ChannelInboundHandler 用于处理入站 I/O 事件。
  • ChannelOutboundHandler 用于处理出站 I/O 操作。
  • ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
  • ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
  • ChannelDuplexHandler 用于处理入站和出站事件

    入站事件

    可能由入站数据或者相关的状态更改而触发的事件包括:

  • 连接已被激活或者连接失活

  • 数据读取
  • 用户事件
  • 错误事件

    出战事件

    出站事件是未来将会触发的某个动作的操作结果,这些动作包括:

  • 打开或者关闭到远程节点的连接

  • 将数据写到或者冲刷到套接字

每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。
Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议(如 HTTP 和 SSL/TLS)的ChannelHandler。

ChannelHandler 的生命周期

在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用下面这些方法。这些方法中的每一个都接受一个 ChannelHandlerContext 参数。
image.png

  • handlerAdded :当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
  • handlerRemoved :当从 ChannelPipeline 中移除 ChannelHandler 时被调用
  • exceptionCaught :当处理过程中在 ChannelPipeline 中有错误产生时被调用


服务端

  1. new ServerBootstrap()
  2. .group(new NioEventLoopGroup())
  3. .channel(NioServerSocketChannel.class)
  4. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  5. protected void initChannel(NioSocketChannel ch) {
  6. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  9. System.out.println(1);
  10. ctx.fireChannelRead(msg); // 1
  11. }
  12. });
  13. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
  14. @Override
  15. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  16. System.out.println(2);
  17. ctx.fireChannelRead(msg); // 2
  18. }
  19. });
  20. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
  21. @Override
  22. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  23. System.out.println(3);
  24. ctx.channel().write(msg); // 3
  25. }
  26. });
  27. ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
  28. @Override
  29. public void write(ChannelHandlerContext ctx, Object msg,
  30. ChannelPromise promise) {
  31. System.out.println(4);
  32. ctx.write(msg, promise); // 4
  33. }
  34. });
  35. ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
  36. @Override
  37. public void write(ChannelHandlerContext ctx, Object msg,
  38. ChannelPromise promise) {
  39. System.out.println(5);
  40. ctx.write(msg, promise); // 5
  41. }
  42. });
  43. ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){
  44. @Override
  45. public void write(ChannelHandlerContext ctx, Object msg,
  46. ChannelPromise promise) {
  47. System.out.println(6);
  48. ctx.write(msg, promise); // 6
  49. }
  50. });
  51. }
  52. })
  53. .bind(8080);

客户端

  1. new Bootstrap()
  2. .group(new NioEventLoopGroup())
  3. .channel(NioSocketChannel.class)
  4. .handler(new ChannelInitializer<Channel>() {
  5. @Override
  6. protected void initChannel(Channel ch) {
  7. ch.pipeline().addLast(new StringEncoder());
  8. }
  9. })
  10. .connect("127.0.0.1", 8080)
  11. .addListener((ChannelFutureListener) future -> {
  12. future.channel().writeAndFlush("hello,world");
  13. });

服务器端打印:

  1. 1
  2. 2
  3. 3
  4. 6
  5. 5
  6. 4