要求 😯
最少能看懂 netty-example
的 echo范例
示例 😄
public void bind(Integer port) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup(1);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
final ServerTransactionHandler serverTransactionHandler = new ServerTransactionHandler(this.applicationContext);
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new MessageDecoder(serialization))
.addLast(new MessageEncoder(serialization))
.addLast(new IdleStateHandler(20, 40, 60))
.addLast(serverTransactionHandler);
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
channelFuture = serverBootstrap.bind(port).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
讲述过程 😂
- NioEventLoopGroup 初始化
- 类图
- NioEventLoop
- ServerBootstrap 启动
- channel
- handler 和 childHandler
- bind (重点)
- ChannelPipeline (稍微提及)
- ChannelHandlerContext (稍微提及)
- NioServerSocketChannel 初始化
- Pipeline和ChannelHandlerContext的初始化
ServerBootstrap 和 NioServerSocketChannel 不是完全按照这里描述的顺序出现,还是按照Netty启动的顺序出现. 这里这是单独罗列. 😂
能收获到什么 📌
希望看完后,能理解他们之间的关系. 😂
视频版 🌲
文字版 👷
NioEventLoopGroup的初始化
NioEventLoopGroup 构造方法
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
//看UML
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
MultithreadEventLoopGroup 构造方法
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
MultithreadEventExecutorGroup 构造方法
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
初始化核心逻辑
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//跳过校验
//这里只是一个数组. 还没有实例化填充数据
//children 数组对象 EventExecutor
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//交给子类去实例化,子类可以自由选择,可以是NIO,可以是Epoll也可以是Kqueue等等
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//初始化失败以后的善后工作... 先跳过
}
}
//根据children数组的长度选择EventExecutorChooser,如果是2的n次幂使用
//PowerOfTwoEventExecutorChooser 否则使用 GenericEventExecutorChooser(轮询)
chooser = chooserFactory.newChooser(children);
//终止的监听器
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
//设置上
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
MultithreadEventExecutorGroup#newChild
看下 NioEventLoopGroup
的实现
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop
初始化比较简单,具体可以看代码, UML
的关系相对比较重要,关注他的核心方法
- register
- SingleThreadEventExecutor#run
总结
NioEventLoopGroup
的初始化比较简单,主要是了解一个Group
中存在几个NioEventLoop
,数量在初始化的时候指定,默认是CPU核数的2倍
ServerBootstrap 👀
ServerBootstrap serverBootstrap = new ServerBootstrap();
ServerBootstrap
是一个启动类,非线程安全,但是启动仅一次,也没啥关系 (目前还没看到过需要启动多次的情况)
group,handler等设置的方法可以看下代码,简单就直接跳过了
channel
也是一个设置方法,但是设计到一个Channel 的factory
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
} else {
return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass)));
}
}
代码很简单,就是设置一个 ReflectiveChannelFactory
到字段 channelFactory
上ReflectiveChannelFactory
何许人也,一个channel的工厂类,生成模板的方式是反射. 具体的类型需要外网传递进来,例如这里传递进来的就是 NioServerSocketChannel.class
. 后文讲述 initAndRegister
还会描述到。
childHandler
设置childHandler,代码也比较简单,一个普通的set方法,这里提一下他的特殊性.
- child是什么?
- 存在child,那么parent又是什么?
child和parent是相对而已,这里的parent可以理解是ServerSocketChannel. child则是SocketChannel. 所以 childHandler
是给后续链接上来的 Channel
使用的 handler
.. 理解了这一层, option
和 childOption
就不在话下了
我们继续往下走..
bind 📖
绑定端口到socket上,在NIO中讲述的时候比较简单,获取 ServerSocketChannel
,然后调用 Socket
的 bind
就可以完成一个NIO的启动了. 但是在Netty中
ServerSocketChannel
在哪里,只有一个NioServerSocketChannel.class
,还是Netty
包的.Socket
的bind
到底是在哪里调用的.
public ChannelFuture bind(int inetPort) {
return this.bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
this.validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
} else {
return this.doBind(localAddress);
}
}
doBind 🐎
private ChannelFuture doBind(final SocketAddress localAddress) {
//initAndRegister 字面意思,好像逻辑都在这里边了
final ChannelFuture regFuture = this.initAndRegister();
final Channel channel = regFuture.channel();
//如果初始化和注册存在异常,说明注册失败.
if (regFuture.cause() != null) {
return regFuture;
} else if (regFuture.isDone()) {
//注册完成了那就去执行bind操作
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//还在处理当中,那就等注册完成的时候回调这里完成doBind0
final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
initAndRegister 💥
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//channelFactory 就是上文提到的 ReflectiveChannelFactory
//生成的方式很简单,就是根据传递到底class进行一次反射调用
channel = this.channelFactory.newChannel();
this.init(channel);
} catch (Throwable var3) {
if (channel != null) {
channel.unsafe().closeForcibly();
return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3);
}
//反射成功以后,执行注册操作
ChannelFuture regFuture = this.config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
🦑 由于这里涉及到 NioServerSocketChannel
的初始化,这里先打断,看完他的反射初始化逻辑
NioServerSocketChannel 的初始化🔨
ReflectiveChannelFactory的反射
public T newChannel() {
try {
return (Channel)this.clazz.getConstructor().newInstance();
} catch (Throwable var2) {
throw new ChannelException("Unable to create Channel from class " + this.clazz, var2);
}
}
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
super((Channel)null, channel, 16);
this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
}
AbstractNioMessageChannel 的实例化
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
AbstractNioChannel 的实例化
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
//设置感兴趣的事件列表,后续在注册的时候会使用到
this.readInterestOp = readInterestOp;
//设置为非阻塞.. 调用的是jdk SelectableChannel的方法
ch.configureBlocking(false);
}
AbstractChannel 的实例化
protected AbstractChannel(Channel parent) {
this.parent = parent;
this.id = this.newId();
//unsafe的实例化,这里到最后描述
this.unsafe = this.newUnsafe();
//pipeline的实例化,这里到最后描述
this.pipeline = this.newChannelPipeline();
}
初始化channel
反射完成 channel
实例化, 现在进入 init()
的过程.
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//获取pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
//填充option
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
//NioServerSocket 初始化的时候会进入到这里来到
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//继续添加一个ServerBootstrapAcceptor,用于处理即将到来的链接
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
如果没有看完全部的,这里可能看的有点蒙B,但是没关系,先看一下,等到看完全部的时候就会得到升华或者是忘记了. 😂
pipeline执行addLast的时候是直接执行的,等到讲Pipeline和ChannelHandlerContext 的时候再来回顾好了.
ServerBootstrap 的bind后续
在 initAndRegister
的后续中,补充了 NioServerSocketChannel
的初始化,目前已经完成了 init
的过程,现在继续看 register
的过程.
ChannelFuture regFuture = this.config().group().register(channel);
- this.config() 上文描述的 ServerBootstrapConfig
- group() 获取的是ServerBootstrapConfig中初始化的
group
, 而这个group在server端是NioEventLoopGroup
所以我们继续看 NioEventLoopGroup
的注册逻辑. 发现这个接口的实现有5个,排除调测试的还有4个
这个时候又会过过头看下 NioEventLoopGroup
的类图,发现 MultithreadEventLoopGroup
是他的👨父类.我们继续往下走..
MultithreadEventLoopGroup的注册
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
嗯,现在继续看下 next()
是何方神圣.
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
继续看下父类(MultithreadEventExecutorGroup)的实现
具体那个父类,可以继续看UML类图
@Override
public EventExecutor next() {
//很好的chooser,好像在哪里看到过
//确实在NioEventLoopGroup的初始化的时候确认过眼神
//根据children数组的长度选择EventExecutorChooser,如果是2的n次幂使用
//PowerOfTwoEventExecutorChooser 否则使用 GenericEventExecutorChooser(轮询)
return chooser.next();
}
- 我们可以选择进去看看到底是怎么选择
- 我们也可以选择不看了.
我们已经知道了 next()
===> EventLoop
===> newChild
实例化的对象 ===> NioEventLoop
NioEventLoop 的register
实例化NioEventLoopGroup的时候稍微介绍过 NioEventLoop
, 说了他的实例化很简单,只需要简单的关注他的
- register和run方法
现在我们就遇到了他的 Registry
😅 结果我们在 NioEventLoop
中没有找到 Registry
方法, 继续看父类,必须父类有自己就有. 果不其然,在 SingleThreadEventLoop
找到了
再次重申UML类图的重要性
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
嗯, unsafe()
又是何许人也.
unsafe
经历了几个跳转以后,我们千万别忘了 channel()
就是我们刚开始反射创建的 NioServerSocketChannel
在 NioServerSocketChannel
初始化的时候,补充了2个注释
- unsafe的实例化,这里到最后描述
- pipeline的实例化,这里到最后描述
终于到最后了.
继续到 AbstractChannel
的构造方法中看看是如何实现的.
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected abstract AbstractUnsafe newUnsafe();
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
这里的 newUnsafe
是一个抽象方法,看看子类是怎么实现的.
子类稍微有点多,就算把测试排除完,也有好几个. 但是我们这里描述的是 NioServerSocketChannel
,只需要看它的类图上出现了哪些即可,经过精确查找,我们找到了 AbstractNioMessageChannel
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
NioMessageUnsafe
什么东西,先上类图.
确实是 Unsafe
的孩子,没有错,继续顺藤摸瓜找到 registry方法
,类图是可以直接找到该方法的,但是不想搞一个这么大的图了. 沿着 NioMessageUnsafe
最终在 AbstractUnsafe
中找到了,我们继续看下.
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//只能自己注册自己,不能让别人来自己
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//这个channel和这个eventLoop绑定起来 一个channel和一个线程绑定起来
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
这个注册方法稍微有点大,我们拆开一点一点看.
先看看参数
- EventLoop eventLoop 它的实现是SingleThreadEventLoop,但是更具体的应该是
NioEventLoop
具体就不描述了,整个调用链路下来很清晰了. - final ChannelPromise promise 看调用链路即可
isCompatible(eventLoop)
AbstractNioChannel#isCompatible
为啥是它呢,看UML
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioEventLoop;
}
既然在这里补充了这么多,那就在补充一下这句
//这个channel和这个eventLoop绑定起来 一个channel和一个线程绑定起来
AbstractChannel.this.eventLoop = eventLoop;
为啥是 AbstractChannel.this.eventLoop
,因为 AbstractUnsafe
是在 AbstractChannel
中的内部类,所以是这种写法
另外
- 每一个
Channel
都会和一个EventLoop
进行绑定,就是NioServerSocketChannel
也不例外,当然NioSocketChannel
那就更加不例外了. 所以这里的设置是将 eventLoop 给对应的 channel 进行绑定eventLoop.inEventLoop()
也是一个接口,从URL和实现找到它在那个类中,AbstractEventExecutor
可以找到方法
继续找,@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
SingleThreadEventExecutor
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
thread
参数好理解,this.thread
是什么东西,一路上从未遇到过, 再看看它是在哪里赋值
的
使用thread =
在SingleThreadEventExecutor
中查找,仅有5处出现.doStartThread
处是正确赋值的地方. 相比大家应该都知道了..
一路下来就是这里调用的了,那么基本上可以确定,this.thread == null
了,所以eventLoop.inEventLoop()
会返回false. 走异步逻辑去进行注册. 而走异步注册逻辑的时候,刚好调用的是上图的execute
,巧不巧. 😂
除了这种思路是可以说明以外,其实还有一种.
如果现在是一个main方法在跑,那么现在的主流程还是在 main
线程上,那么main线程会等同于 NioEventLoop
中的 thread
吗,想必是不会的.
register0
提交了注册任务到 NioEventLoop
后,什么时候执行,就要看这个线程争不争气了.
private void register0(ChannelPromise promise) {
//去掉了异常和判断的变量,不影响阅读,注释保留
doRegister();
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
}
这段注册流程涉及了 pipeline
, 所以一行一行的看下
doRegister 调用子类来进行注册,代码在
AbstractNioChannel
中,比较简单. 唯一需要关注的就是0这个参数,为什么是0呢,注册的时候不应该是OP_ACCEPT
吗selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
invokeHandlerAddedIfNeeded 下一个小节描述.
- fireChannelRegistered 下下一个小节描述
- isActive channel是否调用了
bind
的系统调用,很显然到目前为止并未调用,所以是false
.
到这里为主,registry的流程算是真正的完成了. 但是 bind
的流程也就走完了初始化 channel的过程,真实的系统调用 bind
还没有执行呢,所以我们继续往下看,但是真实的看 bind
之前,我们先差个楼,看下 pipeline
的庐山真面目.
Pipeline 🐂
pipeline 英文翻译流水线和Filter有异曲同工的意思。
先上UML类图
Pipeline的初始化
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//初始化双向链表的头部和尾部
tail = new TailContext(this);
head = new HeadContext(this);
//双向链表 头部指向尾部,尾部指向头部.
head.next = tail;
tail.prev = head;
}
Pipeline
的初始化很简单,就是在pipeline内部建立一个双向链表.
链表的头部是一个 ChannelHandlerContext
尾部也是一个 ChannelHandlerContext
Pipeline 添加节点
Pipeline 作为一个链表,作为核心的莫过于添加节点和删除节点,那么添加和删除的时候还有哪些额外的操作呢.
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
//添加节点核心逻辑
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查handler是否重复
checkMultiplicity(handler);
//建立一个context上下文, 和handler建立绑定关系,具体可以看下一个小节的讲述
//new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
//newContext的就是实例化一个DefaultChannelHandlerContext对象
newCtx = newContext(group, filterName(name, handler), handler);
//将newCtx添加到链表中,具体的操作请看数据结构链表一节.
addLast0(newCtx);
//到这一步为止registered可能是false也是true
//如果是server刚刚初始化的时候,registerred 肯定是false.
//所以将任务添加到待执行任务的尾部(待执行任务也是一个链表,等待channel注册的时候这些context的
//任务会一并触发),并修改context的状态标识位.
if (!registered) {
//设置状态标识位
newCtx.setAddPending();
//填充任务到链表
callHandlerCallbackLater(newCtx, true);
return this;
}
//如果已经注册完成了,
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
//设置状态标识位
newCtx.setAddPending();
//执行任务(执行完任务后会修改状态标志为ADD_COMPLETE)
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
这一点是pipeline添加节点的核心内容,那么添加节点的任务是在什么时候触发的呢 ?
其实在服务启动的时候已经有过描述,在 初始化channel 一节 的时候为其添加了一个 handler
. 那个时候 pipeline
的结构是这样的
添加前
添加后
Context存在一个转换标志位的过程,那么这个过程是在什么时间节点执行的呢
在讲述 Regiseter0
的时候还遗留了2个问题到下下小节讲解,现在来解决第一个问题.
- invokeHandlerAddedIfNeeded
这个是为了防止用户使用ChannelFutureListener来触发pipeline的一些事件,所以需要提前吧pipeline上节点的Context进行初始化.
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
继续调用所有添加的handler
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
//获取链表头部节点
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
//头部节点的类型是什么呢 PendingHandlerCallback是一个抽象类
//它的实现有PendingHandlerAddedTask
//PendingHandlerRemovedTask
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
//轮询执行任务
task.execute();
task = task.next;
}
}
所以在执行的时候会触发PendingHandlerAddedTask任务,继续看看该任务做了啥
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
//继续调用
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
remove0(ctx);
ctx.setRemoved();
}
}
}
added0
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
//异常处理过滤掉
ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
}
这里就完成了2件事
- 设置标志位,表明pipeline上的context节点可以使用了
- handlerAdded 将Context设置到handler中去,表明了Handler已经可以处理任何事件了.
到目前为止,pipeline已经可以处理即将到来的事件了,这也就表明后续的register也会被pipeline进行处理.
总结
总体来说,Netty中的pipeline是一个责任链模式 (filter chain) , 通过一个双向链表将 ChannelHandlerContext
勾连起来,既然是责任链模式,那我们核心需要关注的就是链的头部,也就是什么时候开展的,如果我们过于沉迷于中间的 handler
, 那么就会陷入 Debug
的汪洋大海.
ChannelHandlerContext和ChannelHandler 🏁
将这两者放在一起说是他们存在一个上下文的关系.
ChannelHandler 具体处理业务逻辑的地方. 例如读取,写入,出现异常如何处理等等
- handlerAdded
- handlerRemoved
- exceptionCaught
子类有很多的拓展,这里不一一在描述了
ChannelHandlerContext
- channel
- executor
- handler 和哪一个handler绑定
ChannelHandlerContext 同它的名字一样肯定是和一个 ChannelHandler 进行绑定的,从它的方法也可已看的出来.
这样就形成了一个一对一的关系 , 一个 ChannelHandlerContext 和 ChannelHandler 成一个整体,共同成了 Pipeline
上的一个点.
ChannelHandlerContext
ChannelHandler
回顾Pipeline
在经历了 Pipeline
和 ChannelHandlerContext
后,我们在来回顾一开始遇到的问题
- pipeline.fireChannelRegistered 触发一个
Channel
注册事件. 且是在Pipeline
中进行触发.
```java static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { //next.executor ==> 注册channel的NioEventLoop //特别注意这里的next是ChannelHandlerContext实例对象,不是EventLoopGroup实例对象 EventExecutor executor = next.executor(); if (executor.inEventLoop()) {@Override
public final ChannelPipeline fireChannelRegistered() {
//静态方法 从head头部开始
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
} else {//成功触发
next.invokeChannelRegistered();
} }executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
触发Context中注册事件
```java
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
如果handler的状态标识位是添加完成就可以处理
private boolean invokeHandler() {
//初始化的时候会调用CAS,将handlerState设置成ADD_COMPLETE
int handlerState = this.handlerState;
//这里跳过order,因为ordered一直都是true
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
标志位的设置是在注册触发前执行的. 所以这里必然是ADD_COMPLETE
使用Context对应的Handler处理注册逻辑. 这里的Context 是Head
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded(); //这里的逻辑上文已经描述过
ctx.fireChannelRegistered();//继续触发下一次的注册逻辑.
}
继续执行下一次的逻辑
@Override
public ChannelHandlerContext fireChannelRegistered() {
//findContextInbound() ,上面说过,context是一个链表,这里就是循环遍历链表获取下一个Context
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
这里就完成链一个环的构建。如果需要添加一个新的handler,直接在 Pipeline
上进行处理即可.
故事说到这里就完成 pipeline.fireChannelRegistered
, 那问题来了,我们在ServerBootStrap中设置的handler是什么时候填充上去的呢. 别急我们继续往下走。
bind 后续
至此,我们已经完成了 Channel
的初始化, 已经selector的注册操作,虽然注册的是 0
,并不是我们想要的 OP_ACCEPT
, 但好说也是注册上了。 顺道看了 Pipeline
的数据结构以及它是如何初始化的. 同时也了解了 ChannelHandlerContext
和 HandlerContext
,这里我们继续了解 bind
的后续,我们将一步一步走向 Socket
是如何执行 bind
这个系统调用的.
private ChannelFuture doBind(final SocketAddress localAddress) {
//异常部分就跳过了.
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
上边已经讲完了 initAndRegister
,并且顺便说明了 Register
是在新起的 NioEventLoop
中执行的,那么到这里的时候是否已经执行完了,我们不得而知,所以用了2个if分支来进行处理
- 如果回调说明 isDone, 那么直接主线程执行
doBind0()
- 如果此时尚未完成注册,那么在注册完成回调的时候执行
doBind0()
不论是否完成,我们继续走向 doBind0()
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel 是 NioServerSocketChannel , 在AbstrctChannel中找到该方法
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
由pipeline,触发bind事件
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
从尾部的上下文开始一路进行 bind
, 可以大胆推测它的bind逻辑是否和 register
时是否一致。
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//这些都是老朋友了findContextOutbound,executor等等
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
//肯定会走这里的逻辑
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
继续执行bind逻辑
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
我们的推测和代码逻辑是一致的,bind的过程和register是一致的,也是不断的调用context,然后使用对应的handler触发业务逻辑,然后在获取上一个context,如此反复.
既然如此,那么bind的底在哪里了.
既然 Pipeline
是一个双向链表,那么最终会到底链表的头部. Head
处,那我们支持看 Head
处的bind.
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
//unsafe==channel.unsafe
unsafe.bind(localAddress, promise);
}
嗯,又是和register一样,还是来到了 unsafe
,最终我们在 register
的下一个方法找到了bind方法
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
//现在肯定是false,还没有调用过bind呢
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//现在肯定是true了
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
//触发channel active事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
doBind(**)
是一个抽象方法,它的实现在 NioServerSocketChannel
中
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
这里就是直接调用jdk的socket绑定了,nice,重要看到bind的系统调用了.
到这里为止,register和bind算是完成了
既然bind完成了,那么我们继续向下走,把方法走完本次分析就结束了.
pipeline.fireChannelActive();
bind结束后,会触发一个channel active事件,这里肯定也是pipeline的事件传播. 但是因为我们的selector 注册的还是0,所以继续看看.
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
active事件是从头部开始的,看下它的handler实现是啥子.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
嗯,head的实现比较简单
- fireChannelActive 继续传播active事件
readIfIsAutoRead 读取?继续看看
private void readIfIsAutoRead() {
//channel.config 牵扯的太多了,本次分析跳过config,这里返回的true
if (channel.config().isAutoRead()) {
channel.read();
}
}
😣 由于牵扯的太多了,慢慢的分析下来完全超出了我的预料,所以这里先不谈
channel.config
了
尽管已经写了这么,但是在注册的流程中,不要忘了channel===> NioServerSocketChannel, 感觉去看看实现吧@Override
public Channel read() {
pipeline.read();
return this;
}
我擦,又进入了
pipeline
,继续@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
从尾部开始读取的,直接到链表的最后head看看
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
没错,又是unsafe,涉及到读写等IO操作到好像都是它来干的.
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
继续瞅瞅子类AbstractNioChannel的实现
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
//这里的interestOps 就是我们一开始设置的0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
😂 终于出来了
readInterestOp
就是反射实例话Channel
时传递进去的数据呀.public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
至此,本次分析终于完成了,最后的设置Promise就很简单了. 大家可以自己看看.
总结 🀄️
Netty的启动流程写出来比我想象中的要难一点,整个流程比较长,涉及的类也较多,不可能面面俱到.
从一个高的层面去看它的实现,不要一下子就陷入Debug的汪洋大海中去.
耐心一点,慢慢看总会明白的