对于开发人员来讲,Netty中的最主要的组件是 ChannelHandler,所有入站和出站的数据都要经过对应的 ChannelHandler 的处理。使用Netty进行业务开发,开发人员主要做的事就是编写各种 ChannelHandler 。今天我们就来看一看 Handler 中的各种方法到底是以什么样的形式调用的。

关于Handler的核心组件

首先,我们需要搞清楚与 ChannelHandler 相关的一些核心组件
微信截图_20210723145748.png

ChannelHandler

Netty中定义了两个重要的 ChannelHandler 子接口,分别处理数据的入站和出站,当然,ChannelHandler 可以同时实现 ChannelInboundhandler 和 ChannelOutboundhandler
微信截图_20210817103723.png

ChannelInboundHandler

主要用于处理入站数据以及各种状态变化,入站事件会从 pipeline 中的 head 节点往后传递到最后一个ChannelInboundhandler,常见API如下:

channelRegistered() 当 Channel 注册到 EventLoop 的 Selector上时被调用
channelUnregistered() 当 Channel 从 EventLoop 的 Selector上注销时被调用
channelActive() 当 Channel 已经建立好连接时调用
channelInactive() 当 Channel 与断开连接时调用
channelRead() 往 Channel 中读取数据时被调用
channelReadComplete() 往 Channel 中完成读操作时被调用
userEventTriggered() 当上一个 ChannelInboundHandler的 fireUserEventTriggered() 方法被调用时会触发,如Netty心跳机制的实现

ChannelOutboundHandler

出站操作和数据将由 ChannelOutboundhandler 处理,出站事件会从pipeline 中的 tail 节点往前传递到最前一个 ChannelOutboundhandler。ChannelOutboundhandler的方法将会被ChannelChannelPipeline 以及 ChannelHandlerContext 调用。常见API如下:

bind(ChannelHandlerContext, SocketAddress, ChannelPromise) 当请求将 Channel 绑定到本地地址时调用
bind(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) 当请求将 Channel 与远程节点进行连接时被调用
disconnect(ChannelHandlerContext, ChannelPromise) 当请求将 Channel 与远程节点断开连接时被调用
close(ChannelHandlerContext, ChannelPromise) 当请求关闭 Channel 时调用
write(ChannelHandlerContext, Object, ChannelPromise) 当请求通过 Channel 将数据写到远程节点时调用
flush(ChannelHandlerContext) 当请求入队消息冲刷到远程节点时调用

ChannelOutboundhandler 的大部分方法中都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。ChannelPromise 是 ChannelFuture 的子类,其定义了一些可写方法,如 setSuccess() 和 setFailure(),从而使 ChannelFuture 不可变。

这里借鉴的是 Scala 的 Promise 和 Future 设计,当一个Promise 被完成后,其对应的 Future 不能再进行任何修改

ChannelPipeline

ChannelPipeline 是一个拦截流经 Channel 的入站和出站的管道,里面维护了一个Channel -Handler 的链表(包括所有的ChannelOutboundhandler 和 ChannelOutboundhandler )
每一个 Channel 都会有自己固定对应的 ChannelPipeline。入站和出站事件都会被Pipeline中对应的Handler处理。
微信截图_20210817123655.png
如果一个入站事件触发,它将被 ChannelPipeline 的 head 一直传到尾部,在传播过程中,还会检查下一个 ChannelHandler 的类型是否和入站事件想匹配,如果不匹配,则会跳过该 ChannelHandler,直到找到对应的Handler。

ChannelPipeline 提供了用于触发 ChannelHandler 处理入站和出站操作的API

fireChannelRegistered 调用 ChannelPipeline 中位于 head 节点的 channelregistered 方法
fireChannelUnregistered 调用 ChannelPipeline 中位于 head 节点的 channelUnregistered 方法
fireChannelActive 调用 ChannelPipeline 中位于 head 节点的 channelActive 方法
fireChannelInactive 调用 ChannelPipeline 中位于 head 节点的 channelInactive 方法
fireChannelRead 调用 ChannelPipeline 中位于 head 节点的 channelRead 方法
fireExceptionCaught 调用 ChannelPipeline 中位于 head 节点的 exceptionCaught 方法
fireChannelReadComplete 调用 ChannelPipeline 中位于 head 节点的 channelReadComplete 方法
bind 调用 ChannelPipeline 中位于 tail 节点的 bind 方法
connect 调用 ChannelPipeline 中位于 tail 节点的 connect 方法
disconnect 调用 ChannelPipeline 中位于 tail 节点的 disconnect 方法
write 调用 ChannelPipeline 中位于 tail 节点的 write 方法

ChannelHandlerContext

每当有ChannelHandler 添加加到管道中,都会创建 ChannelHandlerContext,其主要的功能是管理它所管理的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。ChannelHandlerContext 有很多方法的功能跟 Channel 和ChannelPipeline 中的类似。Channel 或 ChannelPipeline 上的这些方法(fireXxxx),之所以能沿着整个Pipeline进行传播,是通过 ChannelHandlerContext 来实现的。

ChannelHandlerContext的常用API:

fireChannelRegistered 触发对下一个ChannelHandlerContext 上的 channelregistered 方法的调用
fireChannelUnregistered 触发对下一个ChannelHandlerContext 上的 channelUnregistered 方法的调用
fireChannelActive 触发对下一个ChannelHandlerContext 上的 channelActive 方法的调用
fireChannelInactive 触发对下一个ChannelHandlerContext 上的 channelInactive 方法的调用
fireChannelRead 触发对下一个ChannelHandlerContext 上的 channelRead 方法的调用
fireExceptionCaught 触发对下一个ChannelHandlerContext 上的 exceptionCaught 方法的调用
fireChannelReadComplete 触发对下一个ChannelHandlerContext 上的 channelReadComplete 方法的调用
bind 绑定到指定的 SocketAddress
connect 连接指定的 SocketAddress
disconnect 与远程节点断开连接
write 通过这个实例写入消息并经过 ChannelPipeline

Handler的责任链调用

下面我们以从 Channel 中读取数据为例。当要往 Channel 中读取数据时,Netty底层会调用 ChannelPipeline 中的 invokeChannelRead() 方法,从第一个处理入站的 Handler 开始调用依次往后调用(即从head开始)

  1. public final ChannelPipeline fireChannelRead(Object msg) {
  2. //从head开始调用
  3. AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
  4. return this;
  5. }

在 invokeChannelRegistered(ChannelHandlerContext, Object) 中,分为了同步执行和异步执行两种方式,不管是哪种方式,最终都会调用传入的 ChannelHandlerContext 的 invokeChannelRegistered() 方法(此时是head节点)

  1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  2. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  3. EventExecutor executor = next.executor();
  4. //同步调用
  5. if (executor.inEventLoop()) {
  6. next.invokeChannelRead(m);
  7. //异步调用
  8. } else {
  9. executor.execute(new Runnable() {
  10. public void run() {
  11. next.invokeChannelRead(m);
  12. }
  13. });
  14. }
  15. }

调用位于 head 的 ChannelHandlerContext 的 channelRead() 方法

  1. private void invokeChannelRead(Object msg) {
  2. if (this.invokeHandler()) {
  3. try {
  4. ((ChannelInboundHandler)this.handler()).channelRead(this, msg);
  5. } catch (Throwable var3) {
  6. this.notifyHandlerException(var3);
  7. }
  8. } else {
  9. ......
  10. }
  11. }

head节点对应的 Handler 并没有做任何处理,直接触发下一个 Handler 的 channelRead()方法

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  2. //head节点对应的Handler并没有做任何处理,直接调用下一个Handler的channelRead()方法
  3. ctx.fireChannelRead(msg);
  4. }

触发下一个Handler

  1. public ChannelHandlerContext fireChannelRegistered(Object msg) {
  2. //找到下一个ChannelInboundHandler,重新执行invokeChannelRead(ChannelHandlerContext,Object)方法
  3. invokeChannelRead(this.findContextInbound(32), msg);
  4. return this;
  5. }

找到下一个处理入站的Handler,即我们自定义的Handler,又回到了invokeChannelRead方法,开始新一轮处理。

  1. public ChannelHandlerContext fireChannelRead(Object msg) {
  2. //找到下一个处理入站的Handler,又调用了invokeChannelRead()
  3. invokeChannelRead(this.findContextInbound(32), msg);
  4. return this;
  5. }
  6. -----------------------------------------
  7. private AbstractChannelHandlerContext findContextInbound(int mask) {
  8. AbstractChannelHandlerContext ctx = this;
  9. do {
  10. //找到下一个相同类型的Handler
  11. ctx = ctx.next;
  12. } while((ctx.executionMask & mask) == 0);
  13. return ctx;
  14. }

微信截图_20210817170115.png