Pipeline是基于责任链模式来设计的,其内部是一个双向链表结构,能够支持动态的添加和删除业务处理器

1、入站和出站处理流程

入站处理器流动次序:从前到后
出站处理器流动次序:从后到前

h1->h2->h3->h4->h5->h6

  1. @Slf4j
  2. public class TestPipeline {
  3. public static void main(String[] args) {
  4. new ServerBootstrap()
  5. .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
  6. .channel(NioServerSocketChannel.class)
  7. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  8. @Override
  9. protected void initChannel(NioSocketChannel ch) throws Exception {
  10. ch.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
  11. @Override
  12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  13. log.info("入站处理器h1");
  14. super.channelRead(ctx, msg);
  15. }
  16. }).addLast("h2",new ChannelInboundHandlerAdapter(){
  17. @Override
  18. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  19. log.info("入站处理器h2");
  20. super.channelRead(ctx, msg);
  21. }
  22. }).addLast("h3",new ChannelInboundHandlerAdapter(){
  23. @Override
  24. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  25. log.info("入站处理器h3");
  26. super.channelRead(ctx, msg);
  27. ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes(StandardCharsets.UTF_8)));
  28. //ChannelHandlerContext调用无法触发出站处理器
  29. // ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes(StandardCharsets.UTF_8)));
  30. }
  31. }).addLast("h4",new ChannelOutboundHandlerAdapter(){
  32. @Override
  33. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  34. log.info("出站处理器h4");
  35. super.write(ctx, msg, promise);
  36. }
  37. }).addLast("h5",new ChannelOutboundHandlerAdapter(){
  38. @Override
  39. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  40. log.info("出站处理器h5");
  41. super.write(ctx, msg, promise);
  42. }
  43. }).addLast("h6",new ChannelOutboundHandlerAdapter(){
  44. @Override
  45. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
  46. log.info("出站处理器h6");
  47. super.write(ctx, msg, promise);
  48. }
  49. });
  50. }
  51. })
  52. .bind(8989);
  53. }
  54. }

输出

  1. 16:22:00.521 [nioEventLoopGroup-3-1] INFO com.jx.netty.netty.bootstrap.TestPipeline - 入站处理器h1
  2. 16:22:00.521 [nioEventLoopGroup-3-1] INFO com.jx.netty.netty.bootstrap.TestPipeline - 入站处理器h2
  3. 16:22:00.521 [nioEventLoopGroup-3-1] INFO com.jx.netty.netty.bootstrap.TestPipeline - 入站处理器h3
  4. 16:22:00.521 [nioEventLoopGroup-3-1] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message PooledUnsafeDirectByteBuf(ridx: 0, widx: 3, cap: 2048) that reached at the tail of the pipeline. Please check your pipeline configuration.
  5. 16:22:00.526 [nioEventLoopGroup-3-1] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded message pipeline : [h1, h2, h3, h4, h5, h6, DefaultChannelPipeline$TailContext#0]. Channel : [id: 0xd4642744, L:/127.0.0.1:8989 - R:/127.0.0.1:3432].
  6. 16:22:00.527 [nioEventLoopGroup-3-1] INFO com.jx.netty.netty.bootstrap.TestPipeline - 出站处理器h6
  7. 16:22:00.527 [nioEventLoopGroup-3-1] INFO com.jx.netty.netty.bootstrap.TestPipeline - 出站处理器h5
  8. 16:22:00.527 [nioEventLoopGroup-3-1] INFO com.jx.netty.netty.bootstrap.TestPipeline - 出站处理器h4

2、ChannelHandlerContext

ChannelHandlerContext 主要封装了ChannelHandler(通道处理器)和ChannelPipeline(通道流水线)之间的关联关系

Channel,Handler,ChannelHandlerContext三者关系:
Channel拥有一条ChannelPipeline,流水线中每个流水线节点为一个ChannelHandlerContext上下文对象,每个上下文对象包含了一个ChannelHandler

ChannelHandlerContext 包含很多方法,主要分为两类:

  • 第一类是获取上下文所关联的Netty组件实例,如关联的通道,流水线,上下文内部Handler业务处理器实例等
  • 第二类是入站和出站处理方法

Channel、ChannelPopeline、ChannelHandlerContext中都存在入站出站处理方法,功能有何不同? 如果通过Channel或者ChannelPopeline实例调用入站和出站处理方法,> 会在整条流水线传播> 如果通过ChannelHandlerContext调用入站和出站方法,> 只会从当前节点开始往同一类型的下一个处理器传播。** 在上节示例入站处理器3中如果采用ChannelHandlerContext调用写消息方法,将不会经过出站处理器

image.png

3、HeadContext和TailContext

  • 通道流水线在未加入任何业务处理器前装配了两个默认的处理器上下文:一个头部上下文HeadContext,一个尾部上下文TailContext
  • 每个双向链表结构从一开始就存在了HeadContextTailContext两个节点,后面添加的处理器上下文节点都添加在HeadContextTailContext实例之间。
  1. protected DefaultChannelPipeline(Channel channel) {
  2. this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
  3. this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
  4. this.voidPromise = new VoidChannelPromise(channel, true);
  5. this.tail = new DefaultChannelPipeline.TailContext(this);
  6. this.head = new DefaultChannelPipeline.HeadContext(this);
  7. this.head.next = this.tail;
  8. this.tail.prev = this.head;
  9. }

3.1、TailContext

TailContext为一个入站处理器,实现了所有入站处理的回调方法,这些回调实现的主要工作基本上都是有关收尾处理的,如释放缓冲区对象、完成异常处理等。

  1. final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
  2. TailContext(DefaultChannelPipeline pipeline) {
  3. super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, DefaultChannelPipeline.TailContext.class);
  4. this.setAddComplete();
  5. }
  6. public ChannelHandler handler() {
  7. return this;
  8. }
  9. public void channelRegistered(ChannelHandlerContext ctx) {
  10. }
  11. public void channelUnregistered(ChannelHandlerContext ctx) {
  12. }
  13. public void channelActive(ChannelHandlerContext ctx) {
  14. DefaultChannelPipeline.this.onUnhandledInboundChannelActive();
  15. }
  16. public void channelInactive(ChannelHandlerContext ctx) {
  17. DefaultChannelPipeline.this.onUnhandledInboundChannelInactive();
  18. }
  19. public void channelWritabilityChanged(ChannelHandlerContext ctx) {
  20. DefaultChannelPipeline.this.onUnhandledChannelWritabilityChanged();
  21. }
  22. public void handlerAdded(ChannelHandlerContext ctx) {
  23. }
  24. public void handlerRemoved(ChannelHandlerContext ctx) {
  25. }
  26. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
  27. DefaultChannelPipeline.this.onUnhandledInboundUserEventTriggered(evt);
  28. }
  29. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  30. DefaultChannelPipeline.this.onUnhandledInboundException(cause);
  31. }
  32. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  33. DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
  34. }
  35. public void channelReadComplete(ChannelHandlerContext ctx) {
  36. DefaultChannelPipeline.this.onUnhandledInboundChannelReadComplete();
  37. }
  38. }

3.2、HeadContext

既是一个出站处理器,也是一个入站处理器,还保存了一个unsafe实例(完成实际通道传输的类),也就是HeadContext还要负责最终的通道传输工作。

  1. final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
  2. //此类仅供Netty内部使用,应用程序不能用
  3. private final Unsafe unsafe;
  4. HeadContext(DefaultChannelPipeline pipeline) {
  5. super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, DefaultChannelPipeline.HeadContext.class);
  6. this.unsafe = pipeline.channel().unsafe();
  7. this.setAddComplete();
  8. }
  9. public ChannelHandler handler() {
  10. return this;
  11. }
  12. public void handlerAdded(ChannelHandlerContext ctx) {
  13. }
  14. public void handlerRemoved(ChannelHandlerContext ctx) {
  15. }
  16. public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
  17. this.unsafe.bind(localAddress, promise);
  18. }
  19. public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
  20. this.unsafe.connect(remoteAddress, localAddress, promise);
  21. }
  22. public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
  23. this.unsafe.disconnect(promise);
  24. }
  25. public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
  26. this.unsafe.close(promise);
  27. }
  28. public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
  29. this.unsafe.deregister(promise);
  30. }
  31. public void read(ChannelHandlerContext ctx) {
  32. this.unsafe.beginRead();
  33. }
  34. public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
  35. this.unsafe.write(msg, promise);
  36. }
  37. public void flush(ChannelHandlerContext ctx) {
  38. this.unsafe.flush();
  39. }
  40. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
  41. ctx.fireExceptionCaught(cause);
  42. }
  43. public void channelRegistered(ChannelHandlerContext ctx) {
  44. DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
  45. ctx.fireChannelRegistered();
  46. }
  47. public void channelUnregistered(ChannelHandlerContext ctx) {
  48. ctx.fireChannelUnregistered();
  49. if (!DefaultChannelPipeline.this.channel.isOpen()) {
  50. DefaultChannelPipeline.this.destroy();
  51. }
  52. }
  53. public void channelActive(ChannelHandlerContext ctx) {
  54. ctx.fireChannelActive();
  55. this.readIfIsAutoRead();
  56. }
  57. public void channelInactive(ChannelHandlerContext ctx) {
  58. ctx.fireChannelInactive();
  59. }
  60. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  61. ctx.fireChannelRead(msg);
  62. }
  63. public void channelReadComplete(ChannelHandlerContext ctx) {
  64. ctx.fireChannelReadComplete();
  65. this.readIfIsAutoRead();
  66. }
  67. private void readIfIsAutoRead() {
  68. if (DefaultChannelPipeline.this.channel.config().isAutoRead()) {
  69. DefaultChannelPipeline.this.channel.read();
  70. }
  71. }
  72. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
  73. ctx.fireUserEventTriggered(evt);
  74. }
  75. public void channelWritabilityChanged(ChannelHandlerContext ctx) {
  76. ctx.fireChannelWritabilityChanged();
  77. }
  78. }