概述
我们知道,ChannelHandler 是我们实现自定义的业务逻辑地方,而 ChannelPipeline 则是编排和管理 ChannelHandler 的管道(容器):将 ChannelHandler 链接在一起以组织处理逻辑。我们接下来会深入了解 ChannelHandler 和 ChannelPipeline,别忘记了还有重要的纽带 — ChannelHandlerContext。理解这些组件之间的关系以及相关事件传播机制对于通过 Netty 构建模块化的、可重用的实现至关重要。
ChannelHandler 家族
对于网络编程来说,我们最需要关注的是通道的生命周期(比如是否活动、是否注册)以及相关 I/O 事件(连接、读和写)。因此,这些生命周期和 I/O 事件都会对应回调方法,一旦事件发生,则触发相关回调方法,这是异步框架的体现之一。
Channel 的生命周期
我们需要对 Channel 的生命周期有一定认知,下表列表了 Channel 的 4 种状态。
状态 | 描述 |
---|---|
ChannelUnregistered | Channel 已经被创建,但还未注册到 EventLoop |
ChannelRegistered | Channel 已经被注册到了 EventLoop |
ChannelActive | Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 |
ChannelInactive | Channel 没有连接到远程节点 |
ChannelHandler 层次结构
从上图可以看出,ChannelHandler 的继承体系相对比较简洁明了。InboundHandler 和 OutBoundHandler 分别对应入站事件处理和出站事件处理。而中间的 ChannelHandlerAdapter 抽象类最重要的是实现 isSharable() 方法,这个等会再分析。
ChannelHandler
ChannelHandler 相关的生命周期如下表所示:
状态 | 描述 |
---|---|
handlerAdded | 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用 |
handlerRemoved | 当从 ChannelPipeline 中移除 ChannelHandler 时被调用 |
exceptionCaught | 当处理过程中在 ChannelPipeline 中有错误产生时被调用 |
顶层接口 ChannelHandler 只定义了 ChannelHandler 相关生命周期的回调方法,这是 Netty 提供给用户的扩展点之一。
/**
* ChannelHandler 顶层接口
*/
public interface ChannelHandler {
/**
* 「处理器已添加事件」
*
* {@link ChannelHandler} 被添加到实际的 {@link ChannelHandlerContext}之后调用,
* 表明已准备好处理事件
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* 「处理器被移除」
*
* {@link ChannelHandler}从实际的上下文中移除,意味着该handler不再会处理任何事件。
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* 「异常抛出」
*
* 当一个 {@link Throwable} 被抛出后回调此方法。
*
* @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
* implement the method there.
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
/**
* 用于注解{@link ChannelHandler},表明该{@link ChannelHandler}是线程安全,
* 可以被用于不同实例的{@link ChannelPipeline}管道中。
*/
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
ChannelInboundHandler
而 ChannelInboundHandler 接口定义的方法则稍显复杂,相关回调接口归纳如下:
事件类型 | 描述 |
---|---|
channelRegistered | 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O时被调用 |
channelUnregistered | 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O时被调用 |
channelActive | 当处理过程中在 ChannelPipeline 中有错误产生时被调用 |
channelInactive | 当 Channel 离开活动状态并且不再连接它的远程节点时被调用 |
channelReadComplete | 当 Channel 中所有可读的字节都已经从 Channel 中读取之后,将会回调此方法。 可能在调用此方法之前会多次调用 channelRead() 方法读取数据。 |
channelRead | 当从 Channel 读取数据时被调用 |
ChannelWritabilityChanged | 当 Channel 的可写状态发生改变时被调用。 用户可以确保写操作不会完成得太快(以避免 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。 可以通过调用 Channel 的 isWritable()方法来检测 Channel 的可写性。 与可写性相关的阈值可以通过 Channel.config().setWriteHighWaterMark() 和 Channel.config().setWriteLowWaterMark() 方法来设置 |
userEventTriggered | 当 ChannelnboundHandler.fireUserEventTriggered() 方法被调用时将会回调此方法,因为一个 POJO 在 ChannelPipeline 中传播 |
可以看到,ChannelInboundHandler 所定义的回调方法包含了 Channel 的生命周期,同时也增加了 4 个新的回调方法,它们用于在从 Channel 中读取数据的过程中触发,提供更多的回调接口便于用户扩展。
ChannelOutboundHandler
ChanelOutboundHandler 接口定义的方法则是与写有关。相关回调接口归纳如下:
事件类型 | 描述 |
---|---|
bind | 当 Channel 绑定到本地地址时回调 |
connect | 当 Channel 连接到远程节点时回调 |
disconnect | 当 Channel 从远程节点断开时回调 |
close | 当 Channel 被关闭时回调 |
deregister | 当 Channel 从所绑定的 EventLoop 中注销时回调 |
read | 从 Channel 读取数据时回调 |
write | 通过 Channel 将数据暂存队列,并没有发送到远程节点 |
flush | 通过 Channel 将入队数据冲刷到远程节点时回调 |
如果我们需要关注比如通道绑定、连接等事件的话,可以继承 ChannelOutboundHandler 接口,当感兴趣的事件发生时,相关方法将会被回调。
ChannelHandlerAdapter
它是一个抽象类,是 ChannelHandler 接口的实现类的骨架。一般 ChannelHandler 都会继承 ChannelHandlerAdapter,这是因为它实现了非常重要的方法 isSharable()
,用来判断 Channelhandler 的实现类是否被含有 @Sharable
注解并缓存相关信息。
ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter 可以看作是 ChannelInboundHandler 接口的实现基类,提供了所有方法的默认实现。有以下几点需要注意:
- 该类只是将事件继续向下一个 ChannelHandler 节点传播,并不会做额外的工作。
- 每个方法都包含
@Skip
注解,表明该方法不会被 ChannelPipeline 调用: 就是说默认所有的方法都会被跳过。因此,用户只需要覆盖感兴趣的方法即可。 channelRead() 方法自动返回后并不会释放 ByteBuf。因此,如果用户直接继承此类实现自身逻辑需要手动释放 ByteBuf 资源,避免内存泄漏。Netty 很贴心提供了可自动释放资源的 SimpleChannelInboundHandler。通常在实际的编程中会继承 SimpleChannelInboundHandler 而非 ChannelInboundHandlerAdapter。
ChannelOutboundHandlerAdapter
ChannelOutboundHandlerAdapter 可以看作是 ChannelOutboundHandler 接口的实现基类,提供了所有方法的默认实现。
当调用 write() 方法处理数据时并不会释放 ByteBuf。因此,需要根据自身逻辑手动释放,避免内存泄漏。
ChannelPipeline
我们先看看 ChannelPipeline 层次结构:
继承体系比较清晰,这里简单说明一下:ChannelInboundInvoker 和 ChannelOutboundInvoker 分别定义触发 ChannelInbound 和 ChannelOutbound 回调方法的接口。ChannelInboundInvoker 接口名称以
fire
开头,而 ChannelOutboundInvoker 则定义出站事件触发接口。- ChannelPipeline 定义与 ChannelHnadler 增、删、改、查 、遍历等接口。
- DefaultChannelPipeline 是 ChannelPipeline 的默认实现。
ChannelInboundInvoker
下图展示 ChannelInboundInvoker 的相关 API,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件。ChannelOutboundInvoker
ChannelOutboundInvoker 则定义和出站相关的触发接口。相关事件处理会导致底层的套接字上发生一系列的动作。相关 API 说明如下:
方法名称 | 描述 |
---|---|
bind | 当 Channel 绑定到一个本地地址后,将调用此方法:在 ChannelPipeline 中传播 bind 事件(回调 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 bind() 方法) |
connect | 当 Channel 连接到一个远程地址后,将调用此方法:在管道中传播 connect 事件 |
disconnect | 当 Channel 断开连接后,将调用此方法:在管道中传播 disconnect 事件 |
close | 当 Channel 关闭连接后,将调用此方法:在管道中传播 close 事件 |
deregister | 当 Channel 注销后,将调用此方法:在管道中传播 deregister 事件 |
flush | 冲刷 Channel 所有挂起的数据。在管道中传播 flush 事件 |
write | 当消息被写入 Channel 后,将调用此方法:在管道中传播 write 事件 |
writeAndFlush | 先调用 write() 方法,再调用 flush 方法 |
read | 请求从 Channel 中读取更多数据 |
ChannelPipeline
ChannelPipeline 定义的接口与 ChannelHandler 相关,包含 ChannelHandler 的增、删、改、查、遍历等。相关接口就不展示出来了。
DefaultChannelPipeline
DefaultChannelPipeline 是 ChannelPipeline 的默认实现,功能比较多,还有 5 个内部类,我们分别从内部类结构、ChannelInboundInvoker、ChannelOutboundInvoker 以及自身 ChannelPipeline 相关接口实现讲解。
内部类结构
DefaultChannelPipeline 定义了 5 个内部类结构,可以将分为两类,一类是上下文结点,一类是挂起的Handler添加/移除任务。
HeadContext
HeadContext 既是 Inbound 处理器,也是 Outbound 处理器(分别实现 ChannelInboundHandler 和 ChannelOutboundHandler 接口)。HeadContext 任务比较多:
- 网络数据流经 ChannelPipeline 的第一站。底层通过 Unsafe 从通道中读取数据。
- 处理后的数据流经 ChannelPipeline 的最后一站。底层通过 Unsafe 向通道写入数据。
- 在实现 ChannelInboundHandler 接口中,大部分实现逻辑是调用
ChannelHandlerContext#fireXX
方法,主要功能是回调 ChannelPipeline 中下一个可用的 InboundHandler 相关方法。 - 在实现 ChannelOutboundHandler 接口中,一般是与底层通道相关,会委托 Unsafe 魔法类来完成相关操作。比如从通道中读取数据、连接通道、关闭通道等。
相关源码摘录如下:
// io.netty.channel.DefaultChannelPipeline.HeadContext
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
/**
* 魔法类,用来操作底层Socket
*/
private final Unsafe unsafe;
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
unsafe.disconnect(promise);
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
unsafe.close(promise);
}
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
unsafe.deregister(promise);
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
ctx.fireChannelUnregistered();
// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// #2 触发Channel的读事件
readIfIsAutoRead();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg); // HeadContext主要还是触发ChannelHandler的回调方法
}
/**
* 已把本轮次通道中数据读完,向管道传播「channelreadComplete」事件
* 事件传播过程:HeadContext->TailContext->HeadContext#read()
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// #1 向管道传播「channelreadComplete」事件
ctx.fireChannelReadComplete();
// #2 如果配置为自动读,则在读完之后需要判断当前
// 如果没有注册OP_READ事件,则注册OP_READ。
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
ctx.fireChannelWritabilityChanged();
}
}
TailContext
TailContext 只寅了 ChannelInboundHandler 接口,它 Inbound 事件传播终止的地方。主要作用有:
- 终止 Inbound 事件。
- 释放数据资源。
具体源码就不看了,大部分方法都是空实现。
这里对 HeadContext 和 TailContext 做一个小结。
- 它们都是 ChannelHandler。存在于 ChannelPipeline 管道中。通过 ChannelPipeline 传播事件时,回调 ChannelHandler 相关方法。
HeadContext 是事件传播起点和终点(包括 Inbound 和 Outbound),任务繁重。而 TailContext 仅是 Inbound 的终点,部分 API 有自动释放资源的功能。
PendingHandlerTask
PendingHandlerTask 服务于 ChannelPipeline。当我们向 ChannelPipeline 添加通道时,如果此时通道未完成注册,则需要调用
DefaultChannelPipeline#callHandlerCallbackLater
方法添加到 ChanelPipeline 的异步任务队列中,待通道完成注册后,调用Pipeline#invokeHandlerAddedIfNeeded()
方法执行所有挂起的任务。而这个任务可以分成两类,一个是通道添加(PendingHandlerAddedTask),另一个是通道移除(PendingHandlerRemovedTask)。它们分别回调ChannelHandler#handlerAdded()
和ChannelHandler#handlerRemove()
方法。
上图主要想表达 PendingHandlerTask 执行时机和通道注册的前后关系。因为只有当通道注册完成,触发 HandlerAdded 事件才会有意义。所以 Netty 为了能异步触发事件创建了 PendingHandlerTask 相关的类。
PendingHandlerTask 之间构成单向链表,因此,可以通过next()
方法获取下一个节点任务并执行。具体源码就不贴出来了,我们只需要知道为什么会存在这几个内部类,哦,别记了。我们的管道初始化工作就是在方法pipeline.invokeHandlerAddedIfNeeded()
完成的(即回调ChannelInitializer#initChannel()
方法),它们向管道中添加我们自定义的 ChannelHandler。双向链表结构
ChannelPipeline 作用之一是维护 ChannelHandler 的顺序。内部是通过使用两个指针分别保存头结点和尾结点引用,这样无论是 addFirst() 在头部添加或是 addLast() 都能以最快速度完成。引用类型为 AbstractChannelHandlerContext,这个接下来会进行分析,AbstractChannelHandlerContext 之间构成双向链表结果。
ChannelHandlerContext 接口
ChannelHandlerContext 代表 Channelhandler 和 ChannelPipeline 之间的关联关系。ChannelHandler 和 ChannelHandlerContext 一一对应的:每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建新的 ChannelHandlerContext。ChannelHandlerContext 主要的功能有:
管理它所关联的 ChannelHandler 和在同一管道中其他 ChannelHandler 之间的交互。内部维护两个指针,分别指向前向节点和后向节点。构建双向链表。
- 向前或向后在管道中搜索可处理触发事件的 ChannelHandler。
下图对 ChannelHandlerContext API 进行了总结:
方法名称 | 描述 |
---|---|
channel | 返回绑定到这个实例的 Channel |
executor | 返回调度事件的 执行器 |
name | 返回这个实例的唯一名称 |
handler | 返回绑定到这个实例的 Channel |
isRemoved | 如果所关联的 ChannelHandler 已经从 ChannelPipeline 中移除则返回 true |
fireChannelRegistered | 触发下一个 ChannelInboundHandler 上的 channelRegistered() 方法 |
fireChannelUnregistered | 触发下一个 ChannelInboundHandler 上的 channelUnregistered() 方法 |
fireChannelActive | 触发下一个 ChannelInboundHandler 上的 channelActive() 方法 |
fireChannelInactive | 触发下一个 ChannelInboundHandler 上的 channelInactive() 方法 |
fireExceptionCaught | 触发下一个 ChannelInboundHandler 上的 exceptionCaught() 方法 |
fireUserEventTriggered | 触发下一个 ChannelInboundHandler 上的 userEventTriggered() 方法 |
fireChannelRead | 触发下一个 ChannelInboundHandler 上的 channelRead() 方法 |
fireChannelReadComplete | 触发下一个 ChannelInboundHandler 上的 channelReadComplete() 方法 |
fireChannelWritabilityChanged | 触发下一个 ChannelInboundHandler 上的 channelWritabilityChanged() 方法 |
read | 触发下一个 ChannelOutboundHandler 上的 read() 方法 |
flush | 触发下一个 ChannelOutboundHandler 上的 flush() 方法 |
pipeline | 返回这个实例所关联的 ChannelPipeline |
alloc | 返回这个实例所关联的 ByteBufAllocator 分配器 |
ChannelHandlerContext 本身有很多方法,其中一些方法存在于 Channel 和 ChannelPipeline 接口中,当调用 Channel 或 ChannelPipeline 上的方法时,它们将会沿着整个 ChannelPipeline 进行传播。而调用 ChannelHandlerContext 自身相同的方法(和 Channel 和 ChannelPipeline 方法相同),则将从当前所关联的 ChannelHandler 开始,只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。
层次结构
AbstractChannelHandlerContext
AbstractChannelHandlerContext 抽象类实现了 ChannelHandlerContext 接口的基本骨架,内部使用了多个变量维护所绑定的 ChannelHandler 的状态以及和 ChannelPipeline 交互的逻辑。
AbstractChannelHandlerContext 相关变量说明:
/**
* {@link ChannelHandlerContext} 抽象基类
*/
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// 前向节点
volatile AbstractChannelHandlerContext next;
// 后向节点
volatile AbstractChannelHandlerContext prev;
// 所绑定的「handler」的状态更新器
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
// Handler 状态变化为:INIT->ADD_PENDING->ADD_COMPLETE->REMOVE_COMPLETE
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} 将要被调用
*/
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} 已经被调用
*/
private static final int ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} 已经被调用
*/
private static final int REMOVE_COMPLETE = 3;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* 或 {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} 没有被调用
*/
private static final int INIT = 0;
// 绑定的管道
private final DefaultChannelPipeline pipeline;
// 名称
private final String name;
// 是否有序。true: 执行器为空或为 OrderedEventExecutor接口的实例
private final boolean ordered;
// 掩码
private final int executionMask;
// 子执行器,可能为null
final EventExecutor executor;
private ChannelFuture succeededFuture;
// 迟延实例化任务。这些任务由不同的「Executor」触发Handler的事件
// 没有必要使用「volatile」修改此变量,因为即便在最坏的情况下,也只会创建更多的实例对象
private Tasks invokeTasks;
/**
* 处理器状态,默认值为{@link AbstractChannelHandlerContext#INIT}
*/
private volatile int handlerState = INIT;
// ...
}
从源码可以看出:
- 抽象类维护两个指针引用,分别是 prev 和 next,使得 HandlerContext 构成双向链表。
- handlerState 表示当前绑定的 HandlerContext 处于的实时状态。状态变化:INIT->ADD_PENDING->ADD_COMPLETE->REMOVE_COMPLETE。
- executionMask 表示执行掩码,通过相关位运算就能得到 ChannelHandler 所拥有的回调方法。这是 HandlerContext 构造器中完成相关掩码计算操作。
- 内部还有一个 EventExecutor 变量,可以让 ChannelHadnler 在此 EventExecutor 执行而非当前执行线程。如果当前 ChannelHandler 耗时比较多,如果放在当前执行线程执行的话会导致吞吐量下降,而 Netty 提供另一种方式,把耗时严重的 ChannelHandler 放在单独的 EventExecutor 中执行。
抽象类 AbstractChannelHandlerContext 实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker 所有的接口。而 ChannelPipeline 也实现了这两个接口,那到底两者有什么区别呢?
- 直接调用 ChannelPipeline 方法传播事件,会从 HeadContext 开始传播。
- 直接调用 ChannelHandlerContext 方法传播事件,会从下一个匹配成功的 ChanelHandler 开始传播。
我们先也解 fireChannelRead(Object msg)
方法,其它方法同理。
fireChannelRead
// io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
/**
* 从下一个匹配的 {@link ChannelHandler} 开始传播「channelRead」事件
* @param msg 上一个{@link ChannelHandler}解码得到的消息
* @return
*/
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// #1 在管道中找寻此ChannelHandler下一个带有「MASK_CHANNEL_READ」标志位的「ChannelInbound」处理器
AbstractChannelHandlerContext contextInbound = findContextInbound(MASK_CHANNEL_READ);
// #2 回调「ChannelHandler」
invokeChannelRead(contextInbound, msg);
return this;
}
/**
* 寻找下一个匹配「mask」的「InboundChannelHandler」节点
* @param mask 掩码,
* @return
*/
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
// 获取下一个节点
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); // 判断是否跳过此节点
return ctx;
}
/**
* 此方法为静态方法,完成以下两个任务:
* ① 检测资源泄漏(touch)
* ② 根据是否为异线程封装异步任务或直接执行任务
* @param next
* @param msg
*/
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 用于检测资源是否存在泄漏
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
/**
* ① 通过 {@link #invokeHandler()} 判断当前「ChannelHandler」是否需要被跳过
* ② 不需要跳过,回调 {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}方法
* 需要跳过,继续在管道中寻找下一个匹配ChannelHandler
* @param msg
*/
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
方法 findContextInbound(int mask)
比较有趣,相关核心类是 io.netty.channel.ChannelHandlerMask,里面对常见的事件(如 CHANNEL_REGISTERED)进行位编码,然后通过相关位运算就能知道是否属于/不属于某个范围或某个值。
通过讲解 fireChannel() 方法,我们大致了解了在 ChannelHandlerContext 如何传播相关事件。
总结有以下几步:
- 在管道中寻找下一个匹配的 ChannelHandler。
- 针对异线程进行相关处理。
- 回调相关方法。在回调相关方法之前需要判断当前 ChannelHandler 的状态,如果
invokeHandler()==false
,则继续向下传播事件。当前 ChannelHandler 对消息不做任何处理。
其他方法流程与 fireChannelRead()
类似,这里就不做过多描述。
HeadContext/TailContext 与 ChannelHandlerContext 的关系
- HeadContext 继承 AbstractChannelHandlerContext,实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口。
- TailContext 同样继承 AbstractChannelHandlerContext,但只实现了 ChannelInboundHandler 接口。
- TailContext 实现实现 ChannelInboundHandler 接口用意是释放入站资源、避免内存泄漏。继承 AbstractChannelHandlerContext 抽象类则可以调用相关方法向前传播事件。
HeadContext 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口,内部的所有接口它都有实现且,因为入站消息它是第一个拿到,出站消息它是最后一个拿到,所以此类就承载着非常重要的消息接收和消息发送功能。继承 AbstractChannelHandlerContext 抽象类则可以调用相关方法向后传播相关事件。
异常处理
异常处理是任务应用程序的重要组成吩,它是程序鲁棒性的重要实现方式之一。我们可以将异常分为以下几类:
入站异常
- 出站异常
入站异常
一个简单的示例:打消异常栈跟踪信息并关闭通道。
因为异常将会继续按照入站方向流动,所以相关异常处理通常位于 ChannelPipeline 的最后。这确保所有的入站异常都会被处理,无论发生在 ChannelPipeline 哪个位置。public class HelloWorldChannelInboundHandler extends SimpleChannelInboundHandler<FullHttpRequest> { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 关闭通道 ctx.close(); } }
如果处理异常取决于你自己的业务逻辑,比如最简单是关闭通道并记录出错日志。当然你也可以根据异常内容做出进一步判断:比如尝试恢复连接。
入站异常处理小结:
ChannelHandler.exceptionCaught()
的默认实现是简单地将异常向下传播。- 如果异常传播到 ChannelPipeline 的尾部,它将会被记录为未被处理。
如果想自定义异常处理,需要重写
exceptionCaught()
方法,根据异常类型决定是否继续在管道中向下传播此异常。出站异常
对于出站想说,无论正常或异常都是通过相关通知机制完成:
每个出站操作都将返回一个 ChannelFuture 对象。注册到 ChannelFuture 的 ChannelFutureListener 将在操作完成时被通知:该操作是否成功或失败。
- 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise 实例。它是 ChannelFuture 的子类,可以用于异步通知的监听器。当然,它也具备立即通知的可写方法:
- ChannelPromise setSuccess()
- ChannelPromise setFailure(Throwable cause)
通常在调用出站操作的方法后面添加 ChannelFutureListener。比如:
ChannelFuture future = channel.write(msg); future.addListener(new ChanelFutureListener(){ @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess()) { f.cause().printStackTrace(); f.channel().close(); } } });
还可以将 ChannelFutureListener 添加到作为参数传递给 ChannelOutboundHandler 的方法的 ChannelPromise。
内置 ChannelHandler
总结