1.前言
我们知道,在Netty中使用了一套管道模式来处理网络传输上的数据,虽然Netty分为了ChannelInboundHandler和ChannelOutboundHandler两个实现,但是都是在存在同一个管道中,只是调用的时候分开了,接下来我们看看在Netty中具体是如何实现的。
2.结构
我们知道管道模式中,首先得有一个容器让链路在容器之中,所以ChannelPipeline
就刚好充当这容器的角色。
有了容器之后,我们就要开始准备组成我们的链路了,对于链路的方式,第一时间想到的数据结构就是链表了,所以链路的组成是一个链表的类,在Netty中实现链路的类是ChannelHandlerContext
接口的实现类。链表如下。
我们知道数据出入栈的逻辑具体在ChannelHandler
中实现的,所以这里的ChannelHandlerContext
的作用就是类似于一个阀门,可以控制管道的流通以及通过同步还是异步调用等逻辑进行封装。其中也是关联这HandlerContext
来执行具体的出入栈逻辑.
现在我们pipeline的具体细节有了,还差最后一个就是对pipeline生命周期的管理了,只需在pipeline外面在包一层,即可对pipeline进行管理,也就是我们的Channel
以上就是我们netty中关于管道的完整视图了。
3.源码分析
通过上面的图我们已经知道这些接口的具体职责,那么接下来我们就具体分析源码中的实现,首先看看一个正常的netty配置是怎样样的。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程组,老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 用于配置Server相关参数,并启动Server
ServerBootstrap b = new ServerBootstrap();
// 配置parentGroup和ChildGroup
b.group(bossGroup, workerGroup)
// 配置通道
.channel(NioServerSocketChannel.class)
// 配置通道的ChannelPipeline
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
// p.addLast(serverHandler);
p.addLast(new EchoClientHandler());
}
});
// 绑定端口,并启动server,同时设置启动方式为同步
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// 等待服务端监听端口关闭
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
3.1 Channel
在启动netty时,我们指定了netty的channel为NioServerSocketChannel
,netty会默认使用一个channel的工厂类
// channelClass:NioServerSocketChannel
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
在BootStrap进行端口绑定时b.bind(PORT)
,就开始通过构造器的newInstance
生成NioServerSocketChannel
的实例对象
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 调用工厂的实例化方法
channel = channelFactory.newChannel();
init(channel);
}
}
现在我们知道netty默认提供的Channel类就是NioServerSocketChannel
,看下其中的继承结构图
在NIO中Channel可以用来对我们的网络数据进行读写等操作,这里不做过多讨论,只关注pipeline的部分。接下来对Channel的元数据进行分析,既然channel能对pipeline进行管理,那么channel中肯定存在着对pipeline对象的引用。在AbstractChannel
中
private final DefaultChannelPipeline pipeline;
通过代码发现,pipeline的生成是在AbstractChannel
的构造函数中,也就是生成NioServerSocketChannel
对象时,就会默认为我们生成一个DefaultChannelPipeline
对象
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 默认生成pipeline
pipeline = newChannelPipeline();
}
在Channel类生成后,netty会添加一个默认的ChannelHandler加入到链表中,后面在调用channel的注册方式时,会执行里面的回调逻辑,讲我们配置的handler(new LoggingHandler(LogLevel.INFO))
加入到链表中
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
3.2 ChannelPipeline
在Netty创建的默认的Channel
实现类中,在其构造函数里会默认帮我们生成一个ChannelPipeline
类,其默认实现类是DefaultChannelPipeline
。如下图所示
我们先想想pipeline相当于一个容器,管理着里面的链路,那么肯定得保存着链路的引用,所以有如下两个元数据
// 头节点
final AbstractChannelHandlerContext head;
// 尾节点
final AbstractChannelHandlerContext tail;
pipeline是存在于Channel中,所以也需要一个Channel的引用
private final Channel channel;
接下来我们看看在该类的构造函数,可以看到在创建管道时,会默认为管道生成头尾两个ChannelHandlerContext
,也就是说我们的链路执行一定是最开始执行HeadContext
,最后执行TailContext
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
pipeline主要作用就是对链表的节点进行管理和调度,我们这里讨论,pipeline对链表的调度分析,通过源码的接口分析,pipeline对链表的调度有如下方法
// ChannelHandlerContext注册事件
@Override
ChannelPipeline fireChannelRegistered();
// ChannelHandlerContext取消注册事件
@Override
ChannelPipeline fireChannelUnregistered();
// 活动事件
@Override
ChannelPipeline fireChannelActive();
// 生命周期结束事件
@Override
ChannelPipeline fireChannelInactive();
// 异常捕获
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
// 如果触发了用户事件,则被调用。
@Override
ChannelPipeline fireUserEventTriggered(Object event);
// 数据读取事件
@Override
ChannelPipeline fireChannelRead(Object msg);
// 数据读取完成事件
@Override
ChannelPipeline fireChannelReadComplete();
// channel读写状态变更事件
@Override
ChannelPipeline fireChannelWritabilityChanged();
pipeline提供了对出入站数据处理的调用逻辑,默认都会从head头节点开始调用
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
3.3 ChannelHandlerContext
我们知道管道中的链路是由一个个ChannelHandlerContext
实现类组成,其中netty为我们提供的抽象父类为AbstractChannelHandlerContext
类,每个pipeline中的节点都继承了该父类,那么我们就对该类的元数据进行分析。
pipeline的链路是由一个双向链表组成的,那么我们需要一个上下节点的引用关系,如下
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
再然后我们的链表要依托在哪个管道中执行,所以还存着管道的对象
private final DefaultChannelPipeline pipeline;
我们知道ChannelHandlerContext
充当着管道阀的作用,那么阀自然就有状态去控制去开启还是关闭,在ChannelHandlerContext
有如下几个状态
// 添加中
private static final int ADD_PENDING = 1;
// 添加完成
private static final int ADD_COMPLETE = 2;
// 移除
private static final int REMOVE_COMPLETE = 3;
// 初始化
private static final int INIT = 0;
然后在我们执行的时候,首先就会判断当前ChannelHandlerContext
所处的状态,以读取数据方法为例
- 首先判断所处的状态是否为ADD_COMPLETE或者状态处于ADD_PENDING且ordered是无序的
- 如果为true,则执行
ChannelHandler
的事件
通过这样,即便当前节点被加入到了链表中,但是状态值不对的话,那这个处理器是不会执行的
private void invokeChannelRead(Object msg) {
// 如果返回false,则不会调用channelHandler
if (invokeHandler()) {
try {
// 执行handler事件
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// 选取下个节点
fireChannelRead(msg);
}
}
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
// 判断状态
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
接下来我们就对这几个状态进行分析
在AbstractChannelHandlerContext
类中,有个handlerState
字段标注当前阀所处的状态,其源码如下
private volatile int handlerState = INIT;
可以发现所有ChannelHandlerContext
的初始状态都是init,即0。那么阀的状态在什么时候会进行变更呢?我们设想一下如果想改变当前阀的状态一般有两种方法
- 一个是在当前类
ChannelHandlerContext
主动调用进行状态变更 - 调用该类的类中进行变更,即我们的pipeline中对链表节点的状态进行变更。
在pipeline中进行状态的变更好处就是可以让ChannelHandlerContext
只需关注自己的逻辑而无需控制状态,在netty中采用的是第二种方式,pipeline的主要作用就是对其中的链表进行管理,所以关于状态变更的逻辑也在添加、删除节点中进行了调用,下面看下pipeline中addLast方法的实现
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// 添加到tail节点前
addLast0(newCtx);
// 如果该值为false时,表示当前channel还没有注册到eventloop上
// 此时会将context的状态设置为
if (!registered) {
// 修改context状态为ADD_PENDING
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 后面进来的context会将状态直接设置为ADD_COMPLETE
callHandlerAdded0(newCtx);
return this;
}
具体只关系状态变更的逻辑,流程如下
- 首先会判断当前channel是否注册到eventloop上
- 如果没有,那么设置当前context的状态为pending
- 同时将当前context添加到PendingHandlerCallback类的链表中
- 如果当前pipeline的registered属性为true的话,那么下面就直接将context的状态为complete
PendingHandlerCallback类实际上是一个线程,我们初始的数据加入到该链表中后,后面netty会执行该线程,并执行callHandlerAdded()方法,将context的状态设置为complete,并且执行handlerAdd回调方法
这时我们已经有个大概的了解了,在pipeline对链表节点进行添加删除时,还会对context的状态进行变更,其中变更的方法如下
final void callHandlerAdded() throws Exception {
if (setAddComplete()) {
handler().handlerAdded(this);
}
}
可以看到,在设置了context的状态后,同时还触发了context的一个添加事件。那么这里就明确了在context状态变更的时候会伴随着回调事件的发生,查看源码发现,context只有两个事件,一个是添加事件,一个是删除事件。
- 当状态变为complete时,执行handlerAdded回调事件
当状态变为REMOVE_COMPLETE时,执行handlerRemoved回调事件
public interface ChannelHandler {
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}
当然这些事件只会执行一次。
最后我们在对context进行一个总结和回顾
首先context使我们pipeline中比较重要的一个环节,他是我们链路的主要载体,所有的节点之间构成一个双向链表组成一个pipeline,其中netty默认指定了头节点和尾节点
通过之前对context的元数据分析,我们知道context有4个状态,状态的流转如下,其中complete和remove状态还对应着两个回调事件供我们处理
当然ChannelHandlerContext的方法还远不止这些,剩下的通过ChannelHandler一起探寻。3.4 ChannelHandler
ChannelHandler
中分为ChannelInboundHandler
和ChannelOutboundHandler
,这两个接口只是分别用于入站和出站的处理,这里我们只讨论Inbound的逻辑。在我们平时开发时,一般要实现ChannelHandler
只需要继承SimpleChannelInboundHandler
类实现其中的方法即可,那我们就从这个地方开始,首先看下类图
对于我们使用者来说,一般不出意外继承SimpleChannelInboundHandler
就行了,在Adapter中封装了一些公共的逻辑,比如Context的注册等。
这里我们看下ChannelHandler
和ChannelInBoundHandler
中的方法有哪些public interface ChannelHandler {
/**
* 添加到链路中时执行
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* 移除到链路中时执行
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
}
```java public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; }
通过方法名可以发现,都是一些生命周期的回调方法,下面直接给ChannelHandler的生命周期调用顺序<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/12949707/1640671385177-43f6f576-9933-4c0a-bbf0-4c48c1d4eab4.png#clientId=u67506f57-2ccd-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=536&id=ucb7df4f1&margin=%5Bobject%20Object%5D&name=image.png&originHeight=536&originWidth=472&originalType=binary&ratio=1&rotation=0&showTitle=false&size=19936&status=done&style=none&taskId=u81a2f517-d8ec-4aea-bb1b-890187bd065&title=&width=472)<br />对于前两个事件,添加和注册则是在启动netty时注册Channel的时候进行调用,其逻辑如下
```java
private void register0(ChannelPromise promise) {
try {
// 执行add
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 执行register
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
// 执行active
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
上面简化了部分代码,不过关于handler的声明周期调用还是很明显的。其中active方法只有netty进行读写时首先会经过ServerBootStrap在启动时默认添加进链表的一个节点ServerBootstrapAcceptor
,在节点在读取数据是,会调用一次AbstractChannel的register方法,这时会调用链路的fireChannelActive
方法。
对上面的逻辑捋一捋其实就是,在进行ChannelRead
方法时,还伴随着ChannelHandler整个生命周期的调用。