Pipeline的创建

    服务端启动类(从这里的 .channel(NioServerSocketChannel.class) 开始跟踪)

    1. public static void main(String[] args) throws InterruptedException {
    2. // parentGroup中的eventLoop将来是用于与处理客户端连接请求的parentChannel进行绑定的
    3. EventLoopGroup parentGroup = new NioEventLoopGroup();
    4. // childGroup中的eventLoop将来是用于与处理客户端读写请求的childChannel进行绑定的
    5. EventLoopGroup childGroup = new NioEventLoopGroup();
    6. final EventLoopGroup someGroup = new NioEventLoopGroup();
    7. try {
    8. ServerBootstrap bootstrap = new ServerBootstrap();
    9. bootstrap.group(parentGroup, childGroup)
    10. //从这里开始跟踪,跟踪NioServerSocketChannel.class
    11. .channel(NioServerSocketChannel.class)
    12. //这两个方法都是用于设置Channel属性的
    13. //attr的方法一般是用于parentGroup中,一般会在当客户端与服务端连接建立的时候触发调用
    14. .attr(AttributeKey.valueOf("depart"), "市场部")
    15. //childAttr的方法一般用于childGroup中,一般用于客户端往服务端发送消息的时候触发调用
    16. .childAttr(AttributeKey.valueOf("addr"), "北京海淀")
    17. .childHandler(new ChannelInitializer<SocketChannel>() {
    18. @Override
    19. protected void initChannel(SocketChannel ch) throws Exception {
    20. ChannelPipeline pipeline = ch.pipeline();
    21. pipeline.addLast(new StringDecoder());
    22. pipeline.addLast(new StringEncoder());
    23. pipeline.addLast(new SomeSocketServerHandler());
    24. Object depart = ch.attr(AttributeKey.valueOf("depart")).get();
    25. Object addr = ch.attr(AttributeKey.valueOf("addr")).get();
    26. System.out.println("depart = " + depart);
    27. System.out.println("addr = " + addr);
    28. }
    29. });
    30. ChannelFuture future = bootstrap.bind(8888).sync();
    31. System.out.println("服务器已启动。。。");
    32. future.channel().closeFuture().sync();
    33. } finally {
    34. parentGroup.shutdownGracefully();
    35. childGroup.shutdownGracefully();
    36. }
    37. }

    NioServerSocketChannel.java

    1. public NioServerSocketChannel(ServerSocketChannel channel) {
    2. // 第一个参数:父channel
    3. // 第二个参数:Nio原生channel
    4. // 第三个参数:指定其关注的事件为 接收客户端连接
    5. // 跟踪super
    6. super(null, channel, SelectionKey.OP_ACCEPT);
    7. // config用于对当前channel进行配置的
    8. config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    9. }

    AbstractNioMessageChannel.java

    1. protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    2. // 跟踪super
    3. super(parent, ch, readInterestOp);
    4. }

    AbstractNioChannel.java

    1. protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    2. //跟踪super
    3. super(parent);
    4. // NIO原生channel
    5. this.ch = ch;
    6. // 初始化其关注事件
    7. this.readInterestOp = readInterestOp;
    8. try {
    9. // 指定这里使用的是非阻塞IO,即NIO
    10. ch.configureBlocking(false);
    11. } catch (IOException e) {
    12. try {
    13. ch.close();
    14. } catch (IOException e2) {
    15. if (logger.isWarnEnabled()) {
    16. logger.warn(
    17. "Failed to close a partially initialized socket.", e2);
    18. }
    19. }
    20. throw new ChannelException("Failed to enter non-blocking mode.", e);
    21. }
    22. }

    AbstractChannel.java

    1. //其Pipeline对象是在这里创建的
    2. protected AbstractChannel(Channel parent) {
    3. this.parent = parent;
    4. // 生成channel的id
    5. id = newId();
    6. // 底层操作对象
    7. unsafe = newUnsafe();
    8. // 创建了channelPipeline
    9. // 准备跟踪
    10. pipeline = newChannelPipeline();
    11. }

    DefaultChannelPipeline.java

    1. // 生成head节点的名称,简单类名后添加#0
    2. private static final String HEAD_NAME = generateName0(HeadContext.class);
    3. // 生成tail节点的名称,简单类名后添加#0
    4. private static final String TAIL_NAME = generateName0(TailContext.class);
    5. protected DefaultChannelPipeline newChannelPipeline() {
    6. return new DefaultChannelPipeline(this);
    7. }
    8. // pipeline本质上是一个双向链表
    9. protected DefaultChannelPipeline(Channel channel) {
    10. this.channel = ObjectUtil.checkNotNull(channel, "channel");
    11. succeededFuture = new SucceededChannelFuture(channel, null);
    12. voidPromise = new VoidChannelPromise(channel, true);
    13. // 尾节点
    14. // 准备跟踪
    15. tail = new TailContext(this);
    16. // 头节点
    17. head = new HeadContext(this);
    18. // 头节点的下一个指针指向尾节点
    19. head.next = tail;
    20. // 尾节点的上一个指针指向头节点
    21. tail.prev = head;
    22. }
    23. // A special catch-all handler that handles both bytes and messages.
    24. // tail节点是一个inbound处理器,用于释放资源
    25. final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    26. TailContext(DefaultChannelPipeline pipeline) {
    27. // 第二个参数为当前节点所绑定的eventLoop,当前为null
    28. // 第三个参数为当前节点的名称
    29. // 准备跟踪
    30. super(pipeline, null, TAIL_NAME, TailContext.class);
    31. // 修改节点状态为ADD_COMPLETE
    32. setAddComplete();
    33. }
    34. }

    AbstractChannelHandlerContext.java

    1. AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
    2. String name, Class<? extends ChannelHandler> handlerClass) {
    3. this.name = ObjectUtil.checkNotNull(name, "name");
    4. this.pipeline = pipeline;
    5. // 当前节点所绑定的eventLoop
    6. this.executor = executor;
    7. // 初始化执行标记,通过该变量可以判断出当前处理器是inbound还是outbound处理器,
    8. // 可以判断出哪些方法应该跳过,哪些方法应该执行
    9. // 准备跟踪
    10. this.executionMask = mask(handlerClass);
    11. // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    12. ordered = executor == null || executor instanceof OrderedEventExecutor;
    13. }

    ChannelHandlerMask.java

    1. static int mask(Class<? extends ChannelHandler> clazz) {
    2. // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
    3. // lookup in the future.
    4. Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    5. Integer mask = cache.get(clazz);
    6. if (mask == null) {
    7. // 计算mask的值
    8. mask = mask0(clazz);
    9. cache.put(clazz, mask);
    10. }
    11. return mask;
    12. }
    13. /**
    14. * Calculate the {@code executionMask}.
    15. */
    16. private static int mask0(Class<? extends ChannelHandler> handlerType) {
    17. // 初始化mask
    18. int mask = MASK_EXCEPTION_CAUGHT;
    19. try {
    20. // 处理handlerType为inbound处理器的情况
    21. if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
    22. mask |= MASK_ALL_INBOUND;
    23. // 判断channelRegistered()方法是否应该跳过执行,即不执行
    24. // 若channelRegistered()上出现了@Skip注解,则应该跳过
    25. // 像我们自定义的Handler处理器里面继承的ChannelInboundHandlerAdapter
    26. // 类里面的channelRead(),exceptionCaught等等它就不会跳过,
    27. // 因为我们实现的方法上面没有@Skip注解
    28. if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
    29. mask &= ~MASK_CHANNEL_REGISTERED;
    30. }
    31. if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
    32. mask &= ~MASK_CHANNEL_UNREGISTERED;
    33. }
    34. if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
    35. mask &= ~MASK_CHANNEL_ACTIVE;
    36. }
    37. if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
    38. mask &= ~MASK_CHANNEL_INACTIVE;
    39. }
    40. if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
    41. mask &= ~MASK_CHANNEL_READ;
    42. }
    43. if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
    44. mask &= ~MASK_CHANNEL_READ_COMPLETE;
    45. }
    46. if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
    47. mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
    48. }
    49. if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
    50. mask &= ~MASK_USER_EVENT_TRIGGERED;
    51. }
    52. }
    53. // 处理handlerType为outbound处理器的情况
    54. if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
    55. mask |= MASK_ALL_OUTBOUND;
    56. if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
    57. SocketAddress.class, ChannelPromise.class)) {
    58. mask &= ~MASK_BIND;
    59. }
    60. if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
    61. SocketAddress.class, ChannelPromise.class)) {
    62. mask &= ~MASK_CONNECT;
    63. }
    64. if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
    65. mask &= ~MASK_DISCONNECT;
    66. }
    67. if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
    68. mask &= ~MASK_CLOSE;
    69. }
    70. if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
    71. mask &= ~MASK_DEREGISTER;
    72. }
    73. if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
    74. mask &= ~MASK_READ;
    75. }
    76. if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
    77. Object.class, ChannelPromise.class)) {
    78. mask &= ~MASK_WRITE;
    79. }
    80. if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
    81. mask &= ~MASK_FLUSH;
    82. }
    83. }
    84. if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
    85. mask &= ~MASK_EXCEPTION_CAUGHT;
    86. }
    87. } catch (Exception e) {
    88. // Should never reach here.
    89. PlatformDependent.throwException(e);
    90. }
    91. return mask;
    92. }
    93. // 判断方法上是否有@Skip注解
    94. @SuppressWarnings("rawtypes")
    95. private static boolean isSkippable(
    96. final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
    97. return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
    98. @Override
    99. public Boolean run() throws Exception {
    100. return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(Skip.class);
    101. }
    102. });
    103. }

    回过头来跟踪头节点

    DefaultChannelPipeline.java

    1. // head节点既是一个inbound处理器,又是一个outbound处理器
    2. // 作为inbound处理器,其用于将消息向下一个节点传递
    3. // 作为outbound处理器,其用于完成直接的底层操作
    4. final class HeadContext extends AbstractChannelHandlerContext
    5. implements ChannelOutboundHandler, ChannelInboundHandler {
    6. private final Unsafe unsafe;
    7. HeadContext(DefaultChannelPipeline pipeline) {
    8. super(pipeline, null, HEAD_NAME, HeadContext.class);
    9. // 获取底层操作对象unsafe
    10. unsafe = pipeline.channel().unsafe();
    11. setAddComplete();
    12. }
    13. }

    向Pipeline中添加处理器

    DefaultChannelPipeline.java

    1. @Override
    2. public final ChannelPipeline addLast(ChannelHandler... handlers) {
    3. // 第一个参数是一个EventExecutorGroup,这里传入null
    4. // 第二个参数是一个数组
    5. return addLast(null, handlers);
    6. }
    7. @Override
    8. public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    9. if (handlers == null) {
    10. throw new NullPointerException("handlers");
    11. }
    12. // 逐个添加处理器
    13. for (ChannelHandler h: handlers) {
    14. if (h == null) {
    15. break;
    16. }
    17. // 添加一个处理器到pipeline
    18. // 第二个参数为name,这里传入null,之后由filterName()确定节点名称
    19. addLast(executor, null, h);
    20. }
    21. return this;
    22. }
    23. @Override
    24. public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    25. final AbstractChannelHandlerContext newCtx;
    26. synchronized (this) {
    27. // 检测当前handler是否被重复添加
    28. // 1:准备跟踪
    29. checkMultiplicity(handler);
    30. // 将处理器封装为一个节点
    31. // filterName() 获取节点名称
    32. // 2:准备跟踪
    33. newCtx = newContext(group, filterName(name, handler), handler);
    34. // 将新的节点添加到pipeline
    35. addLast0(newCtx);
    36. // If the registered is false it means that the channel was not registered on an eventLoop yet.
    37. // In this case we add the context to the pipeline and add a task that will call
    38. // ChannelHandler.handlerAdded(...) once the channel is registered.
    39. // 处理channel没有注册的情况
    40. if (!registered) {
    41. newCtx.setAddPending();
    42. callHandlerCallbackLater(newCtx, true);
    43. return this;
    44. }
    45. // 获取当前处理器节点的eventLoop
    46. EventExecutor executor = newCtx.executor();
    47. if (!executor.inEventLoop()) {
    48. callHandlerAddedInEventLoop(newCtx, executor);
    49. return this;
    50. }
    51. }
    52. // 触发当前处理器的handlerAdded()方法
    53. // 3:准备跟踪
    54. callHandlerAdded0(newCtx);
    55. return this;
    56. }
    57. private static void checkMultiplicity(ChannelHandler handler) {
    58. if (handler instanceof ChannelHandlerAdapter) {
    59. ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
    60. // 若当前处理器是非共享处理器,且已经被添加过了,则抛出异常
    61. // 因为非共享处理器只能被添加一次
    62. if (!h.isSharable() && h.added) {
    63. throw new ChannelPipelineException(
    64. h.getClass().getName() +
    65. " is not a @Sharable handler, so can't be added or removed multiple times.");
    66. }
    67. // 修改添加状态变量
    68. h.added = true;
    69. }
    70. }
    71. private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    72. // childExecutor() 获取当前节点所绑定的eventLoop
    73. return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    74. }
    75. private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    76. try {
    77. // 调用当前节点处理器的handlerAdded()
    78. ctx.callHandlerAdded();
    79. } catch (Throwable t) {
    80. boolean removed = false;
    81. try {
    82. remove0(ctx);
    83. ctx.callHandlerRemoved();
    84. removed = true;
    85. } catch (Throwable t2) {
    86. if (logger.isWarnEnabled()) {
    87. logger.warn("Failed to remove a handler: " + ctx.name(), t2);
    88. }
    89. }
    90. if (removed) {
    91. fireExceptionCaught(new ChannelPipelineException(
    92. ctx.handler().getClass().getName() +
    93. ".handlerAdded() has thrown an exception; removed.", t));
    94. } else {
    95. fireExceptionCaught(new ChannelPipelineException(
    96. ctx.handler().getClass().getName() +
    97. ".handlerAdded() has thrown an exception; also failed to remove.", t));
    98. }
    99. }
    100. }

    处理器在Pipeline中被删除

    ChannelInitializer.java

    1. // 其为一个可共享的处理器,即该处理器实例可以被添加到多个pipeline,可以被多次添加到pipeline
    2. @Sharable
    3. // 其为一个inbound处理器
    4. public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    5. private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    6. // We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
    7. // ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.
    8. // 该集合中存放的是当前处理器实例所封装的处理器节点
    9. private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
    10. new ConcurrentHashMap<ChannelHandlerContext, Boolean>());
    11. // 当前处理器被添加到pipeline之后,该方法就会被调用
    12. @Override
    13. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    14. if (ctx.channel().isRegistered()) { // 若当前channel已经注册完毕了,则...
    15. if (initChannel(ctx)) { // 调用本地的initChannel()
    16. removeState(ctx); // 将当前处理器所封装的节点从initMap中删除
    17. }
    18. }
    19. }
    20. /**
    21. * @param ctx 当前处理器所封装为的节点
    22. * @return
    23. * @throws Exception
    24. */
    25. @SuppressWarnings("unchecked")
    26. private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    27. // 若ctx节点添加到initMap成功,则...
    28. if (initMap.add(ctx)) { // Guard against re-entrance.
    29. try {
    30. // 调用重写的initChannel(),该方法执行完毕,则其历史使命完成,就可以做收尾工作了
    31. initChannel((C) ctx.channel());
    32. } catch (Throwable cause) {
    33. // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
    34. // We do so to prevent multiple calls to initChannel(...).
    35. exceptionCaught(ctx, cause);
    36. } finally {
    37. // 收尾工作:将当前节点从当前pipeline上删除
    38. ChannelPipeline pipeline = ctx.pipeline();
    39. // 从pipeline中首先对当前处理器进行查询,若存在,则将该节点从pipeline中删除
    40. if (pipeline.context(this) != null) {
    41. // 准备跟踪
    42. pipeline.remove(this);
    43. }
    44. }
    45. return true;
    46. }
    47. return false;
    48. }
    49. }

    DefaultChannelPipeline.java

    1. @Override
    2. public final ChannelPipeline remove(ChannelHandler handler) {
    3. // 跟踪getContextOrDie()方法
    4. remove(getContextOrDie(handler));
    5. return this;
    6. }
    7. // 不成功便成仁
    8. private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    9. // 查询当前处理器所封装的节点
    10. // 准备跟踪context()方法
    11. AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    12. // 若没有找到,则直接抛出异常,否则返回找到的节点
    13. if (ctx == null) {
    14. throw new NoSuchElementException(handler.getClass().getName());
    15. } else {
    16. return ctx;
    17. }
    18. }
    19. @Override
    20. public final ChannelHandlerContext context(ChannelHandler handler) {
    21. if (handler == null) {
    22. throw new NullPointerException("handler");
    23. }
    24. // 从头节点的下一个节点开始
    25. AbstractChannelHandlerContext ctx = head.next;
    26. // 遍历整个pipeline,要么找到了返回,要么返回null
    27. for (;;) {
    28. if (ctx == null) {
    29. return null;
    30. }
    31. if (ctx.handler() == handler) {
    32. return ctx;
    33. }
    34. // 获取下一个节点
    35. ctx = ctx.next;
    36. }
    37. }

    然后回过头来再跟踪一下ChannelInitializer.java中的removeState()

    1. private void removeState(final ChannelHandlerContext ctx) {
    2. // The removal may happen in an async fashion if the EventExecutor we use does something funky.
    3. // 若当前节点的状态为 删除完毕,则将当前节点从initMap中删除
    4. if (ctx.isRemoved()) {
    5. initMap.remove(ctx);
    6. } else {
    7. // The context is not removed yet which is most likely the case because a custom EventExecutor is used.
    8. // Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
    9. ctx.executor().execute(new Runnable() {
    10. @Override
    11. public void run() {
    12. initMap.remove(ctx);
    13. }
    14. });
    15. }
    16. }