要求 😯

最少能看懂 netty-exampleecho范例

示例 😄

  1. public void bind(Integer port) {
  2. NioEventLoopGroup boss = new NioEventLoopGroup(1);
  3. NioEventLoopGroup work = new NioEventLoopGroup(1);
  4. try {
  5. ServerBootstrap serverBootstrap = new ServerBootstrap();
  6. final ServerTransactionHandler serverTransactionHandler = new ServerTransactionHandler(this.applicationContext);
  7. serverBootstrap.group(boss, work)
  8. .channel(NioServerSocketChannel.class)
  9. .handler(new LoggingHandler())
  10. .childHandler(new ChannelInitializer<SocketChannel>() {
  11. @Override
  12. public void initChannel(SocketChannel ch) {
  13. ch.pipeline()
  14. .addLast(new MessageDecoder(serialization))
  15. .addLast(new MessageEncoder(serialization))
  16. .addLast(new IdleStateHandler(20, 40, 60))
  17. .addLast(serverTransactionHandler);
  18. }
  19. }).option(ChannelOption.SO_BACKLOG, 128)
  20. .childOption(ChannelOption.SO_KEEPALIVE, true);
  21. channelFuture = serverBootstrap.bind(port).sync();
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. }

讲述过程 😂

  • NioEventLoopGroup 初始化
    • 类图
    • NioEventLoop
  • ServerBootstrap 启动
    • channel
    • handler 和 childHandler
    • bind (重点)
      • ChannelPipeline (稍微提及)
      • ChannelHandlerContext (稍微提及)
  • NioServerSocketChannel 初始化
  • Pipeline和ChannelHandlerContext的初始化

ServerBootstrap 和 NioServerSocketChannel 不是完全按照这里描述的顺序出现,还是按照Netty启动的顺序出现. 这里这是单独罗列. 😂

能收获到什么 📌

希望看完后,能理解他们之间的关系. 😂

视频版 🌲

文字版 👷

NioEventLoopGroup的初始化

image.png

NioEventLoopGroup 构造方法

  1. public NioEventLoopGroup(int nThreads) {
  2. this(nThreads, (Executor) null);
  3. }
  4. public NioEventLoopGroup(int nThreads, Executor executor) {
  5. this(nThreads, executor, SelectorProvider.provider());
  6. }
  7. public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
  8. this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
  9. }
  10. public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
  11. final SelectStrategyFactory selectStrategyFactory) {
  12. //看UML
  13. super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
  14. }

MultithreadEventLoopGroup 构造方法

  1. protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
  2. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  3. }

MultithreadEventExecutorGroup 构造方法

  1. protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
  2. this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
  3. }
  4. protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
  5. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  6. }

初始化核心逻辑

  1. protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
  2. EventExecutorChooserFactory chooserFactory, Object... args) {
  3. //跳过校验
  4. //这里只是一个数组. 还没有实例化填充数据
  5. //children 数组对象 EventExecutor
  6. children = new EventExecutor[nThreads];
  7. for (int i = 0; i < nThreads; i ++) {
  8. boolean success = false;
  9. try {
  10. //交给子类去实例化,子类可以自由选择,可以是NIO,可以是Epoll也可以是Kqueue等等
  11. children[i] = newChild(executor, args);
  12. success = true;
  13. } catch (Exception e) {
  14. // TODO: Think about if this is a good exception type
  15. throw new IllegalStateException("failed to create a child event loop", e);
  16. } finally {
  17. //初始化失败以后的善后工作... 先跳过
  18. }
  19. }
  20. //根据children数组的长度选择EventExecutorChooser,如果是2的n次幂使用
  21. //PowerOfTwoEventExecutorChooser 否则使用 GenericEventExecutorChooser(轮询)
  22. chooser = chooserFactory.newChooser(children);
  23. //终止的监听器
  24. final FutureListener<Object> terminationListener = new FutureListener<Object>() {
  25. @Override
  26. public void operationComplete(Future<Object> future) throws Exception {
  27. if (terminatedChildren.incrementAndGet() == children.length) {
  28. terminationFuture.setSuccess(null);
  29. }
  30. }
  31. };
  32. //设置上
  33. for (EventExecutor e: children) {
  34. e.terminationFuture().addListener(terminationListener);
  35. }
  36. Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
  37. Collections.addAll(childrenSet, children);
  38. readonlyChildren = Collections.unmodifiableSet(childrenSet);
  39. }

MultithreadEventExecutorGroup#newChild

看下 NioEventLoopGroup 的实现

  1. @Override
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  3. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  4. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
  5. }

NioEventLoop

image.png
初始化比较简单,具体可以看代码, UML 的关系相对比较重要,关注他的核心方法

  • register
  • SingleThreadEventExecutor#run

    总结

    NioEventLoopGroup 的初始化比较简单,主要是了解一个 Group 中存在几个 NioEventLoop ,数量在初始化的时候指定,默认是CPU核数的2倍

ServerBootstrap 👀

  1. ServerBootstrap serverBootstrap = new ServerBootstrap();

ServerBootstrap 是一个启动类,非线程安全,但是启动仅一次,也没啥关系 (目前还没看到过需要启动多次的情况)
group,handler等设置的方法可以看下代码,简单就直接跳过了

channel

也是一个设置方法,但是设计到一个Channel 的factory

  1. public B channel(Class<? extends C> channelClass) {
  2. if (channelClass == null) {
  3. throw new NullPointerException("channelClass");
  4. } else {
  5. return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
  6. }
  7. }

代码很简单,就是设置一个 ReflectiveChannelFactory 到字段 channelFactory
ReflectiveChannelFactory 何许人也,一个channel的工厂类,生成模板的方式是反射. 具体的类型需要外网传递进来,例如这里传递进来的就是 NioServerSocketChannel.class . 后文讲述 initAndRegister 还会描述到。

childHandler

设置childHandler,代码也比较简单,一个普通的set方法,这里提一下他的特殊性.

  • child是什么?
  • 存在child,那么parent又是什么?

child和parent是相对而已,这里的parent可以理解是ServerSocketChannel. child则是SocketChannel. 所以 childHandler 是给后续链接上来的 Channel 使用的 handler .. 理解了这一层, optionchildOption 就不在话下了

我们继续往下走..

bind 📖

绑定端口到socket上,在NIO中讲述的时候比较简单,获取 ServerSocketChannel ,然后调用 Socketbind 就可以完成一个NIO的启动了. 但是在Netty中

  • ServerSocketChannel 在哪里,只有一个 NioServerSocketChannel.class ,还是 Netty 包的.
  • Socketbind 到底是在哪里调用的.
  1. public ChannelFuture bind(int inetPort) {
  2. return this.bind(new InetSocketAddress(inetPort));
  3. }
  4. public ChannelFuture bind(SocketAddress localAddress) {
  5. this.validate();
  6. if (localAddress == null) {
  7. throw new NullPointerException("localAddress");
  8. } else {
  9. return this.doBind(localAddress);
  10. }
  11. }

doBind 🐎

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. //initAndRegister 字面意思,好像逻辑都在这里边了
  3. final ChannelFuture regFuture = this.initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. //如果初始化和注册存在异常,说明注册失败.
  6. if (regFuture.cause() != null) {
  7. return regFuture;
  8. } else if (regFuture.isDone()) {
  9. //注册完成了那就去执行bind操作
  10. ChannelPromise promise = channel.newPromise();
  11. doBind0(regFuture, channel, localAddress, promise);
  12. return promise;
  13. } else {
  14. //还在处理当中,那就等注册完成的时候回调这里完成doBind0
  15. final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
  16. regFuture.addListener(new ChannelFutureListener() {
  17. public void operationComplete(ChannelFuture future) throws Exception {
  18. Throwable cause = future.cause();
  19. if (cause != null) {
  20. promise.setFailure(cause);
  21. } else {
  22. promise.registered();
  23. AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
  24. }
  25. }
  26. });
  27. return promise;
  28. }
  29. }

initAndRegister 💥

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. //channelFactory 就是上文提到的 ReflectiveChannelFactory
  5. //生成的方式很简单,就是根据传递到底class进行一次反射调用
  6. channel = this.channelFactory.newChannel();
  7. this.init(channel);
  8. } catch (Throwable var3) {
  9. if (channel != null) {
  10. channel.unsafe().closeForcibly();
  11. return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
  12. }
  13. return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
  14. }
  15. //反射成功以后,执行注册操作
  16. ChannelFuture regFuture = this.config().group().register(channel);
  17. if (regFuture.cause() != null) {
  18. if (channel.isRegistered()) {
  19. channel.close();
  20. } else {
  21. channel.unsafe().closeForcibly();
  22. }
  23. }
  24. return regFuture;
  25. }

🦑 由于这里涉及到 NioServerSocketChannel 的初始化,这里先打断,看完他的反射初始化逻辑

NioServerSocketChannel 的初始化🔨

ReflectiveChannelFactory的反射

  1. public T newChannel() {
  2. try {
  3. return (Channel)this.clazz.getConstructor().newInstance();
  4. } catch (Throwable var2) {
  5. throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
  6. }
  7. }

image.png

  1. public NioServerSocketChannel() {
  2. this(newSocket(DEFAULT_SELECTOR_PROVIDER));
  3. }
  4. private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
  5. try {
  6. return provider.openServerSocketChannel();
  7. } catch (IOException var2) {
  8. throw new ChannelException("Failed to open a server socket.", var2);
  9. }
  10. }
  11. public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
  12. super((Channel)null, channel, 16);
  13. this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
  14. }

AbstractNioMessageChannel 的实例化

  1. protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  2. super(parent, ch, readInterestOp);
  3. }

AbstractNioChannel 的实例化

  1. protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  2. super(parent);
  3. this.ch = ch;
  4. //设置感兴趣的事件列表,后续在注册的时候会使用到
  5. this.readInterestOp = readInterestOp;
  6. //设置为非阻塞.. 调用的是jdk SelectableChannel的方法
  7. ch.configureBlocking(false);
  8. }

AbstractChannel 的实例化

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent;
  3. this.id = this.newId();
  4. //unsafe的实例化,这里到最后描述
  5. this.unsafe = this.newUnsafe();
  6. //pipeline的实例化,这里到最后描述
  7. this.pipeline = this.newChannelPipeline();
  8. }

初始化channel

反射完成 channel 实例化, 现在进入 init() 的过程.

  1. @Override
  2. void init(Channel channel) throws Exception {
  3. final Map<ChannelOption<?>, Object> options = options0();
  4. synchronized (options) {
  5. setChannelOptions(channel, options, logger);
  6. }
  7. final Map<AttributeKey<?>, Object> attrs = attrs0();
  8. synchronized (attrs) {
  9. for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
  10. @SuppressWarnings("unchecked")
  11. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
  12. channel.attr(key).set(e.getValue());
  13. }
  14. }
  15. //获取pipeline
  16. ChannelPipeline p = channel.pipeline();
  17. final EventLoopGroup currentChildGroup = childGroup;
  18. final ChannelHandler currentChildHandler = childHandler;
  19. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  20. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  21. //填充option
  22. synchronized (childOptions) {
  23. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
  24. }
  25. synchronized (childAttrs) {
  26. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
  27. }
  28. //NioServerSocket 初始化的时候会进入到这里来到
  29. p.addLast(new ChannelInitializer<Channel>() {
  30. @Override
  31. public void initChannel(final Channel ch) throws Exception {
  32. final ChannelPipeline pipeline = ch.pipeline();
  33. ChannelHandler handler = config.handler();
  34. if (handler != null) {
  35. pipeline.addLast(handler);
  36. }
  37. ch.eventLoop().execute(new Runnable() {
  38. @Override
  39. public void run() {
  40. //继续添加一个ServerBootstrapAcceptor,用于处理即将到来的链接
  41. pipeline.addLast(new ServerBootstrapAcceptor(
  42. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  43. }
  44. });
  45. }
  46. });
  47. }

如果没有看完全部的,这里可能看的有点蒙B,但是没关系,先看一下,等到看完全部的时候就会得到升华或者是忘记了. 😂
pipeline执行addLast的时候是直接执行的,等到讲Pipeline和ChannelHandlerContext 的时候再来回顾好了.

ServerBootstrap 的bind后续

initAndRegister 的后续中,补充了 NioServerSocketChannel 的初始化,目前已经完成了 init 的过程,现在继续看 register 的过程.

  1. ChannelFuture regFuture = this.config().group().register(channel);
  • this.config() 上文描述的 ServerBootstrapConfig
  • group() 获取的是ServerBootstrapConfig中初始化的 group , 而这个group在server端是NioEventLoopGroup

所以我们继续看 NioEventLoopGroup 的注册逻辑. 发现这个接口的实现有5个,排除调测试的还有4个
image.png

这个时候又会过过头看下 NioEventLoopGroup 的类图,发现 MultithreadEventLoopGroup 是他的👨父类.我们继续往下走..

MultithreadEventLoopGroup的注册

  1. @Override
  2. public ChannelFuture register(Channel channel) {
  3. return next().register(channel);
  4. }

嗯,现在继续看下 next() 是何方神圣.

  1. @Override
  2. public EventLoop next() {
  3. return (EventLoop) super.next();
  4. }

继续看下父类(MultithreadEventExecutorGroup)的实现

具体那个父类,可以继续看UML类图

  1. @Override
  2. public EventExecutor next() {
  3. //很好的chooser,好像在哪里看到过
  4. //确实在NioEventLoopGroup的初始化的时候确认过眼神
  5. //根据children数组的长度选择EventExecutorChooser,如果是2的n次幂使用
  6. //PowerOfTwoEventExecutorChooser 否则使用 GenericEventExecutorChooser(轮询)
  7. return chooser.next();
  8. }
  • 我们可以选择进去看看到底是怎么选择
  • 我们也可以选择不看了.

我们已经知道了 next() ===> EventLoop ===> newChild 实例化的对象 ===> NioEventLoop

NioEventLoop 的register

实例化NioEventLoopGroup的时候稍微介绍过 NioEventLoop , 说了他的实例化很简单,只需要简单的关注他的

  • register和run方法

现在我们就遇到了他的 Registry

😅 结果我们在 NioEventLoop 中没有找到 Registry 方法, 继续看父类,必须父类有自己就有. 果不其然,在 SingleThreadEventLoop 找到了

再次重申UML类图的重要性

  1. @Override
  2. public ChannelFuture register(Channel channel) {
  3. return register(new DefaultChannelPromise(channel, this));
  4. }
  5. @Override
  6. public ChannelFuture register(final ChannelPromise promise) {
  7. ObjectUtil.checkNotNull(promise, "promise");
  8. promise.channel().unsafe().register(this, promise);
  9. return promise;
  10. }

嗯, unsafe() 又是何许人也.

unsafe

经历了几个跳转以后,我们千万别忘了 channel() 就是我们刚开始反射创建的 NioServerSocketChannel

NioServerSocketChannel 初始化的时候,补充了2个注释

  • unsafe的实例化,这里到最后描述
  • pipeline的实例化,这里到最后描述

终于到最后了.
继续到 AbstractChannel 的构造方法中看看是如何实现的.

  1. protected AbstractChannel(Channel parent) {
  2. this.parent = parent;
  3. id = newId();
  4. unsafe = newUnsafe();
  5. pipeline = newChannelPipeline();
  6. }
  7. protected abstract AbstractUnsafe newUnsafe();
  8. protected DefaultChannelPipeline newChannelPipeline() {
  9. return new DefaultChannelPipeline(this);
  10. }

这里的 newUnsafe 是一个抽象方法,看看子类是怎么实现的.
子类稍微有点多,就算把测试排除完,也有好几个. 但是我们这里描述的是 NioServerSocketChannel ,只需要看它的类图上出现了哪些即可,经过精确查找,我们找到了 AbstractNioMessageChannel

  1. @Override
  2. protected AbstractNioUnsafe newUnsafe() {
  3. return new NioMessageUnsafe();
  4. }

NioMessageUnsafe 什么东西,先上类图.
image.png
确实是 Unsafe 的孩子,没有错,继续顺藤摸瓜找到 registry方法 ,类图是可以直接找到该方法的,但是不想搞一个这么大的图了. 沿着 NioMessageUnsafe 最终在 AbstractUnsafe 中找到了,我们继续看下.

  1. @Override
  2. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  3. if (eventLoop == null) {
  4. throw new NullPointerException("eventLoop");
  5. }
  6. if (isRegistered()) {
  7. promise.setFailure(new IllegalStateException("registered to an event loop already"));
  8. return;
  9. }
  10. //只能自己注册自己,不能让别人来自己
  11. if (!isCompatible(eventLoop)) {
  12. promise.setFailure(
  13. new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
  14. return;
  15. }
  16. //这个channel和这个eventLoop绑定起来 一个channel和一个线程绑定起来
  17. AbstractChannel.this.eventLoop = eventLoop;
  18. if (eventLoop.inEventLoop()) {
  19. register0(promise);
  20. } else {
  21. try {
  22. eventLoop.execute(new Runnable() {
  23. @Override
  24. public void run() {
  25. register0(promise);
  26. }
  27. });
  28. } catch (Throwable t) {
  29. logger.warn(
  30. "Force-closing a channel whose registration task was not accepted by an event loop: {}",
  31. AbstractChannel.this, t);
  32. closeForcibly();
  33. closeFuture.setClosed();
  34. safeSetFailure(promise, t);
  35. }
  36. }
  37. }

这个注册方法稍微有点大,我们拆开一点一点看.
先看看参数

  • EventLoop eventLoop 它的实现是SingleThreadEventLoop,但是更具体的应该是 NioEventLoop 具体就不描述了,整个调用链路下来很清晰了.
  • final ChannelPromise promise 看调用链路即可

isCompatible(eventLoop)

compatible 英文翻译兼容, 看下它的实现.

AbstractNioChannel#isCompatible

为啥是它呢,看UML

  1. @Override
  2. protected boolean isCompatible(EventLoop loop) {
  3. return loop instanceof NioEventLoop;
  4. }

既然在这里补充了这么多,那就在补充一下这句

  1. //这个channel和这个eventLoop绑定起来 一个channel和一个线程绑定起来
  2. AbstractChannel.this.eventLoop = eventLoop;

为啥是 AbstractChannel.this.eventLoop ,因为 AbstractUnsafe 是在 AbstractChannel 中的内部类,所以是这种写法
另外

  • 每一个 Channel 都会和一个 EventLoop 进行绑定,就是 NioServerSocketChannel 也不例外,当然 NioSocketChannel 那就更加不例外了. 所以这里的设置是将 eventLoop 给对应的 channel 进行绑定

    eventLoop.inEventLoop()

    也是一个接口,从URL和实现找到它在那个类中, AbstractEventExecutor 可以找到方法
    1. @Override
    2. public boolean inEventLoop() {
    3. return inEventLoop(Thread.currentThread());
    4. }
    继续找, SingleThreadEventExecutor
    1. @Override
    2. public boolean inEventLoop(Thread thread) {
    3. return thread == this.thread;
    4. }
    thread 参数好理解, this.thread 是什么东西,一路上从未遇到过, 再看看它是在哪里 赋值
    使用 thread =SingleThreadEventExecutor 中查找,仅有5处出现.
    image.png
    doStartThread 处是正确赋值的地方. 相比大家应该都知道了..
    image.png
    image.png
    image.png
    一路下来就是这里调用的了,那么基本上可以确定, this.thread == null 了,所以 eventLoop.inEventLoop() 会返回false. 走异步逻辑去进行注册. 而走异步注册逻辑的时候,刚好调用的是上图的 execute ,巧不巧. 😂

除了这种思路是可以说明以外,其实还有一种.

如果现在是一个main方法在跑,那么现在的主流程还是在 main 线程上,那么main线程会等同于 NioEventLoop 中的 thread 吗,想必是不会的.

register0

提交了注册任务到 NioEventLoop 后,什么时候执行,就要看这个线程争不争气了.

  1. private void register0(ChannelPromise promise) {
  2. //去掉了异常和判断的变量,不影响阅读,注释保留
  3. doRegister();
  4. // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
  5. // user may already fire events through the pipeline in the ChannelFutureListener.
  6. pipeline.invokeHandlerAddedIfNeeded();
  7. safeSetSuccess(promise);
  8. pipeline.fireChannelRegistered();
  9. // Only fire a channelActive if the channel has never been registered. This prevents firing
  10. // multiple channel actives if the channel is deregistered and re-registered.
  11. if (isActive()) {
  12. if (firstRegistration) {
  13. pipeline.fireChannelActive();
  14. } else if (config().isAutoRead()) {
  15. // This channel was registered before and autoRead() is set. This means we need to begin read
  16. // again so that we process inbound data.
  17. //
  18. // See https://github.com/netty/netty/issues/4805
  19. beginRead();
  20. }
  21. }
  22. }

这段注册流程涉及了 pipeline , 所以一行一行的看下

  • doRegister 调用子类来进行注册,代码在 AbstractNioChannel 中,比较简单. 唯一需要关注的就是0这个参数,为什么是0呢,注册的时候不应该是 OP_ACCEPT

    1. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  • invokeHandlerAddedIfNeeded 下一个小节描述.

  • fireChannelRegistered 下下一个小节描述
  • isActive channel是否调用了 bind 的系统调用,很显然到目前为止并未调用,所以是 false .

到这里为主,registry的流程算是真正的完成了. 但是 bind 的流程也就走完了初始化 channel的过程,真实的系统调用 bind 还没有执行呢,所以我们继续往下看,但是真实的看 bind 之前,我们先差个楼,看下 pipeline 的庐山真面目.

Pipeline 🐂

pipeline 英文翻译流水线和Filter有异曲同工的意思。
先上UML类图
image.png

Pipeline的初始化

  1. protected DefaultChannelPipeline(Channel channel) {
  2. this.channel = ObjectUtil.checkNotNull(channel, "channel");
  3. succeededFuture = new SucceededChannelFuture(channel, null);
  4. voidPromise = new VoidChannelPromise(channel, true);
  5. //初始化双向链表的头部和尾部
  6. tail = new TailContext(this);
  7. head = new HeadContext(this);
  8. //双向链表 头部指向尾部,尾部指向头部.
  9. head.next = tail;
  10. tail.prev = head;
  11. }

Pipeline 的初始化很简单,就是在pipeline内部建立一个双向链表.
链表的头部是一个 ChannelHandlerContext
尾部也是一个 ChannelHandlerContext

Pipeline 添加节点

Pipeline 作为一个链表,作为核心的莫过于添加节点和删除节点,那么添加和删除的时候还有哪些额外的操作呢.

  1. @Override
  2. public final ChannelPipeline addLast(ChannelHandler... handlers) {
  3. return addLast(null, handlers);
  4. }
  5. @Override
  6. public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
  7. for (ChannelHandler h: handlers) {
  8. addLast(executor, null, h);
  9. }
  10. return this;
  11. }
  12. //添加节点核心逻辑
  13. @Override
  14. public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
  15. final AbstractChannelHandlerContext newCtx;
  16. synchronized (this) {
  17. //检查handler是否重复
  18. checkMultiplicity(handler);
  19. //建立一个context上下文, 和handler建立绑定关系,具体可以看下一个小节的讲述
  20. //new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
  21. //newContext的就是实例化一个DefaultChannelHandlerContext对象
  22. newCtx = newContext(group, filterName(name, handler), handler);
  23. //将newCtx添加到链表中,具体的操作请看数据结构链表一节.
  24. addLast0(newCtx);
  25. //到这一步为止registered可能是false也是true
  26. //如果是server刚刚初始化的时候,registerred 肯定是false.
  27. //所以将任务添加到待执行任务的尾部(待执行任务也是一个链表,等待channel注册的时候这些context的
  28. //任务会一并触发),并修改context的状态标识位.
  29. if (!registered) {
  30. //设置状态标识位
  31. newCtx.setAddPending();
  32. //填充任务到链表
  33. callHandlerCallbackLater(newCtx, true);
  34. return this;
  35. }
  36. //如果已经注册完成了,
  37. EventExecutor executor = newCtx.executor();
  38. if (!executor.inEventLoop()) {
  39. //设置状态标识位
  40. newCtx.setAddPending();
  41. //执行任务(执行完任务后会修改状态标志为ADD_COMPLETE)
  42. executor.execute(new Runnable() {
  43. @Override
  44. public void run() {
  45. callHandlerAdded0(newCtx);
  46. }
  47. });
  48. return this;
  49. }
  50. }
  51. callHandlerAdded0(newCtx);
  52. return this;
  53. }

这一点是pipeline添加节点的核心内容,那么添加节点的任务是在什么时候触发的呢 ?

其实在服务启动的时候已经有过描述,在 初始化channel 一节 的时候为其添加了一个 handler . 那个时候 pipeline 的结构是这样的

添加前
image.png
添加后
image.png
Context存在一个转换标志位的过程,那么这个过程是在什么时间节点执行的呢

在讲述 Regiseter0 的时候还遗留了2个问题到下下小节讲解,现在来解决第一个问题.

  • invokeHandlerAddedIfNeeded

这个是为了防止用户使用ChannelFutureListener来触发pipeline的一些事件,所以需要提前吧pipeline上节点的Context进行初始化.

  1. final void invokeHandlerAddedIfNeeded() {
  2. assert channel.eventLoop().inEventLoop();
  3. if (firstRegistration) {
  4. firstRegistration = false;
  5. callHandlerAddedForAllHandlers();
  6. }
  7. }

继续调用所有添加的handler

  1. private void callHandlerAddedForAllHandlers() {
  2. final PendingHandlerCallback pendingHandlerCallbackHead;
  3. synchronized (this) {
  4. assert !registered;
  5. // This Channel itself was registered.
  6. registered = true;
  7. //获取链表头部节点
  8. pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
  9. // Null out so it can be GC'ed.
  10. this.pendingHandlerCallbackHead = null;
  11. }
  12. //头部节点的类型是什么呢 PendingHandlerCallback是一个抽象类
  13. //它的实现有PendingHandlerAddedTask
  14. //PendingHandlerRemovedTask
  15. PendingHandlerCallback task = pendingHandlerCallbackHead;
  16. while (task != null) {
  17. //轮询执行任务
  18. task.execute();
  19. task = task.next;
  20. }
  21. }

所以在执行的时候会触发PendingHandlerAddedTask任务,继续看看该任务做了啥

  1. @Override
  2. void execute() {
  3. EventExecutor executor = ctx.executor();
  4. if (executor.inEventLoop()) {
  5. //继续调用
  6. callHandlerAdded0(ctx);
  7. } else {
  8. try {
  9. executor.execute(this);
  10. } catch (RejectedExecutionException e) {
  11. remove0(ctx);
  12. ctx.setRemoved();
  13. }
  14. }
  15. }

added0

  1. private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
  2. //异常处理过滤掉
  3. ctx.setAddComplete();
  4. ctx.handler().handlerAdded(ctx);
  5. }

这里就完成了2件事

  • 设置标志位,表明pipeline上的context节点可以使用了
  • handlerAdded 将Context设置到handler中去,表明了Handler已经可以处理任何事件了.

到目前为止,pipeline已经可以处理即将到来的事件了,这也就表明后续的register也会被pipeline进行处理.

总结

总体来说,Netty中的pipeline是一个责任链模式 (filter chain) , 通过一个双向链表将 ChannelHandlerContext 勾连起来,既然是责任链模式,那我们核心需要关注的就是链的头部,也就是什么时候开展的,如果我们过于沉迷于中间的 handler , 那么就会陷入 Debug 的汪洋大海.

ChannelHandlerContext和ChannelHandler 🏁

将这两者放在一起说是他们存在一个上下文的关系.

  • ChannelHandler 具体处理业务逻辑的地方. 例如读取,写入,出现异常如何处理等等

    • handlerAdded
    • handlerRemoved
    • exceptionCaught

      子类有很多的拓展,这里不一一在描述了

  • ChannelHandlerContext

    • channel
    • executor
    • handler 和哪一个handler绑定

      ChannelHandlerContext 同它的名字一样肯定是和一个 ChannelHandler 进行绑定的,从它的方法也可已看的出来.

这样就形成了一个一对一的关系 , 一个 ChannelHandlerContext 和 ChannelHandler 成一个整体,共同成了 Pipeline 上的一个点.

ChannelHandlerContext

image.png

ChannelHandler

image.png

回顾Pipeline

在经历了 PipelineChannelHandlerContext 后,我们在来回顾一开始遇到的问题

  • pipeline.fireChannelRegistered 触发一个 Channel 注册事件. 且是在 Pipeline 中进行触发.
    1. @Override
    2. public final ChannelPipeline fireChannelRegistered() {
    3. //静态方法 从head头部开始
    4. AbstractChannelHandlerContext.invokeChannelRegistered(head);
    5. return this;
    6. }
    ```java static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { //next.executor ==> 注册channel的NioEventLoop //特别注意这里的next是ChannelHandlerContext实例对象,不是EventLoopGroup实例对象 EventExecutor executor = next.executor(); if (executor.inEventLoop()) {
    1. //成功触发
    2. next.invokeChannelRegistered();
    } else {
    1. executor.execute(new Runnable() {
    2. @Override
    3. public void run() {
    4. next.invokeChannelRegistered();
    5. }
    6. });
    } }
  1. 触发Context中注册事件
  2. ```java
  3. private void invokeChannelRegistered() {
  4. if (invokeHandler()) {
  5. try {
  6. ((ChannelInboundHandler) handler()).channelRegistered(this);
  7. } catch (Throwable t) {
  8. notifyHandlerException(t);
  9. }
  10. } else {
  11. fireChannelRegistered();
  12. }
  13. }

如果handler的状态标识位是添加完成就可以处理

  1. private boolean invokeHandler() {
  2. //初始化的时候会调用CAS,将handlerState设置成ADD_COMPLETE
  3. int handlerState = this.handlerState;
  4. //这里跳过order,因为ordered一直都是true
  5. return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
  6. }

标志位的设置是在注册触发前执行的. 所以这里必然是ADD_COMPLETE
使用Context对应的Handler处理注册逻辑. 这里的Context 是Head

  1. @Override
  2. public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
  3. invokeHandlerAddedIfNeeded(); //这里的逻辑上文已经描述过
  4. ctx.fireChannelRegistered();//继续触发下一次的注册逻辑.
  5. }

继续执行下一次的逻辑

  1. @Override
  2. public ChannelHandlerContext fireChannelRegistered() {
  3. //findContextInbound() ,上面说过,context是一个链表,这里就是循环遍历链表获取下一个Context
  4. invokeChannelRegistered(findContextInbound());
  5. return this;
  6. }
  1. private AbstractChannelHandlerContext findContextInbound() {
  2. AbstractChannelHandlerContext ctx = this;
  3. do {
  4. ctx = ctx.next;
  5. } while (!ctx.inbound);
  6. return ctx;
  7. }

这里就完成链一个环的构建。如果需要添加一个新的handler,直接在 Pipeline 上进行处理即可.

故事说到这里就完成 pipeline.fireChannelRegistered , 那问题来了,我们在ServerBootStrap中设置的handler是什么时候填充上去的呢. 别急我们继续往下走。

bind 后续

至此,我们已经完成了 Channel 的初始化, 已经selector的注册操作,虽然注册的是 0 ,并不是我们想要的 OP_ACCEPT , 但好说也是注册上了。 顺道看了 Pipeline 的数据结构以及它是如何初始化的. 同时也了解了 ChannelHandlerContextHandlerContext ,这里我们继续了解 bind 的后续,我们将一步一步走向 Socket 是如何执行 bind 这个系统调用的.

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. //异常部分就跳过了.
  3. final ChannelFuture regFuture = initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. if (regFuture.isDone()) {
  6. ChannelPromise promise = channel.newPromise();
  7. doBind0(regFuture, channel, localAddress, promise);
  8. return promise;
  9. } else {
  10. // Registration future is almost always fulfilled already, but just in case it's not.
  11. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
  12. regFuture.addListener(new ChannelFutureListener() {
  13. @Override
  14. public void operationComplete(ChannelFuture future) throws Exception {
  15. promise.registered();
  16. doBind0(regFuture, channel, localAddress, promise);
  17. }
  18. });
  19. return promise;
  20. }
  21. }

上边已经讲完了 initAndRegister ,并且顺便说明了 Register 是在新起的 NioEventLoop 中执行的,那么到这里的时候是否已经执行完了,我们不得而知,所以用了2个if分支来进行处理

  • 如果回调说明 isDone, 那么直接主线程执行 doBind0()
  • 如果此时尚未完成注册,那么在注册完成回调的时候执行 doBind0()

不论是否完成,我们继续走向 doBind0()

  1. private static void doBind0(
  2. final ChannelFuture regFuture, final Channel channel,
  3. final SocketAddress localAddress, final ChannelPromise promise) {
  4. // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
  5. // the pipeline in its channelRegistered() implementation.
  6. channel.eventLoop().execute(new Runnable() {
  7. @Override
  8. public void run() {
  9. if (regFuture.isSuccess()) {
  10. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  11. } else {
  12. promise.setFailure(regFuture.cause());
  13. }
  14. }
  15. });
  16. }

channel 是 NioServerSocketChannel , 在AbstrctChannel中找到该方法

  1. @Override
  2. public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
  3. return pipeline.bind(localAddress, promise);
  4. }

由pipeline,触发bind事件

  1. @Override
  2. public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
  3. return tail.bind(localAddress, promise);
  4. }

从尾部的上下文开始一路进行 bind , 可以大胆推测它的bind逻辑是否和 register 时是否一致。

  1. @Override
  2. public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
  3. //这些都是老朋友了findContextOutbound,executor等等
  4. final AbstractChannelHandlerContext next = findContextOutbound();
  5. EventExecutor executor = next.executor();
  6. if (executor.inEventLoop()) {
  7. next.invokeBind(localAddress, promise);
  8. } else {
  9. safeExecute(executor, new Runnable() {
  10. @Override
  11. public void run() {
  12. //肯定会走这里的逻辑
  13. next.invokeBind(localAddress, promise);
  14. }
  15. }, promise, null);
  16. }
  17. return promise;
  18. }

继续执行bind逻辑

  1. private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
  2. if (invokeHandler()) {
  3. try {
  4. ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
  5. } catch (Throwable t) {
  6. notifyOutboundHandlerException(t, promise);
  7. }
  8. } else {
  9. bind(localAddress, promise);
  10. }
  11. }

我们的推测和代码逻辑是一致的,bind的过程和register是一致的,也是不断的调用context,然后使用对应的handler触发业务逻辑,然后在获取上一个context,如此反复.

既然如此,那么bind的底在哪里了.
既然 Pipeline 是一个双向链表,那么最终会到底链表的头部. Head 处,那我们支持看 Head 处的bind.

  1. @Override
  2. public void bind(
  3. ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
  4. throws Exception {
  5. //unsafe==channel.unsafe
  6. unsafe.bind(localAddress, promise);
  7. }

嗯,又是和register一样,还是来到了 unsafe ,最终我们在 register 的下一个方法找到了bind方法

  1. @Override
  2. public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
  3. assertEventLoop();
  4. //现在肯定是false,还没有调用过bind呢
  5. boolean wasActive = isActive();
  6. try {
  7. doBind(localAddress);
  8. } catch (Throwable t) {
  9. safeSetFailure(promise, t);
  10. closeIfClosed();
  11. return;
  12. }
  13. //现在肯定是true了
  14. if (!wasActive && isActive()) {
  15. invokeLater(new Runnable() {
  16. @Override
  17. public void run() {
  18. //触发channel active事件
  19. pipeline.fireChannelActive();
  20. }
  21. });
  22. }
  23. safeSetSuccess(promise);
  24. }

doBind(**) 是一个抽象方法,它的实现在 NioServerSocketChannel

  1. @Override
  2. protected void doBind(SocketAddress localAddress) throws Exception {
  3. if (PlatformDependent.javaVersion() >= 7) {
  4. javaChannel().bind(localAddress, config.getBacklog());
  5. } else {
  6. javaChannel().socket().bind(localAddress, config.getBacklog());
  7. }
  8. }

这里就是直接调用jdk的socket绑定了,nice,重要看到bind的系统调用了.
到这里为止,register和bind算是完成了

既然bind完成了,那么我们继续向下走,把方法走完本次分析就结束了.

  1. pipeline.fireChannelActive();

bind结束后,会触发一个channel active事件,这里肯定也是pipeline的事件传播. 但是因为我们的selector 注册的还是0,所以继续看看.

  1. @Override
  2. public final ChannelPipeline fireChannelActive() {
  3. AbstractChannelHandlerContext.invokeChannelActive(head);
  4. return this;
  5. }

active事件是从头部开始的,看下它的handler实现是啥子.

  1. @Override
  2. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  3. ctx.fireChannelActive();
  4. readIfIsAutoRead();
  5. }

嗯,head的实现比较简单

  • fireChannelActive 继续传播active事件
  • readIfIsAutoRead 读取?继续看看

    1. private void readIfIsAutoRead() {
    2. //channel.config 牵扯的太多了,本次分析跳过config,这里返回的true
    3. if (channel.config().isAutoRead()) {
    4. channel.read();
    5. }
    6. }

    😣 由于牵扯的太多了,慢慢的分析下来完全超出了我的预料,所以这里先不谈 channel.config
    尽管已经写了这么,但是在注册的流程中,不要忘了channel===> NioServerSocketChannel, 感觉去看看实现吧

    1. @Override
    2. public Channel read() {
    3. pipeline.read();
    4. return this;
    5. }

    我擦,又进入了 pipeline ,继续

    1. @Override
    2. public final ChannelPipeline read() {
    3. tail.read();
    4. return this;
    5. }

    从尾部开始读取的,直接到链表的最后head看看

    1. @Override
    2. public void read(ChannelHandlerContext ctx) {
    3. unsafe.beginRead();
    4. }

    没错,又是unsafe,涉及到读写等IO操作到好像都是它来干的.

    1. @Override
    2. public final void beginRead() {
    3. assertEventLoop();
    4. if (!isActive()) {
    5. return;
    6. }
    7. try {
    8. doBeginRead();
    9. } catch (final Exception e) {
    10. invokeLater(new Runnable() {
    11. @Override
    12. public void run() {
    13. pipeline.fireExceptionCaught(e);
    14. }
    15. });
    16. close(voidPromise());
    17. }
    18. }

    继续瞅瞅子类AbstractNioChannel的实现

    1. @Override
    2. protected void doBeginRead() throws Exception {
    3. // Channel.read() or ChannelHandlerContext.read() was called
    4. final SelectionKey selectionKey = this.selectionKey;
    5. if (!selectionKey.isValid()) {
    6. return;
    7. }
    8. readPending = true;
    9. //这里的interestOps 就是我们一开始设置的0
    10. final int interestOps = selectionKey.interestOps();
    11. if ((interestOps & readInterestOp) == 0) {
    12. selectionKey.interestOps(interestOps | readInterestOp);
    13. }
    14. }

    😂 终于出来了 readInterestOp 就是反射实例话 Channel 时传递进去的数据呀.

    1. public NioServerSocketChannel(ServerSocketChannel channel) {
    2. super(null, channel, SelectionKey.OP_ACCEPT);
    3. config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    4. }

    至此,本次分析终于完成了,最后的设置Promise就很简单了. 大家可以自己看看.

总结 🀄️

Netty的启动流程写出来比我想象中的要难一点,整个流程比较长,涉及的类也较多,不可能面面俱到.

从一个高的层面去看它的实现,不要一下子就陷入Debug的汪洋大海中去.

耐心一点,慢慢看总会明白的