对于开发人员来讲,Netty中的最主要的组件是 ChannelHandler,所有入站和出站的数据都要经过对应的 ChannelHandler 的处理。使用Netty进行业务开发,开发人员主要做的事就是编写各种 ChannelHandler 。今天我们就来看一看 Handler 中的各种方法到底是以什么样的形式调用的。
关于Handler的核心组件
首先,我们需要搞清楚与 ChannelHandler 相关的一些核心组件
ChannelHandler
Netty中定义了两个重要的 ChannelHandler 子接口,分别处理数据的入站和出站,当然,ChannelHandler 可以同时实现 ChannelInboundhandler 和 ChannelOutboundhandler
ChannelInboundHandler
主要用于处理入站数据以及各种状态变化,入站事件会从 pipeline 中的 head 节点往后传递到最后一个ChannelInboundhandler,常见API如下:
channelRegistered() | 当 Channel 注册到 EventLoop 的 Selector上时被调用 |
---|---|
channelUnregistered() | 当 Channel 从 EventLoop 的 Selector上注销时被调用 |
channelActive() | 当 Channel 已经建立好连接时调用 |
channelInactive() | 当 Channel 与断开连接时调用 |
channelRead() | 往 Channel 中读取数据时被调用 |
channelReadComplete() | 往 Channel 中完成读操作时被调用 |
userEventTriggered() | 当上一个 ChannelInboundHandler的 fireUserEventTriggered() 方法被调用时会触发,如Netty心跳机制的实现 |
ChannelOutboundHandler
出站操作和数据将由 ChannelOutboundhandler 处理,出站事件会从pipeline 中的 tail 节点往前传递到最前一个 ChannelOutboundhandler。ChannelOutboundhandler的方法将会被Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。常见API如下:
bind(ChannelHandlerContext, SocketAddress, ChannelPromise) | 当请求将 Channel 绑定到本地地址时调用 |
---|---|
bind(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise) | 当请求将 Channel 与远程节点进行连接时被调用 |
disconnect(ChannelHandlerContext, ChannelPromise) | 当请求将 Channel 与远程节点断开连接时被调用 |
close(ChannelHandlerContext, ChannelPromise) | 当请求关闭 Channel 时调用 |
write(ChannelHandlerContext, Object, ChannelPromise) | 当请求通过 Channel 将数据写到远程节点时调用 |
flush(ChannelHandlerContext) | 当请求入队消息冲刷到远程节点时调用 |
ChannelOutboundhandler 的大部分方法中都需要一个 ChannelPromise 参数,以便在操作完成时得到通知。ChannelPromise 是 ChannelFuture 的子类,其定义了一些可写方法,如 setSuccess() 和 setFailure(),从而使 ChannelFuture 不可变。
这里借鉴的是 Scala 的 Promise 和 Future 设计,当一个Promise 被完成后,其对应的 Future 不能再进行任何修改
ChannelPipeline
ChannelPipeline 是一个拦截流经 Channel 的入站和出站的管道,里面维护了一个Channel -Handler 的链表(包括所有的ChannelOutboundhandler 和 ChannelOutboundhandler )
每一个 Channel 都会有自己固定对应的 ChannelPipeline。入站和出站事件都会被Pipeline中对应的Handler处理。
如果一个入站事件触发,它将被 ChannelPipeline 的 head 一直传到尾部,在传播过程中,还会检查下一个 ChannelHandler 的类型是否和入站事件想匹配,如果不匹配,则会跳过该 ChannelHandler,直到找到对应的Handler。
ChannelPipeline 提供了用于触发 ChannelHandler 处理入站和出站操作的API
fireChannelRegistered | 调用 ChannelPipeline 中位于 head 节点的 channelregistered 方法 |
---|---|
fireChannelUnregistered | 调用 ChannelPipeline 中位于 head 节点的 channelUnregistered 方法 |
fireChannelActive | 调用 ChannelPipeline 中位于 head 节点的 channelActive 方法 |
fireChannelInactive | 调用 ChannelPipeline 中位于 head 节点的 channelInactive 方法 |
fireChannelRead | 调用 ChannelPipeline 中位于 head 节点的 channelRead 方法 |
fireExceptionCaught | 调用 ChannelPipeline 中位于 head 节点的 exceptionCaught 方法 |
fireChannelReadComplete | 调用 ChannelPipeline 中位于 head 节点的 channelReadComplete 方法 |
bind | 调用 ChannelPipeline 中位于 tail 节点的 bind 方法 |
connect | 调用 ChannelPipeline 中位于 tail 节点的 connect 方法 |
disconnect | 调用 ChannelPipeline 中位于 tail 节点的 disconnect 方法 |
write | 调用 ChannelPipeline 中位于 tail 节点的 write 方法 |
ChannelHandlerContext
每当有ChannelHandler 添加加到管道中,都会创建 ChannelHandlerContext,其主要的功能是管理它所管理的 ChannelHandler 和在同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。ChannelHandlerContext 有很多方法的功能跟 Channel 和ChannelPipeline 中的类似。Channel 或 ChannelPipeline 上的这些方法(fireXxxx),之所以能沿着整个Pipeline进行传播,是通过 ChannelHandlerContext 来实现的。
ChannelHandlerContext的常用API:
fireChannelRegistered | 触发对下一个ChannelHandlerContext 上的 channelregistered 方法的调用 |
---|---|
fireChannelUnregistered | 触发对下一个ChannelHandlerContext 上的 channelUnregistered 方法的调用 |
fireChannelActive | 触发对下一个ChannelHandlerContext 上的 channelActive 方法的调用 |
fireChannelInactive | 触发对下一个ChannelHandlerContext 上的 channelInactive 方法的调用 |
fireChannelRead | 触发对下一个ChannelHandlerContext 上的 channelRead 方法的调用 |
fireExceptionCaught | 触发对下一个ChannelHandlerContext 上的 exceptionCaught 方法的调用 |
fireChannelReadComplete | 触发对下一个ChannelHandlerContext 上的 channelReadComplete 方法的调用 |
bind | 绑定到指定的 SocketAddress |
connect | 连接指定的 SocketAddress |
disconnect | 与远程节点断开连接 |
write | 通过这个实例写入消息并经过 ChannelPipeline |
Handler的责任链调用
下面我们以从 Channel 中读取数据为例。当要往 Channel 中读取数据时,Netty底层会调用 ChannelPipeline 中的 invokeChannelRead() 方法,从第一个处理入站的 Handler 开始调用依次往后调用(即从head开始)
public final ChannelPipeline fireChannelRead(Object msg) {
//从head开始调用
AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
return this;
}
在 invokeChannelRegistered(ChannelHandlerContext, Object) 中,分为了同步执行和异步执行两种方式,不管是哪种方式,最终都会调用传入的 ChannelHandlerContext 的 invokeChannelRegistered() 方法(此时是head节点)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
//同步调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
//异步调用
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
调用位于 head 的 ChannelHandlerContext 的 channelRead() 方法
private void invokeChannelRead(Object msg) {
if (this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.notifyHandlerException(var3);
}
} else {
......
}
}
head节点对应的 Handler 并没有做任何处理,直接触发下一个 Handler 的 channelRead()方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//head节点对应的Handler并没有做任何处理,直接调用下一个Handler的channelRead()方法
ctx.fireChannelRead(msg);
}
触发下一个Handler
public ChannelHandlerContext fireChannelRegistered(Object msg) {
//找到下一个ChannelInboundHandler,重新执行invokeChannelRead(ChannelHandlerContext,Object)方法
invokeChannelRead(this.findContextInbound(32), msg);
return this;
}
找到下一个处理入站的Handler,即我们自定义的Handler,又回到了invokeChannelRead方法,开始新一轮处理。
public ChannelHandlerContext fireChannelRead(Object msg) {
//找到下一个处理入站的Handler,又调用了invokeChannelRead()
invokeChannelRead(this.findContextInbound(32), msg);
return this;
}
-----------------------------------------
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
//找到下一个相同类型的Handler
ctx = ctx.next;
} while((ctx.executionMask & mask) == 0);
return ctx;
}