Netty 的 ChannelPipeline 和 ChannelHandler 机制类似于 Servlet 和 Filter 过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的定制。

Servlet Filter 能够以声明的方式(web.xml 配置文件)插入到 HTTP 请求响应的处理过程中,用于拦截请求和响应,以便能够查看、提取或以某种方式操作正在客户端和服务器之间交换的数据。拦截器封装了业务定制逻辑,能够实现对 Web 应用程序 的预处理和事后处理。

Netty 的 Channel 过滤器 实现原理与 Servlet Filter 机制 一致,它将 Channel 的数据管道 抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O 事件拦截器 ChannelHandler 链表,由 ChannelHandler 链表 对 IO 事件 进行拦截和处理,可以通过新增和删除 ChannelHandler 来实现不同的业务逻辑定制,不需要对已有的 ChannelHandler 进行修改,能够实现对修改封闭和对扩展的支持。

下面我们对 ChannelPipeline 和 ChannelHandler 的功能进行简单地介绍,然后分析下其源码设计。

ChannelPipeline 的功能和作用

ChannelPipeline 是 ChannelHandler 的容器,它负责 ChannelHandler 的管理、事件拦截与调度。

ChannelPipeline 的事件处理

下图展示了一个消息被 ChannelPipeline 的 ChannelHandler 链拦截和处理的全过程。

  1. * I/O Request
  2. * via {@link Channel} or
  3. * {@link ChannelHandlerContext}
  4. * |
  5. * +---------------------------------------------------+---------------+
  6. * | ChannelPipeline | |
  7. * | \|/ |
  8. * | +---------------------+ +-----------+----------+ |
  9. * | | Inbound Handler N | | Outbound Handler 1 | |
  10. * | +----------+----------+ +-----------+----------+ |
  11. * | /|\ | |
  12. * | | \|/ |
  13. * | +----------+----------+ +-----------+----------+ |
  14. * | | Inbound Handler N-1 | | Outbound Handler 2 | |
  15. * | +----------+----------+ +-----------+----------+ |
  16. * | /|\ . |
  17. * | . . |
  18. * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
  19. * | [ method call] [method call] |
  20. * | . . |
  21. * | . \|/ |
  22. * | +----------+----------+ +-----------+----------+ |
  23. * | | Inbound Handler 2 | | Outbound Handler M-1 | |
  24. * | +----------+----------+ +-----------+----------+ |
  25. * | /|\ | |
  26. * | | \|/ |
  27. * | +----------+----------+ +-----------+----------+ |
  28. * | | Inbound Handler 1 | | Outbound Handler M | |
  29. * | +----------+----------+ +-----------+----------+ |
  30. * | /|\ | |
  31. * +---------------+-----------------------------------+---------------+
  32. * | \|/
  33. * +---------------+-----------------------------------+---------------+
  34. * | | | |
  35. * | [ Socket.read() ] [ Socket.write() ] |
  36. * | |
  37. * | Netty Internal I/O Threads (Transport Implementation) |
  38. * +-------------------------------------------------------------------+

从上图可以看出 消息读取和发送处理全流程为:

  1. 底层的 SocketChannel.read()方法 读取 ByteBuf,触发 ChannelRead 事件,由 IO 线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead(Object msg)方法,将消息传输到 ChannelPipeline 中。
  2. 消息依次被 HeadHandler、ChannelHandler1、ChannelHandler2 … TailHandler 拦截和处理,在这个过程中,任何 ChannelHandler 都可以中断当前的流程,结束消息的传递。
  3. 调用 ChannelHandlerContext 的 write 方法 发送消息,消息从 TailHandler 开始途经 ChannelHandlerN … ChannelHandler1、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的 Future 返回。

Netty 中的事件分为 Inbound 事件 和 Outbound 事件。Inbound 事件 通常由 I/O 线程 触发,例如 TCP 链路建立事件、链路关闭事件、读事件、异常通知事件等,它对应上图的左半部分。触发 Inbound 事件 的方法如下。

  1. ChannelHandlerContext.fireChannelRegistered():Channel 注册事件;
  2. ChannelHandlerContext.fireChannelActive():TCP 链路建立成功,Channel 激活事件;
  3. ChannelHandlerContext.fireChannelRead(Object):读事件;
  4. ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件;
  5. ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件;
  6. ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件;
  7. ChannelHandlerContext.fireChannelWritabilityChanged():Channel 的可写状态变化;
  8. ChannelHandlerContext.fireChannellnactive():TCP 连接关闭,链路不可用通知事件。

Outbound 事件 通常是由用户主动发起的 网络 IO 操作,例如用户发起的连接操作、绑定操作、消息发送等操作,它对应上图的右半部分。触发 Outbound 事件 的方法如下:

  1. ChannelHandlerContext.bind(SocketAddress, ChannelPromise):绑定本地地址事件;
  2. ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件;
  3. ChannelHandlerContext.write(Object, ChannelPromise):发送事件;
  4. ChannelHandlerContext.flush():刷新事件;
  5. ChannelHandlerContext.read():读事件;
  6. ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件;
  7. ChannelHandlerContext.close(ChannelPromise):关闭当前 Channel 事件。

ChannelPipeline 自定义拦截器

ChannelPipeline 通过 ChannelHandler 来实现事件的拦截和处理,由于 ChannelHandler 中的事件种类繁多,不同的 ChannelHandler 可能只需要关心其中的个别事件,所以,自定义的 ChannelHandler 只需要继承 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter,覆盖自己关心的方法即可。

下面的两个示例分别展示了:拦截 Channel Active 事件,打印 TCP 链路建立成功日志,和 链路关闭的时候释放资源,代码如下。

  1. public class MyInboundHandler extends ChannelInboundHandlerAdapter {
  2. @Override
  3. public void channelActive(ChannelHandlerContext context) {
  4. System.out.println("欢迎来到,LPL!");
  5. context.fireChannelActive();
  6. }
  7. }
  8. public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
  9. @Override
  10. public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
  11. System.out.println("游戏结束...");
  12. ctx.close();
  13. }
  14. }

构建 pipeline

使用 Netty 时,用户不需要自己创建 pipeline,因为使用 ServerBootstrap 或者 Bootstrap 进行配置后,Netty 会为每个 Channel 连接 创建一个独立的 pipeline。我们只需将自定义的 ChannelHandler 加入到 pipeline 即可。相关代码如下。

  1. ServerBootstrap server = new ServerBootstrap();
  2. server.childHandler(new ChannelInitializer<SocketChannel>() {
  3. @Override
  4. public void initChannel(SocketChannel ch) throws Exception {
  5. ChannelPipeline pipeline = ch.pipeline();
  6. /** 解析自定义协议 */
  7. pipeline.addLast(new MyDecoder());
  8. pipeline.addLast(new MyEncoder());
  9. pipeline.addLast(new SocketHandler());
  10. /** 解析Http请求 */
  11. pipeline.addLast(new HttpServerCodec());
  12. //主要是将同一个http请求或响应的多个消息对象变成一个 fullHttpRequest完整的消息对象
  13. pipeline.addLast(new HttpObjectAggregator(64 * 1024));
  14. //主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的 ,加上这个handler我们就不用考虑这个问题了
  15. pipeline.addLast(new ChunkedWriteHandler());
  16. }
  17. });

对于类似编解码这样的 ChannelHandler,它存在先后顺序,例如 MessageToMessageDecoder,在它之前往往需要有 ByteToMessageDecoder 将 ByteBuf 解码为对象,然后将对象做二次解码 得到最终的 POJO 对象。pipeline 支持指定位置添加或者删除 ChannelHandler。

ChannelPipeline 的主要特性

ChannelPipeline 支持运行时动态的添加或者删除 ChannelHandler,在某些场景下这个特性非常实用。例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将 系统拥塞保护 ChannelHandler 添加到当前的 ChannelPipeline 中,当高峰期过去之后,再动态删除 拥塞保护 ChannelHandler。

ChannelPipeline 是线程安全的,这意味着 N 个业务线程可以并发地操作 ChannelPipeline 而不存在多线程并发问题。但 ChannelHandler 不是线程安全的,这意味着 我们需要自己保证 ChannelHandler 的线程安全。

ChannelPipeline 源码解析

ChannelPipeline 的代码比较简单,它实际上是一个 ChannelHandler 容器,内部维护了一个 ChannelHandler 的链表和迭代器,可以方便地进行 ChannelHandler 的 CRUD。

另外一个比较重要的部分是,当发生某个 I/O 事件 时,如 链路建立、链路关闭、读写操作 等,都会产一个事件,事件在 pipeline 中传播和处理,它是事件处理的总入口。由于 网络 I/O 相关的事件有限,因此 Netty 对这些事件进行了统一抽象,Netty 提供的 和用户自定义的 ChannelHandler 会对感兴趣的事件进行拦截和处理。

pipeline 中以 fireXXX 命名的方法都是从 I/O 线程 流向 用户业务 Handler 的 inbound 事件,它们的实现因功能而异,但是处理步骤类似,都是 调用 HeadHandler 对应的 fireXXX 方法,然后执行事件相关的逻辑操作。

  1. public interface ChannelPipeline
  2. extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
  3. /**
  4. * 管理 ChannelHandler 的api
  5. */
  6. ChannelPipeline addFirst(String name, ChannelHandler handler);
  7. ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
  8. ChannelPipeline addLast(String name, ChannelHandler handler);
  9. ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
  10. ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
  11. ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
  12. ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
  13. ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
  14. ChannelPipeline addFirst(ChannelHandler... handlers);
  15. ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
  16. ChannelPipeline addLast(ChannelHandler... handlers);
  17. ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
  18. ChannelPipeline remove(ChannelHandler handler);
  19. ChannelHandler remove(String name);
  20. <T extends ChannelHandler> T remove(Class<T> handlerType);
  21. ChannelHandler removeFirst();
  22. ChannelHandler removeLast();
  23. ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
  24. ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
  25. <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName, ChannelHandler newHandler);
  26. ChannelHandler first();
  27. ChannelHandler last();
  28. ChannelHandler get(String name);
  29. <T extends ChannelHandler> T get(Class<T> handlerType);
  30. /**
  31. * 处理 I/O事件 的api
  32. */
  33. @Override
  34. ChannelPipeline fireChannelRegistered();
  35. @Override
  36. ChannelPipeline fireChannelUnregistered();
  37. @Override
  38. ChannelPipeline fireChannelActive();
  39. @Override
  40. ChannelPipeline fireChannelInactive();
  41. @Override
  42. ChannelPipeline fireExceptionCaught(Throwable cause);
  43. @Override
  44. ChannelPipeline fireUserEventTriggered(Object event);
  45. @Override
  46. ChannelPipeline fireChannelRead(Object msg);
  47. @Override
  48. ChannelPipeline fireChannelReadComplete();
  49. @Override
  50. ChannelPipeline fireChannelWritabilityChanged();
  51. @Override
  52. ChannelPipeline flush();
  53. }

ChannelHandler 的功能和作用

ChannelHandler 负责对 I/O 事件 进行拦截处理,它可以选择性地 拦截处理感兴趣的事件,也可以透传和终止事件的传递。基于 ChannelHandler 接口,我们可以方便地进行业务逻辑定制,如 打印日志、统一封装异常信息、性能统计和消息编解码等。

ChannelHandlerAdapter

大部分 ChannelHandler 都会选择性 拦截处理感兴趣的 I/O 事件,忽略其他事件,然后交由下一个 ChannelHandler 进行拦截处理。这会导致一个问题:自定义 ChannelHandler 必须要实现 ChannelHandler 的所有接口,包括它不关心的那些事件处理接口,这会导致用户代码的冗余和臃肿,代码的可维护性也会变差。

为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,和 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter 两个实现类,如果 自定义 ChannelHandler 关心某个事件,只需要继承 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter 覆盖对应的方法即可,对于不关心的,可以直接继承使用父类的方法,这样子类的代码就会非常简洁清晰。

ChannelHandler 组件 的类结构

相对于 ByteBuf 和 Channel,ChannelHandler 的类继承关系稍微简单些,但是它的子类非常多,功能各异,主要可以分为如下四类。

  1. ChannelPipeline 的系统 ChannelHandler,用于 I/O 操作 和对事件进行预处理,对用户不可见,这类 ChannelHandler 主要包括 HeadHandler 和 TailHandler;
  2. 编解码 ChannelHandler,如 MessageToMessageEncoder、MessageToMessageDecoder、MessageToMessageCodec;
  3. 其他系统功能性 ChannelHandler,如 流量整型 Handler、读写超时 Handler、日志 Handler 等;
  4. 自定义 ChannelHandler。

ChannelHandler 组件 的核心类及常用类的类图如下。

在这里插入图片描述