要求 😯
最少能看懂 netty-example 的 echo范例
示例 😄
public void bind(Integer port) {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup work = new NioEventLoopGroup(1);try {ServerBootstrap serverBootstrap = new ServerBootstrap();final ServerTransactionHandler serverTransactionHandler = new ServerTransactionHandler(this.applicationContext);serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new MessageDecoder(serialization)).addLast(new MessageEncoder(serialization)).addLast(new IdleStateHandler(20, 40, 60)).addLast(serverTransactionHandler);}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);channelFuture = serverBootstrap.bind(port).sync();} catch (Exception e) {e.printStackTrace();}}
讲述过程 😂
- NioEventLoopGroup 初始化
- 类图
- NioEventLoop
- ServerBootstrap 启动
- channel
- handler 和 childHandler
- bind (重点)
- ChannelPipeline (稍微提及)
- ChannelHandlerContext (稍微提及)
- NioServerSocketChannel 初始化
- Pipeline和ChannelHandlerContext的初始化
ServerBootstrap 和 NioServerSocketChannel 不是完全按照这里描述的顺序出现,还是按照Netty启动的顺序出现. 这里这是单独罗列. 😂
能收获到什么 📌
希望看完后,能理解他们之间的关系. 😂
视频版 🌲
文字版 👷
NioEventLoopGroup的初始化

NioEventLoopGroup 构造方法
public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);}public NioEventLoopGroup(int nThreads, Executor executor) {this(nThreads, executor, SelectorProvider.provider());}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {//看UMLsuper(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}
MultithreadEventLoopGroup 构造方法
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}
MultithreadEventExecutorGroup 构造方法
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);}protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}
初始化核心逻辑
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {//跳过校验//这里只是一个数组. 还没有实例化填充数据//children 数组对象 EventExecutorchildren = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//交给子类去实例化,子类可以自由选择,可以是NIO,可以是Epoll也可以是Kqueue等等children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {//初始化失败以后的善后工作... 先跳过}}//根据children数组的长度选择EventExecutorChooser,如果是2的n次幂使用//PowerOfTwoEventExecutorChooser 否则使用 GenericEventExecutorChooser(轮询)chooser = chooserFactory.newChooser(children);//终止的监听器final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};//设置上for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}
MultithreadEventExecutorGroup#newChild
看下 NioEventLoopGroup 的实现
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}
NioEventLoop

初始化比较简单,具体可以看代码, UML 的关系相对比较重要,关注他的核心方法
- register
- SingleThreadEventExecutor#run
总结
NioEventLoopGroup的初始化比较简单,主要是了解一个Group中存在几个NioEventLoop,数量在初始化的时候指定,默认是CPU核数的2倍
ServerBootstrap 👀
ServerBootstrap serverBootstrap = new ServerBootstrap();
ServerBootstrap 是一个启动类,非线程安全,但是启动仅一次,也没啥关系 (目前还没看到过需要启动多次的情况)
group,handler等设置的方法可以看下代码,简单就直接跳过了
channel
也是一个设置方法,但是设计到一个Channel 的factory
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");} else {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));}}
代码很简单,就是设置一个 ReflectiveChannelFactory 到字段 channelFactory 上ReflectiveChannelFactory 何许人也,一个channel的工厂类,生成模板的方式是反射. 具体的类型需要外网传递进来,例如这里传递进来的就是 NioServerSocketChannel.class . 后文讲述 initAndRegister 还会描述到。
childHandler
设置childHandler,代码也比较简单,一个普通的set方法,这里提一下他的特殊性.
- child是什么?
- 存在child,那么parent又是什么?
child和parent是相对而已,这里的parent可以理解是ServerSocketChannel. child则是SocketChannel. 所以 childHandler 是给后续链接上来的 Channel 使用的 handler .. 理解了这一层, option 和 childOption 就不在话下了
我们继续往下走..
bind 📖
绑定端口到socket上,在NIO中讲述的时候比较简单,获取 ServerSocketChannel ,然后调用 Socket 的 bind 就可以完成一个NIO的启动了. 但是在Netty中
ServerSocketChannel在哪里,只有一个NioServerSocketChannel.class,还是Netty包的.Socket的bind到底是在哪里调用的.
public ChannelFuture bind(int inetPort) {return this.bind(new InetSocketAddress(inetPort));}public ChannelFuture bind(SocketAddress localAddress) {this.validate();if (localAddress == null) {throw new NullPointerException("localAddress");} else {return this.doBind(localAddress);}}
doBind 🐎
private ChannelFuture doBind(final SocketAddress localAddress) {//initAndRegister 字面意思,好像逻辑都在这里边了final ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();//如果初始化和注册存在异常,说明注册失败.if (regFuture.cause() != null) {return regFuture;} else if (regFuture.isDone()) {//注册完成了那就去执行bind操作ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {//还在处理当中,那就等注册完成的时候回调这里完成doBind0final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
initAndRegister 💥
final ChannelFuture initAndRegister() {Channel channel = null;try {//channelFactory 就是上文提到的 ReflectiveChannelFactory//生成的方式很简单,就是根据传递到底class进行一次反射调用channel = this.channelFactory.newChannel();this.init(channel);} catch (Throwable var3) {if (channel != null) {channel.unsafe().closeForcibly();return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);}return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);}//反射成功以后,执行注册操作ChannelFuture regFuture = this.config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture;}
🦑 由于这里涉及到 NioServerSocketChannel 的初始化,这里先打断,看完他的反射初始化逻辑
NioServerSocketChannel 的初始化🔨
ReflectiveChannelFactory的反射
public T newChannel() {try {return (Channel)this.clazz.getConstructor().newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);}}

public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel();} catch (IOException var2) {throw new ChannelException("Failed to open a server socket.", var2);}}public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {super((Channel)null, channel, 16);this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());}
AbstractNioMessageChannel 的实例化
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}
AbstractNioChannel 的实例化
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;//设置感兴趣的事件列表,后续在注册的时候会使用到this.readInterestOp = readInterestOp;//设置为非阻塞.. 调用的是jdk SelectableChannel的方法ch.configureBlocking(false);}
AbstractChannel 的实例化
protected AbstractChannel(Channel parent) {this.parent = parent;this.id = this.newId();//unsafe的实例化,这里到最后描述this.unsafe = this.newUnsafe();//pipeline的实例化,这里到最后描述this.pipeline = this.newChannelPipeline();}
初始化channel
反射完成 channel 实例化, 现在进入 init() 的过程.
@Overridevoid init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}//获取pipelineChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;//填充optionsynchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}//NioServerSocket 初始化的时候会进入到这里来到p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {//继续添加一个ServerBootstrapAcceptor,用于处理即将到来的链接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
如果没有看完全部的,这里可能看的有点蒙B,但是没关系,先看一下,等到看完全部的时候就会得到升华或者是忘记了. 😂
pipeline执行addLast的时候是直接执行的,等到讲Pipeline和ChannelHandlerContext 的时候再来回顾好了.
ServerBootstrap 的bind后续
在 initAndRegister 的后续中,补充了 NioServerSocketChannel 的初始化,目前已经完成了 init 的过程,现在继续看 register 的过程.
ChannelFuture regFuture = this.config().group().register(channel);
- this.config() 上文描述的 ServerBootstrapConfig
- group() 获取的是ServerBootstrapConfig中初始化的
group, 而这个group在server端是NioEventLoopGroup
所以我们继续看 NioEventLoopGroup 的注册逻辑. 发现这个接口的实现有5个,排除调测试的还有4个
这个时候又会过过头看下 NioEventLoopGroup 的类图,发现 MultithreadEventLoopGroup 是他的👨父类.我们继续往下走..
MultithreadEventLoopGroup的注册
@Overridepublic ChannelFuture register(Channel channel) {return next().register(channel);}
嗯,现在继续看下 next() 是何方神圣.
@Overridepublic EventLoop next() {return (EventLoop) super.next();}
继续看下父类(MultithreadEventExecutorGroup)的实现
具体那个父类,可以继续看UML类图
@Overridepublic EventExecutor next() {//很好的chooser,好像在哪里看到过//确实在NioEventLoopGroup的初始化的时候确认过眼神//根据children数组的长度选择EventExecutorChooser,如果是2的n次幂使用//PowerOfTwoEventExecutorChooser 否则使用 GenericEventExecutorChooser(轮询)return chooser.next();}
- 我们可以选择进去看看到底是怎么选择
- 我们也可以选择不看了.
我们已经知道了 next() ===> EventLoop ===> newChild 实例化的对象 ===> NioEventLoop
NioEventLoop 的register
实例化NioEventLoopGroup的时候稍微介绍过 NioEventLoop , 说了他的实例化很简单,只需要简单的关注他的
- register和run方法
现在我们就遇到了他的 Registry
😅 结果我们在 NioEventLoop 中没有找到 Registry 方法, 继续看父类,必须父类有自己就有. 果不其然,在 SingleThreadEventLoop 找到了
再次重申UML类图的重要性
@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}
嗯, unsafe() 又是何许人也.
unsafe
经历了几个跳转以后,我们千万别忘了 channel() 就是我们刚开始反射创建的 NioServerSocketChannel
在 NioServerSocketChannel 初始化的时候,补充了2个注释
- unsafe的实例化,这里到最后描述
- pipeline的实例化,这里到最后描述
终于到最后了.
继续到 AbstractChannel 的构造方法中看看是如何实现的.
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();}protected abstract AbstractUnsafe newUnsafe();protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}
这里的 newUnsafe 是一个抽象方法,看看子类是怎么实现的.
子类稍微有点多,就算把测试排除完,也有好几个. 但是我们这里描述的是 NioServerSocketChannel ,只需要看它的类图上出现了哪些即可,经过精确查找,我们找到了 AbstractNioMessageChannel
@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();}
NioMessageUnsafe 什么东西,先上类图. 
确实是 Unsafe 的孩子,没有错,继续顺藤摸瓜找到 registry方法 ,类图是可以直接找到该方法的,但是不想搞一个这么大的图了. 沿着 NioMessageUnsafe 最终在 AbstractUnsafe 中找到了,我们继续看下.
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}//只能自己注册自己,不能让别人来自己if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}//这个channel和这个eventLoop绑定起来 一个channel和一个线程绑定起来AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
这个注册方法稍微有点大,我们拆开一点一点看.
先看看参数
- EventLoop eventLoop 它的实现是SingleThreadEventLoop,但是更具体的应该是
NioEventLoop具体就不描述了,整个调用链路下来很清晰了. - final ChannelPromise promise 看调用链路即可
isCompatible(eventLoop)
AbstractNioChannel#isCompatible
为啥是它呢,看UML
@Overrideprotected boolean isCompatible(EventLoop loop) {return loop instanceof NioEventLoop;}
既然在这里补充了这么多,那就在补充一下这句
//这个channel和这个eventLoop绑定起来 一个channel和一个线程绑定起来AbstractChannel.this.eventLoop = eventLoop;
为啥是 AbstractChannel.this.eventLoop ,因为 AbstractUnsafe 是在 AbstractChannel 中的内部类,所以是这种写法
另外
- 每一个
Channel都会和一个EventLoop进行绑定,就是NioServerSocketChannel也不例外,当然NioSocketChannel那就更加不例外了. 所以这里的设置是将 eventLoop 给对应的 channel 进行绑定eventLoop.inEventLoop()
也是一个接口,从URL和实现找到它在那个类中,AbstractEventExecutor可以找到方法
继续找,@Overridepublic boolean inEventLoop() {return inEventLoop(Thread.currentThread());}
SingleThreadEventExecutor@Overridepublic boolean inEventLoop(Thread thread) {return thread == this.thread;}
thread参数好理解,this.thread是什么东西,一路上从未遇到过, 再看看它是在哪里赋值的
使用thread =在SingleThreadEventExecutor中查找,仅有5处出现.
doStartThread处是正确赋值的地方. 相比大家应该都知道了..


一路下来就是这里调用的了,那么基本上可以确定,this.thread == null了,所以eventLoop.inEventLoop()会返回false. 走异步逻辑去进行注册. 而走异步注册逻辑的时候,刚好调用的是上图的execute,巧不巧. 😂
除了这种思路是可以说明以外,其实还有一种.
如果现在是一个main方法在跑,那么现在的主流程还是在 main 线程上,那么main线程会等同于 NioEventLoop 中的 thread 吗,想必是不会的.
register0
提交了注册任务到 NioEventLoop 后,什么时候执行,就要看这个线程争不争气了.
private void register0(ChannelPromise promise) {//去掉了异常和判断的变量,不影响阅读,注释保留doRegister();// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}}
这段注册流程涉及了 pipeline , 所以一行一行的看下
doRegister 调用子类来进行注册,代码在
AbstractNioChannel中,比较简单. 唯一需要关注的就是0这个参数,为什么是0呢,注册的时候不应该是OP_ACCEPT吗selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
invokeHandlerAddedIfNeeded 下一个小节描述.
- fireChannelRegistered 下下一个小节描述
- isActive channel是否调用了
bind的系统调用,很显然到目前为止并未调用,所以是false.
到这里为主,registry的流程算是真正的完成了. 但是 bind 的流程也就走完了初始化 channel的过程,真实的系统调用 bind 还没有执行呢,所以我们继续往下看,但是真实的看 bind 之前,我们先差个楼,看下 pipeline 的庐山真面目.
Pipeline 🐂
pipeline 英文翻译流水线和Filter有异曲同工的意思。
先上UML类图
Pipeline的初始化
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise = new VoidChannelPromise(channel, true);//初始化双向链表的头部和尾部tail = new TailContext(this);head = new HeadContext(this);//双向链表 头部指向尾部,尾部指向头部.head.next = tail;tail.prev = head;}
Pipeline 的初始化很简单,就是在pipeline内部建立一个双向链表.
链表的头部是一个 ChannelHandlerContext
尾部也是一个 ChannelHandlerContext
Pipeline 添加节点
Pipeline 作为一个链表,作为核心的莫过于添加节点和删除节点,那么添加和删除的时候还有哪些额外的操作呢.
@Overridepublic final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);}@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {for (ChannelHandler h: handlers) {addLast(executor, null, h);}return this;}//添加节点核心逻辑@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {//检查handler是否重复checkMultiplicity(handler);//建立一个context上下文, 和handler建立绑定关系,具体可以看下一个小节的讲述//new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);//newContext的就是实例化一个DefaultChannelHandlerContext对象newCtx = newContext(group, filterName(name, handler), handler);//将newCtx添加到链表中,具体的操作请看数据结构链表一节.addLast0(newCtx);//到这一步为止registered可能是false也是true//如果是server刚刚初始化的时候,registerred 肯定是false.//所以将任务添加到待执行任务的尾部(待执行任务也是一个链表,等待channel注册的时候这些context的//任务会一并触发),并修改context的状态标识位.if (!registered) {//设置状态标识位newCtx.setAddPending();//填充任务到链表callHandlerCallbackLater(newCtx, true);return this;}//如果已经注册完成了,EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {//设置状态标识位newCtx.setAddPending();//执行任务(执行完任务后会修改状态标志为ADD_COMPLETE)executor.execute(new Runnable() {@Overridepublic void run() {callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;}
这一点是pipeline添加节点的核心内容,那么添加节点的任务是在什么时候触发的呢 ?
其实在服务启动的时候已经有过描述,在 初始化channel 一节 的时候为其添加了一个 handler . 那个时候 pipeline 的结构是这样的
添加前
添加后
Context存在一个转换标志位的过程,那么这个过程是在什么时间节点执行的呢
在讲述 Regiseter0 的时候还遗留了2个问题到下下小节讲解,现在来解决第一个问题.
- invokeHandlerAddedIfNeeded
这个是为了防止用户使用ChannelFutureListener来触发pipeline的一些事件,所以需要提前吧pipeline上节点的Context进行初始化.
final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();if (firstRegistration) {firstRegistration = false;callHandlerAddedForAllHandlers();}}
继续调用所有添加的handler
private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;// This Channel itself was registered.registered = true;//获取链表头部节点pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;// Null out so it can be GC'ed.this.pendingHandlerCallbackHead = null;}//头部节点的类型是什么呢 PendingHandlerCallback是一个抽象类//它的实现有PendingHandlerAddedTask//PendingHandlerRemovedTaskPendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {//轮询执行任务task.execute();task = task.next;}}
所以在执行的时候会触发PendingHandlerAddedTask任务,继续看看该任务做了啥
@Overridevoid execute() {EventExecutor executor = ctx.executor();if (executor.inEventLoop()) {//继续调用callHandlerAdded0(ctx);} else {try {executor.execute(this);} catch (RejectedExecutionException e) {remove0(ctx);ctx.setRemoved();}}}
added0
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {//异常处理过滤掉ctx.setAddComplete();ctx.handler().handlerAdded(ctx);}
这里就完成了2件事
- 设置标志位,表明pipeline上的context节点可以使用了
- handlerAdded 将Context设置到handler中去,表明了Handler已经可以处理任何事件了.
到目前为止,pipeline已经可以处理即将到来的事件了,这也就表明后续的register也会被pipeline进行处理.
总结
总体来说,Netty中的pipeline是一个责任链模式 (filter chain) , 通过一个双向链表将 ChannelHandlerContext 勾连起来,既然是责任链模式,那我们核心需要关注的就是链的头部,也就是什么时候开展的,如果我们过于沉迷于中间的 handler , 那么就会陷入 Debug 的汪洋大海.
ChannelHandlerContext和ChannelHandler 🏁
将这两者放在一起说是他们存在一个上下文的关系.
ChannelHandler 具体处理业务逻辑的地方. 例如读取,写入,出现异常如何处理等等
- handlerAdded
- handlerRemoved
- exceptionCaught
子类有很多的拓展,这里不一一在描述了
ChannelHandlerContext
- channel
- executor
- handler 和哪一个handler绑定
ChannelHandlerContext 同它的名字一样肯定是和一个 ChannelHandler 进行绑定的,从它的方法也可已看的出来.
这样就形成了一个一对一的关系 , 一个 ChannelHandlerContext 和 ChannelHandler 成一个整体,共同成了 Pipeline 上的一个点.
ChannelHandlerContext
ChannelHandler
回顾Pipeline
在经历了 Pipeline 和 ChannelHandlerContext 后,我们在来回顾一开始遇到的问题
- pipeline.fireChannelRegistered 触发一个
Channel注册事件. 且是在Pipeline中进行触发.
```java static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { //next.executor ==> 注册channel的NioEventLoop //特别注意这里的next是ChannelHandlerContext实例对象,不是EventLoopGroup实例对象 EventExecutor executor = next.executor(); if (executor.inEventLoop()) {@Overridepublic final ChannelPipeline fireChannelRegistered() {//静态方法 从head头部开始AbstractChannelHandlerContext.invokeChannelRegistered(head);return this;}
} else {//成功触发next.invokeChannelRegistered();
} }executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRegistered();}});
触发Context中注册事件```javaprivate void invokeChannelRegistered() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRegistered(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRegistered();}}
如果handler的状态标识位是添加完成就可以处理
private boolean invokeHandler() {//初始化的时候会调用CAS,将handlerState设置成ADD_COMPLETEint handlerState = this.handlerState;//这里跳过order,因为ordered一直都是truereturn handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);}
标志位的设置是在注册触发前执行的. 所以这里必然是ADD_COMPLETE
使用Context对应的Handler处理注册逻辑. 这里的Context 是Head
@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {invokeHandlerAddedIfNeeded(); //这里的逻辑上文已经描述过ctx.fireChannelRegistered();//继续触发下一次的注册逻辑.}
继续执行下一次的逻辑
@Overridepublic ChannelHandlerContext fireChannelRegistered() {//findContextInbound() ,上面说过,context是一个链表,这里就是循环遍历链表获取下一个ContextinvokeChannelRegistered(findContextInbound());return this;}
private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while (!ctx.inbound);return ctx;}
这里就完成链一个环的构建。如果需要添加一个新的handler,直接在 Pipeline 上进行处理即可.
故事说到这里就完成 pipeline.fireChannelRegistered , 那问题来了,我们在ServerBootStrap中设置的handler是什么时候填充上去的呢. 别急我们继续往下走。
bind 后续
至此,我们已经完成了 Channel 的初始化, 已经selector的注册操作,虽然注册的是 0 ,并不是我们想要的 OP_ACCEPT , 但好说也是注册上了。 顺道看了 Pipeline 的数据结构以及它是如何初始化的. 同时也了解了 ChannelHandlerContext 和 HandlerContext ,这里我们继续了解 bind 的后续,我们将一步一步走向 Socket 是如何执行 bind 这个系统调用的.
private ChannelFuture doBind(final SocketAddress localAddress) {//异常部分就跳过了.final ChannelFuture regFuture = initAndRegister();final Channel channel = regFuture.channel();if (regFuture.isDone()) {ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {promise.registered();doBind0(regFuture, channel, localAddress, promise);}});return promise;}}
上边已经讲完了 initAndRegister ,并且顺便说明了 Register 是在新起的 NioEventLoop 中执行的,那么到这里的时候是否已经执行完了,我们不得而知,所以用了2个if分支来进行处理
- 如果回调说明 isDone, 那么直接主线程执行
doBind0() - 如果此时尚未完成注册,那么在注册完成回调的时候执行
doBind0()
不论是否完成,我们继续走向 doBind0()
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}
channel 是 NioServerSocketChannel , 在AbstrctChannel中找到该方法
@Overridepublic ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}
由pipeline,触发bind事件
@Overridepublic final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}
从尾部的上下文开始一路进行 bind , 可以大胆推测它的bind逻辑是否和 register 时是否一致。
@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {//这些都是老朋友了findContextOutbound,executor等等final AbstractChannelHandlerContext next = findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {//肯定会走这里的逻辑next.invokeBind(localAddress, promise);}}, promise, null);}return promise;}
继续执行bind逻辑
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {bind(localAddress, promise);}}
我们的推测和代码逻辑是一致的,bind的过程和register是一致的,也是不断的调用context,然后使用对应的handler触发业务逻辑,然后在获取上一个context,如此反复.
既然如此,那么bind的底在哪里了.
既然 Pipeline 是一个双向链表,那么最终会到底链表的头部. Head 处,那我们支持看 Head 处的bind.
@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)throws Exception {//unsafe==channel.unsafeunsafe.bind(localAddress, promise);}
嗯,又是和register一样,还是来到了 unsafe ,最终我们在 register 的下一个方法找到了bind方法
@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();//现在肯定是false,还没有调用过bind呢boolean wasActive = isActive();try {doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}//现在肯定是true了if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {//触发channel active事件pipeline.fireChannelActive();}});}safeSetSuccess(promise);}
doBind(**) 是一个抽象方法,它的实现在 NioServerSocketChannel 中
@Overrideprotected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}
这里就是直接调用jdk的socket绑定了,nice,重要看到bind的系统调用了.
到这里为止,register和bind算是完成了
既然bind完成了,那么我们继续向下走,把方法走完本次分析就结束了.
pipeline.fireChannelActive();
bind结束后,会触发一个channel active事件,这里肯定也是pipeline的事件传播. 但是因为我们的selector 注册的还是0,所以继续看看.
@Overridepublic final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;}
active事件是从头部开始的,看下它的handler实现是啥子.
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();readIfIsAutoRead();}
嗯,head的实现比较简单
- fireChannelActive 继续传播active事件
readIfIsAutoRead 读取?继续看看
private void readIfIsAutoRead() {//channel.config 牵扯的太多了,本次分析跳过config,这里返回的trueif (channel.config().isAutoRead()) {channel.read();}}
😣 由于牵扯的太多了,慢慢的分析下来完全超出了我的预料,所以这里先不谈
channel.config了
尽管已经写了这么,但是在注册的流程中,不要忘了channel===> NioServerSocketChannel, 感觉去看看实现吧@Overridepublic Channel read() {pipeline.read();return this;}
我擦,又进入了
pipeline,继续@Overridepublic final ChannelPipeline read() {tail.read();return this;}
从尾部开始读取的,直接到链表的最后head看看
@Overridepublic void read(ChannelHandlerContext ctx) {unsafe.beginRead();}
没错,又是unsafe,涉及到读写等IO操作到好像都是它来干的.
@Overridepublic final void beginRead() {assertEventLoop();if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}}
继续瞅瞅子类AbstractNioChannel的实现
@Overrideprotected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;//这里的interestOps 就是我们一开始设置的0final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {selectionKey.interestOps(interestOps | readInterestOp);}}
😂 终于出来了
readInterestOp就是反射实例话Channel时传递进去的数据呀.public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
至此,本次分析终于完成了,最后的设置Promise就很简单了. 大家可以自己看看.
总结 🀄️
Netty的启动流程写出来比我想象中的要难一点,整个流程比较长,涉及的类也较多,不可能面面俱到.
从一个高的层面去看它的实现,不要一下子就陷入Debug的汪洋大海中去.
耐心一点,慢慢看总会明白的
