概述

我们知道,ChannelHandler 是我们实现自定义的业务逻辑地方,而 ChannelPipeline 则是编排和管理 ChannelHandler 的管道(容器):将 ChannelHandler 链接在一起以组织处理逻辑。我们接下来会深入了解 ChannelHandler 和 ChannelPipeline,别忘记了还有重要的纽带 — ChannelHandlerContext。理解这些组件之间的关系以及相关事件传播机制对于通过 Netty 构建模块化的、可重用的实现至关重要。

ChannelHandler 家族

对于网络编程来说,我们最需要关注的是通道的生命周期(比如是否活动、是否注册)以及相关 I/O 事件(连接、读和写)。因此,这些生命周期和 I/O 事件都会对应回调方法,一旦事件发生,则触发相关回调方法,这是异步框架的体现之一。

Channel 的生命周期

我们需要对 Channel 的生命周期有一定认知,下表列表了 Channel 的 4 种状态。

状态 描述
ChannelUnregistered Channel 已经被创建,但还未注册到 EventLoop
ChannelRegistered Channel 已经被注册到了 EventLoop
ChannelActive Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive Channel 没有连接到远程节点

Channel 生命周期正常改变如下图所示:
Channel状态醋.png

ChannelHandler 层次结构

ChannelHandlerAdapter类的层次结构.png
从上图可以看出,ChannelHandler 的继承体系相对比较简洁明了。InboundHandler 和 OutBoundHandler 分别对应入站事件处理和出站事件处理。而中间的 ChannelHandlerAdapter 抽象类最重要的是实现 isSharable() 方法,这个等会再分析。

ChannelHandler

ChannelHandler 相关的生命周期如下表所示:

状态 描述
handlerAdded 当把 ChannelHandler 添加到 ChannelPipeline 中时被调用
handlerRemoved 当从 ChannelPipeline 中移除 ChannelHandler 时被调用
exceptionCaught 当处理过程中在 ChannelPipeline 中有错误产生时被调用

顶层接口 ChannelHandler 只定义了 ChannelHandler 相关生命周期的回调方法,这是 Netty 提供给用户的扩展点之一。

  1. /**
  2. * ChannelHandler 顶层接口
  3. */
  4. public interface ChannelHandler {
  5. /**
  6. * 「处理器已添加事件」
  7. *
  8. * {@link ChannelHandler} 被添加到实际的 {@link ChannelHandlerContext}之后调用,
  9. * 表明已准备好处理事件
  10. */
  11. void handlerAdded(ChannelHandlerContext ctx) throws Exception;
  12. /**
  13. * 「处理器被移除」
  14. *
  15. * {@link ChannelHandler}从实际的上下文中移除,意味着该handler不再会处理任何事件。
  16. */
  17. void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
  18. /**
  19. * 「异常抛出」
  20. *
  21. * 当一个 {@link Throwable} 被抛出后回调此方法。
  22. *
  23. * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
  24. * implement the method there.
  25. */
  26. @Deprecated
  27. void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
  28. /**
  29. * 用于注解{@link ChannelHandler},表明该{@link ChannelHandler}是线程安全,
  30. * 可以被用于不同实例的{@link ChannelPipeline}管道中。
  31. */
  32. @Inherited
  33. @Documented
  34. @Target(ElementType.TYPE)
  35. @Retention(RetentionPolicy.RUNTIME)
  36. @interface Sharable {
  37. // no value
  38. }
  39. }

ChannelHandler 接口定义非常简单。

ChannelInboundHandler

而 ChannelInboundHandler 接口定义的方法则稍显复杂,相关回调接口归纳如下:

事件类型 描述
channelRegistered 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O时被调用
channelUnregistered 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O时被调用
channelActive 当处理过程中在 ChannelPipeline 中有错误产生时被调用
channelInactive 当 Channel 离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete 当 Channel 中所有可读的字节都已经从 Channel 中读取之后,将会回调此方法。
可能在调用此方法之前会多次调用 channelRead() 方法读取数据。
channelRead 当从 Channel 读取数据时被调用
ChannelWritabilityChanged 当 Channel 的可写状态发生改变时被调用。
用户可以确保写操作不会完成得太快(以避免 OutOfMemoryError)或者可以在 Channel 变为再次可写时恢复写入。
可以通过调用 Channel 的 isWritable()方法来检测 Channel 的可写性。
与可写性相关的阈值可以通过 Channel.config().setWriteHighWaterMark() 和 Channel.config().setWriteLowWaterMark() 方法来设置
userEventTriggered 当 ChannelnboundHandler.fireUserEventTriggered() 方法被调用时将会回调此方法,因为一个 POJO 在 ChannelPipeline 中传播

可以看到,ChannelInboundHandler 所定义的回调方法包含了 Channel 的生命周期,同时也增加了 4 个新的回调方法,它们用于在从 Channel 中读取数据的过程中触发,提供更多的回调接口便于用户扩展。

ChannelOutboundHandler

ChanelOutboundHandler 接口定义的方法则是与有关。相关回调接口归纳如下:

事件类型 描述
bind 当 Channel 绑定到本地地址时回调
connect 当 Channel 连接到远程节点时回调
disconnect 当 Channel 从远程节点断开时回调
close 当 Channel 被关闭时回调
deregister 当 Channel 从所绑定的 EventLoop 中注销时回调
read 从 Channel 读取数据时回调
write 通过 Channel 将数据暂存队列,并没有发送到远程节点
flush 通过 Channel 将入队数据冲刷到远程节点时回调

如果我们需要关注比如通道绑定、连接等事件的话,可以继承 ChannelOutboundHandler 接口,当感兴趣的事件发生时,相关方法将会被回调。

ChannelHandlerAdapter

它是一个抽象类,是 ChannelHandler 接口的实现类的骨架。一般 ChannelHandler 都会继承 ChannelHandlerAdapter,这是因为它实现了非常重要的方法 isSharable(),用来判断 Channelhandler 的实现类是否被含有 @Sharable 注解并缓存相关信息。

ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter 可以看作是 ChannelInboundHandler 接口的实现基类,提供了所有方法的默认实现。有以下几点需要注意:

  • 该类只是将事件继续向下一个 ChannelHandler 节点传播,并不会做额外的工作。
  • 每个方法都包含 @Skip 注解,表明该方法不会被 ChannelPipeline 调用: 就是说默认所有的方法都会被跳过。因此,用户只需要覆盖感兴趣的方法即可。
  • channelRead() 方法自动返回后并不会释放 ByteBuf。因此,如果用户直接继承此类实现自身逻辑需要手动释放 ByteBuf 资源,避免内存泄漏。Netty 很贴心提供了可自动释放资源的 SimpleChannelInboundHandler。通常在实际的编程中会继承 SimpleChannelInboundHandler 而非 ChannelInboundHandlerAdapter。

    ChannelOutboundHandlerAdapter

    ChannelOutboundHandlerAdapter 可以看作是 ChannelOutboundHandler 接口的实现基类,提供了所有方法的默认实现。

  • 当调用 write() 方法处理数据时并不会释放 ByteBuf。因此,需要根据自身逻辑手动释放,避免内存泄漏。

    ChannelPipeline

    我们先看看 ChannelPipeline 层次结构:
    ChannelPipeline.png
    继承体系比较清晰,这里简单说明一下:

  • ChannelInboundInvoker 和 ChannelOutboundInvoker 分别定义触发 ChannelInbound 和 ChannelOutbound 回调方法的接口。ChannelInboundInvoker 接口名称以 fire 开头,而 ChannelOutboundInvoker 则定义出站事件触发接口。

  • ChannelPipeline 定义与 ChannelHnadler 增、删、改、查 、遍历等接口。
  • DefaultChannelPipeline 是 ChannelPipeline 的默认实现。

    ChannelInboundInvoker

    下图展示 ChannelInboundInvoker 的相关 API,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件。 7. ChannelPipeline 和 ChannelHandler - 图4

    ChannelOutboundInvoker

    ChannelOutboundInvoker 则定义和出站相关的触发接口。相关事件处理会导致底层的套接字上发生一系列的动作。相关 API 说明如下:
方法名称 描述
bind 当 Channel 绑定到一个本地地址后,将调用此方法:在 ChannelPipeline 中传播 bind 事件(回调 ChannelPipeline 中的下一个 ChannelOutboundHandler 的 bind() 方法)
connect 当 Channel 连接到一个远程地址后,将调用此方法:在管道中传播 connect 事件
disconnect 当 Channel 断开连接后,将调用此方法:在管道中传播 disconnect 事件
close 当 Channel 关闭连接后,将调用此方法:在管道中传播 close 事件
deregister 当 Channel 注销后,将调用此方法:在管道中传播 deregister 事件
flush 冲刷 Channel 所有挂起的数据。在管道中传播 flush 事件
write 当消息被写入 Channel 后,将调用此方法:在管道中传播 write 事件
writeAndFlush 先调用 write() 方法,再调用 flush 方法
read 请求从 Channel 中读取更多数据

ChannelPipeline

ChannelPipeline 定义的接口与 ChannelHandler 相关,包含 ChannelHandler 的增、删、改、查、遍历等。相关接口就不展示出来了。

DefaultChannelPipeline

DefaultChannelPipeline 是 ChannelPipeline 的默认实现,功能比较多,还有 5 个内部类,我们分别从内部类结构、ChannelInboundInvoker、ChannelOutboundInvoker 以及自身 ChannelPipeline 相关接口实现讲解。

内部类结构

DefaultChannelPipeline 定义了 5 个内部类结构,可以将分为两类,一类是上下文结点,一类是挂起的Handler添加/移除任务。

HeadContext

HeadContext 既是 Inbound 处理器,也是 Outbound 处理器(分别实现 ChannelInboundHandler 和 ChannelOutboundHandler 接口)。HeadContext 任务比较多:

  1. 网络数据流经 ChannelPipeline 的第一站。底层通过 Unsafe 从通道中读取数据。
  2. 处理后的数据流经 ChannelPipeline 的最后一站。底层通过 Unsafe 向通道写入数据。
  3. 在实现 ChannelInboundHandler 接口中,大部分实现逻辑是调用 ChannelHandlerContext#fireXX 方法,主要功能是回调 ChannelPipeline 中下一个可用的 InboundHandler 相关方法。
  4. 在实现 ChannelOutboundHandler 接口中,一般是与底层通道相关,会委托 Unsafe 魔法类来完成相关操作。比如从通道中读取数据、连接通道、关闭通道等。

相关源码摘录如下:

  1. // io.netty.channel.DefaultChannelPipeline.HeadContext
  2. final class HeadContext extends AbstractChannelHandlerContext
  3. implements ChannelOutboundHandler, ChannelInboundHandler {
  4. /**
  5. * 魔法类,用来操作底层Socket
  6. */
  7. private final Unsafe unsafe;
  8. @Override
  9. public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
  10. unsafe.bind(localAddress, promise);
  11. }
  12. @Override
  13. public void connect(
  14. ChannelHandlerContext ctx,
  15. SocketAddress remoteAddress, SocketAddress localAddress,
  16. ChannelPromise promise) {
  17. unsafe.connect(remoteAddress, localAddress, promise);
  18. }
  19. @Override
  20. public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
  21. unsafe.disconnect(promise);
  22. }
  23. @Override
  24. public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
  25. unsafe.close(promise);
  26. }
  27. @Override
  28. public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
  29. unsafe.deregister(promise);
  30. }
  31. @Override
  32. public void read(ChannelHandlerContext ctx) {
  33. unsafe.beginRead();
  34. }
  35. @Override
  36. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
  37. unsafe.write(msg, promise);
  38. }
  39. @Override
  40. public void flush(ChannelHandlerContext ctx) {
  41. unsafe.flush();
  42. }
  43. @Override
  44. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  45. ctx.fireExceptionCaught(cause);
  46. }
  47. @Override
  48. public void channelRegistered(ChannelHandlerContext ctx) {
  49. invokeHandlerAddedIfNeeded();
  50. ctx.fireChannelRegistered();
  51. }
  52. @Override
  53. public void channelUnregistered(ChannelHandlerContext ctx) {
  54. ctx.fireChannelUnregistered();
  55. // Remove all handlers sequentially if channel is closed and unregistered.
  56. if (!channel.isOpen()) {
  57. destroy();
  58. }
  59. }
  60. @Override
  61. public void channelActive(ChannelHandlerContext ctx) {
  62. ctx.fireChannelActive();
  63. // #2 触发Channel的读事件
  64. readIfIsAutoRead();
  65. }
  66. @Override
  67. public void channelInactive(ChannelHandlerContext ctx) {
  68. ctx.fireChannelInactive();
  69. }
  70. @Override
  71. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  72. ctx.fireChannelRead(msg); // HeadContext主要还是触发ChannelHandler的回调方法
  73. }
  74. /**
  75. * 已把本轮次通道中数据读完,向管道传播「channelreadComplete」事件
  76. * 事件传播过程:HeadContext->TailContext->HeadContext#read()
  77. * @param ctx
  78. */
  79. @Override
  80. public void channelReadComplete(ChannelHandlerContext ctx) {
  81. // #1 向管道传播「channelreadComplete」事件
  82. ctx.fireChannelReadComplete();
  83. // #2 如果配置为自动读,则在读完之后需要判断当前
  84. // 如果没有注册OP_READ事件,则注册OP_READ。
  85. readIfIsAutoRead();
  86. }
  87. private void readIfIsAutoRead() {
  88. if (channel.config().isAutoRead()) {
  89. channel.read();
  90. }
  91. }
  92. @Override
  93. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
  94. ctx.fireUserEventTriggered(evt);
  95. }
  96. @Override
  97. public void channelWritabilityChanged(ChannelHandlerContext ctx) {
  98. ctx.fireChannelWritabilityChanged();
  99. }
  100. }

TailContext

TailContext 只寅了 ChannelInboundHandler 接口,它 Inbound 事件传播终止的地方。主要作用有:

  1. 终止 Inbound 事件。
  2. 释放数据资源。

具体源码就不看了,大部分方法都是空实现。
这里对 HeadContext 和 TailContext 做一个小结。

  1. 它们都是 ChannelHandler。存在于 ChannelPipeline 管道中。通过 ChannelPipeline 传播事件时,回调 ChannelHandler 相关方法。
  2. HeadContext 是事件传播起点和终点(包括 Inbound 和 Outbound),任务繁重。而 TailContext 仅是 Inbound 的终点,部分 API 有自动释放资源的功能。

    PendingHandlerTask

    PendingHandlerTask 服务于 ChannelPipeline。当我们向 ChannelPipeline 添加通道时,如果此时通道未完成注册,则需要调用 DefaultChannelPipeline#callHandlerCallbackLater 方法添加到 ChanelPipeline 的异步任务队列中,待通道完成注册后,调用 Pipeline#invokeHandlerAddedIfNeeded() 方法执行所有挂起的任务。而这个任务可以分成两类,一个是通道添加(PendingHandlerAddedTask),另一个是通道移除(PendingHandlerRemovedTask)。它们分别回调 ChannelHandler#handlerAdded()ChannelHandler#handlerRemove() 方法。
    PendingHandlerTask 执行时机示意图.png
    上图主要想表达 PendingHandlerTask 执行时机和通道注册的前后关系。因为只有当通道注册完成,触发 HandlerAdded 事件才会有意义。所以 Netty 为了能异步触发事件创建了 PendingHandlerTask 相关的类。
    ChannelPipeline 保存 pendingHandlerTask 首位引用.png
    PendingHandlerTask 之间构成单向链表,因此,可以通过 next() 方法获取下一个节点任务并执行。具体源码就不贴出来了,我们只需要知道为什么会存在这几个内部类,哦,别记了。我们的管道初始化工作就是在方法 pipeline.invokeHandlerAddedIfNeeded() 完成的(即回调 ChannelInitializer#initChannel() 方法),它们向管道中添加我们自定义的 ChannelHandler。

    双向链表结构

    ChannelPipeline 作用之一是维护 ChannelHandler 的顺序。内部是通过使用两个指针分别保存头结点和尾结点引用,这样无论是 addFirst() 在头部添加或是 addLast() 都能以最快速度完成。引用类型为 AbstractChannelHandlerContext,这个接下来会进行分析,AbstractChannelHandlerContext 之间构成双向链表结果。
    ChannelPipeline双向链表结构.png

    ChannelHandlerContext 接口

    ChannelHandlerContext 代表 Channelhandler 和 ChannelPipeline 之间的关联关系。ChannelHandler 和 ChannelHandlerContext 一一对应的:每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建新的 ChannelHandlerContext。ChannelHandlerContext 主要的功能有:

  3. 管理它所关联的 ChannelHandler 和在同一管道中其他 ChannelHandler 之间的交互。内部维护两个指针,分别指向前向节点和后向节点。构建双向链表。

  4. 向前或向后在管道中搜索可处理触发事件的 ChannelHandler。

下图对 ChannelHandlerContext API 进行了总结:

方法名称 描述
channel 返回绑定到这个实例的 Channel
executor 返回调度事件的 执行器
name 返回这个实例的唯一名称
handler 返回绑定到这个实例的 Channel
isRemoved 如果所关联的 ChannelHandler 已经从 ChannelPipeline 中移除则返回 true
fireChannelRegistered 触发下一个 ChannelInboundHandler 上的 channelRegistered() 方法
fireChannelUnregistered 触发下一个 ChannelInboundHandler 上的 channelUnregistered() 方法
fireChannelActive 触发下一个 ChannelInboundHandler 上的 channelActive() 方法
fireChannelInactive 触发下一个 ChannelInboundHandler 上的 channelInactive() 方法
fireExceptionCaught 触发下一个 ChannelInboundHandler 上的 exceptionCaught() 方法
fireUserEventTriggered 触发下一个 ChannelInboundHandler 上的 userEventTriggered() 方法
fireChannelRead 触发下一个 ChannelInboundHandler 上的 channelRead() 方法
fireChannelReadComplete 触发下一个 ChannelInboundHandler 上的 channelReadComplete() 方法
fireChannelWritabilityChanged 触发下一个 ChannelInboundHandler 上的 channelWritabilityChanged() 方法
read 触发下一个 ChannelOutboundHandler 上的 read() 方法
flush 触发下一个 ChannelOutboundHandler 上的 flush() 方法
pipeline 返回这个实例所关联的 ChannelPipeline
alloc 返回这个实例所关联的 ByteBufAllocator 分配器

ChannelHandlerContext 本身有很多方法,其中一些方法存在于 Channel 和 ChannelPipeline 接口中,当调用 Channel 或 ChannelPipeline 上的方法时,它们将会沿着整个 ChannelPipeline 进行传播。而调用 ChannelHandlerContext 自身相同的方法(和 Channel 和 ChannelPipeline 方法相同),则将从当前所关联的 ChannelHandler 开始,只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。

层次结构

ChannelHandlerContext.png

AbstractChannelHandlerContext

AbstractChannelHandlerContext 抽象类实现了 ChannelHandlerContext 接口的基本骨架,内部使用了多个变量维护所绑定的 ChannelHandler 的状态以及和 ChannelPipeline 交互的逻辑。
AbstractChannelHandlerContext 相关变量说明:

/**
 * {@link ChannelHandlerContext} 抽象基类
 */
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    // 前向节点
    volatile AbstractChannelHandlerContext next;
    // 后向节点
    volatile AbstractChannelHandlerContext prev;

    // 所绑定的「handler」的状态更新器
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    // Handler 状态变化为:INIT->ADD_PENDING->ADD_COMPLETE->REMOVE_COMPLETE
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} 将要被调用
     */
    private static final int ADD_PENDING = 1;

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} 已经被调用
     */
    private static final int ADD_COMPLETE = 2;

    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} 已经被调用
     */

    private static final int REMOVE_COMPLETE = 3;
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * 或 {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} 没有被调用
     */
    private static final int INIT = 0;

    // 绑定的管道
    private final DefaultChannelPipeline pipeline;
    // 名称
    private final String name;
    // 是否有序。true: 执行器为空或为 OrderedEventExecutor接口的实例
    private final boolean ordered;
    // 掩码
    private final int executionMask;

    // 子执行器,可能为null
    final EventExecutor executor;
    private ChannelFuture succeededFuture;

    // 迟延实例化任务。这些任务由不同的「Executor」触发Handler的事件
    // 没有必要使用「volatile」修改此变量,因为即便在最坏的情况下,也只会创建更多的实例对象
    private Tasks invokeTasks;

    /**
     * 处理器状态,默认值为{@link AbstractChannelHandlerContext#INIT}
     */
    private volatile int handlerState = INIT;

    // ...
}

从源码可以看出:

  1. 抽象类维护两个指针引用,分别是 prev 和 next,使得 HandlerContext 构成双向链表。
  2. handlerState 表示当前绑定的 HandlerContext 处于的实时状态。状态变化:INIT->ADD_PENDING->ADD_COMPLETE->REMOVE_COMPLETE。
  3. executionMask 表示执行掩码,通过相关位运算就能得到 ChannelHandler 所拥有的回调方法。这是 HandlerContext 构造器中完成相关掩码计算操作。
  4. 内部还有一个 EventExecutor 变量,可以让 ChannelHadnler 在此 EventExecutor 执行而非当前执行线程。如果当前 ChannelHandler 耗时比较多,如果放在当前执行线程执行的话会导致吞吐量下降,而 Netty 提供另一种方式,把耗时严重的 ChannelHandler 放在单独的 EventExecutor 中执行。

抽象类 AbstractChannelHandlerContext 实现了 ChannelInboundInvoker 和 ChannelOutboundInvoker 所有的接口。而 ChannelPipeline 也实现了这两个接口,那到底两者有什么区别呢?

  • 直接调用 ChannelPipeline 方法传播事件,会从 HeadContext 开始传播。
  • 直接调用 ChannelHandlerContext 方法传播事件,会从下一个匹配成功的 ChanelHandler 开始传播。

我们先也解 fireChannelRead(Object msg) 方法,其它方法同理。

fireChannelRead

// io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
/**
 * 从下一个匹配的 {@link ChannelHandler} 开始传播「channelRead」事件
 * @param msg 上一个{@link ChannelHandler}解码得到的消息
 * @return
 */    
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // #1 在管道中找寻此ChannelHandler下一个带有「MASK_CHANNEL_READ」标志位的「ChannelInbound」处理器
    AbstractChannelHandlerContext contextInbound = findContextInbound(MASK_CHANNEL_READ);

    // #2 回调「ChannelHandler」
    invokeChannelRead(contextInbound, msg);
    return this;
}

/**
 * 寻找下一个匹配「mask」的「InboundChannelHandler」节点
 * @param mask 掩码,
 * @return
 */
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        // 获取下一个节点
        ctx = ctx.next;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); // 判断是否跳过此节点
    return ctx;
}

/**
 * 此方法为静态方法,完成以下两个任务:
 * ① 检测资源泄漏(touch)
 * ② 根据是否为异线程封装异步任务或直接执行任务
 * @param next
 * @param msg
 */
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 用于检测资源是否存在泄漏
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

/**
 * ① 通过 {@link #invokeHandler()} 判断当前「ChannelHandler」是否需要被跳过
 * ② 不需要跳过,回调 {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}方法
 *    需要跳过,继续在管道中寻找下一个匹配ChannelHandler
 * @param msg
 */
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

方法 findContextInbound(int mask) 比较有趣,相关核心类是 io.netty.channel.ChannelHandlerMask,里面对常见的事件(如 CHANNEL_REGISTERED)进行位编码,然后通过相关位运算就能知道是否属于/不属于某个范围或某个值。
通过讲解 fireChannel() 方法,我们大致了解了在 ChannelHandlerContext 如何传播相关事件。
总结有以下几步:

  1. 在管道中寻找下一个匹配的 ChannelHandler。
  2. 针对异线程进行相关处理。
  3. 回调相关方法。在回调相关方法之前需要判断当前 ChannelHandler 的状态,如果 invokeHandler()==false,则继续向下传播事件。当前 ChannelHandler 对消息不做任何处理。

其他方法流程与 fireChannelRead() 类似,这里就不做过多描述。

HeadContext/TailContext 与 ChannelHandlerContext 的关系

  1. HeadContext 继承 AbstractChannelHandlerContext,实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口。
  2. TailContext 同样继承 AbstractChannelHandlerContext,但只实现了 ChannelInboundHandler 接口。
  3. TailContext 实现实现 ChannelInboundHandler 接口用意是释放入站资源、避免内存泄漏。继承 AbstractChannelHandlerContext 抽象类则可以调用相关方法向前传播事件。
  4. HeadContext 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口,内部的所有接口它都有实现且,因为入站消息它是第一个拿到,出站消息它是最后一个拿到,所以此类就承载着非常重要的消息接收和消息发送功能。继承 AbstractChannelHandlerContext 抽象类则可以调用相关方法向后传播相关事件。

    异常处理

    异常处理是任务应用程序的重要组成吩,它是程序鲁棒性的重要实现方式之一。我们可以将异常分为以下几类:

  5. 入站异常

  6. 出站异常

    入站异常

    一个简单的示例:打消异常栈跟踪信息并关闭通道。
    public class HelloWorldChannelInboundHandler 
     extends SimpleChannelInboundHandler<FullHttpRequest> {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         // 关闭通道
         ctx.close();
     }
    }
    
    因为异常将会继续按照入站方向流动,所以相关异常处理通常位于 ChannelPipeline 的最后。这确保所有的入站异常都会被处理,无论发生在 ChannelPipeline 哪个位置。
    如果处理异常取决于你自己的业务逻辑,比如最简单是关闭通道并记录出错日志。当然你也可以根据异常内容做出进一步判断:比如尝试恢复连接。
    入站异常处理小结:
  • ChannelHandler.exceptionCaught() 的默认实现是简单地将异常向下传播。
  • 如果异常传播到 ChannelPipeline 的尾部,它将会被记录为未被处理。
  • 如果想自定义异常处理,需要重写 exceptionCaught() 方法,根据异常类型决定是否继续在管道中向下传播此异常。

    出站异常

    对于出站想说,无论正常或异常都是通过相关通知机制完成:

  • 每个出站操作都将返回一个 ChannelFuture 对象。注册到 ChannelFuture 的 ChannelFutureListener 将在操作完成时被通知:该操作是否成功或失败。

  • 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise 实例。它是 ChannelFuture 的子类,可以用于异步通知的监听器。当然,它也具备立即通知的可写方法:
    • ChannelPromise setSuccess()
    • ChannelPromise setFailure(Throwable cause)
  • 通常在调用出站操作的方法后面添加 ChannelFutureListener。比如:

    ChannelFuture future = channel.write(msg);
    future.addListener(new ChanelFutureListener(){
      @Override
      public void operationComplete(ChannelFuture f) {
          if (!f.isSuccess()) {
              f.cause().printStackTrace();
              f.channel().close();
          }
      }
    });
    
  • 还可以将 ChannelFutureListener 添加到作为参数传递给 ChannelOutboundHandler 的方法的 ChannelPromise。

    内置 ChannelHandler

    7. ChannelPipeline 和 ChannelHandler - 图9

    总结