对于开发人员来讲,Netty中的最主要的组件是 ChannelHandler,所有入站和出站的数据都要经过对应的 ChannelHandler 的处理。使用Netty进行业务开发,开发人员主要做的事就是编写各种 ChannelHandler 。今天我们就来看一看 Handler 中的各种方法到底是以什么样的形式调用的。
关于Handler的核心组件
首先,我们需要搞清楚与 ChannelHandler 相关的一些核心组件
ChannelHandler
Netty中定义了两个重要的 ChannelHandler 子接口,分别处理数据的入站和出站,当然,ChannelHandler 可以同时实现 ChannelInboundhandler 和 ChannelOutboundhandler 
ChannelInboundHandler
主要用于处理入站数据以及各种状态变化,入站事件会从 pipeline 中的 head 节点往后传递到最后一个ChannelInboundhandler,常见API如下:
| channelRegistered() | 当 Channel 注册到 EventLoop 的 Selector上时被调用 |
|---|---|
| channelUnregistered() | 当 Channel 从 EventLoop 的 Selector上注销时被调用 |
| channelActive() | 当 Channel 已经建立好连接时调用 |
| channelInactive() | 当 Channel 与断开连接时调用 |
| channelRead() | 往 Channel 中读取数据时被调用 |
| channelReadComplete() | 往 Channel 中完成读操作时被调用 |
| userEventTriggered() | 当上一个 ChannelInboundHandler的 fireUserEventTriggered() 方法被调用时会触发,如Netty心跳机制的实现 |
ChannelOutboundHandler
出站操作和数据将由 ChannelOutboundhandler 处理,出站事件会从pipeline 中的 tail 节点往前传递到最前一个 ChannelOutboundhandler。ChannelOutboundhandler的方法将会被Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。常见API如下:
| bind(ChannelHandlerContext, SocketAddress, ChannelPromise) | 当请求将 Channel 绑定到本地地址时调用 |
|---|---|
| bind(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) | 当请求将 Channel 与远程节点进行连接时被调用 |
| disconnect(ChannelHandlerContext, ChannelPromise) | 当请求将 Channel 与远程节点断开连接时被调用 |
| close(ChannelHandlerContext, ChannelPromise) | 当请求关闭 Channel 时调用 |
| write(ChannelHandlerContext, Object, ChannelPromise) | 当请求通过 Channel 将数据写到远程节点时调用 |
| flush(ChannelHandlerContext) | 当请求入队消息冲刷到远程节点时调用 |
ChannelOutboundhandler 的大部分方法中都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。ChannelPromise 是 ChannelFuture 的子类,其定义了一些可写方法,如 setSuccess() 和 setFailure(),从而使 ChannelFuture 不可变。
这里借鉴的是 Scala 的 Promise 和 Future 设计,当一个Promise 被完成后,其对应的 Future 不能再进行任何修改
ChannelPipeline
ChannelPipeline 是一个拦截流经 Channel 的入站和出站的管道,里面维护了一个Channel -Handler 的链表(包括所有的ChannelOutboundhandler 和 ChannelOutboundhandler )
每一个 Channel 都会有自己固定对应的 ChannelPipeline。入站和出站事件都会被Pipeline中对应的Handler处理。
如果一个入站事件触发,它将被 ChannelPipeline 的 head 一直传到尾部,在传播过程中,还会检查下一个 ChannelHandler 的类型是否和入站事件想匹配,如果不匹配,则会跳过该 ChannelHandler,直到找到对应的Handler。
ChannelPipeline 提供了用于触发 ChannelHandler 处理入站和出站操作的API
| fireChannelRegistered | 调用 ChannelPipeline 中位于 head 节点的 channelregistered 方法 |
|---|---|
| fireChannelUnregistered | 调用 ChannelPipeline 中位于 head 节点的 channelUnregistered 方法 |
| fireChannelActive | 调用 ChannelPipeline 中位于 head 节点的 channelActive 方法 |
| fireChannelInactive | 调用 ChannelPipeline 中位于 head 节点的 channelInactive 方法 |
| fireChannelRead | 调用 ChannelPipeline 中位于 head 节点的 channelRead 方法 |
| fireExceptionCaught | 调用 ChannelPipeline 中位于 head 节点的 exceptionCaught 方法 |
| fireChannelReadComplete | 调用 ChannelPipeline 中位于 head 节点的 channelReadComplete 方法 |
| bind | 调用 ChannelPipeline 中位于 tail 节点的 bind 方法 |
| connect | 调用 ChannelPipeline 中位于 tail 节点的 connect 方法 |
| disconnect | 调用 ChannelPipeline 中位于 tail 节点的 disconnect 方法 |
| write | 调用 ChannelPipeline 中位于 tail 节点的 write 方法 |
ChannelHandlerContext
每当有ChannelHandler 添加加到管道中,都会创建 ChannelHandlerContext,其主要的功能是管理它所管理的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。ChannelHandlerContext 有很多方法的功能跟 Channel 和ChannelPipeline 中的类似。Channel 或 ChannelPipeline 上的这些方法(fireXxxx),之所以能沿着整个Pipeline进行传播,是通过 ChannelHandlerContext 来实现的。
ChannelHandlerContext的常用API:
| fireChannelRegistered | 触发对下一个ChannelHandlerContext 上的 channelregistered 方法的调用 |
|---|---|
| fireChannelUnregistered | 触发对下一个ChannelHandlerContext 上的 channelUnregistered 方法的调用 |
| fireChannelActive | 触发对下一个ChannelHandlerContext 上的 channelActive 方法的调用 |
| fireChannelInactive | 触发对下一个ChannelHandlerContext 上的 channelInactive 方法的调用 |
| fireChannelRead | 触发对下一个ChannelHandlerContext 上的 channelRead 方法的调用 |
| fireExceptionCaught | 触发对下一个ChannelHandlerContext 上的 exceptionCaught 方法的调用 |
| fireChannelReadComplete | 触发对下一个ChannelHandlerContext 上的 channelReadComplete 方法的调用 |
| bind | 绑定到指定的 SocketAddress |
| connect | 连接指定的 SocketAddress |
| disconnect | 与远程节点断开连接 |
| write | 通过这个实例写入消息并经过 ChannelPipeline |
Handler的责任链调用
下面我们以从 Channel 中读取数据为例。当要往 Channel 中读取数据时,Netty底层会调用 ChannelPipeline 中的 invokeChannelRead() 方法,从第一个处理入站的 Handler 开始调用依次往后调用(即从head开始)
public final ChannelPipeline fireChannelRead(Object msg) {//从head开始调用AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);return this;}
在 invokeChannelRegistered(ChannelHandlerContext, Object) 中,分为了同步执行和异步执行两种方式,不管是哪种方式,最终都会调用传入的 ChannelHandlerContext 的 invokeChannelRegistered() 方法(此时是head节点)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();//同步调用if (executor.inEventLoop()) {next.invokeChannelRead(m);//异步调用} else {executor.execute(new Runnable() {public void run() {next.invokeChannelRead(m);}});}}
调用位于 head 的 ChannelHandlerContext 的 channelRead() 方法
private void invokeChannelRead(Object msg) {if (this.invokeHandler()) {try {((ChannelInboundHandler)this.handler()).channelRead(this, msg);} catch (Throwable var3) {this.notifyHandlerException(var3);}} else {......}}
head节点对应的 Handler 并没有做任何处理,直接触发下一个 Handler 的 channelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {//head节点对应的Handler并没有做任何处理,直接调用下一个Handler的channelRead()方法ctx.fireChannelRead(msg);}
触发下一个Handler
public ChannelHandlerContext fireChannelRegistered(Object msg) {//找到下一个ChannelInboundHandler,重新执行invokeChannelRead(ChannelHandlerContext,Object)方法invokeChannelRead(this.findContextInbound(32), msg);return this;}
找到下一个处理入站的Handler,即我们自定义的Handler,又回到了invokeChannelRead方法,开始新一轮处理。
public ChannelHandlerContext fireChannelRead(Object msg) {//找到下一个处理入站的Handler,又调用了invokeChannelRead()invokeChannelRead(this.findContextInbound(32), msg);return this;}-----------------------------------------private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx = this;do {//找到下一个相同类型的Handlerctx = ctx.next;} while((ctx.executionMask & mask) == 0);return ctx;}

