1、Reactor三步曲
1.1、注册
将通道就绪事件注册到选择器Selector
AbstractBootstrap.initAndRegister() -> MultithreadEventLoopGroup.register() -> SingleThreadEventLoop.register()
-> AbstractChannel.AbstractUnsafe.register() -> AbstractChannel.register0() -> AbstractNioChannel.doRegister()
AbstractNioChannel.doRegister()代码
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
//把通道作为附件绑定在选择器key上
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
1.2、轮询
EventLoop拥有一个轮询线程和选择器,那么轮询线程是什么时候启动的呢?
时机:SocketChannel注册到eventLoop的selector之前
AbstractChannel.AbstractUnsafe.register() 代码
1、多个两个独立反应器
一般来说服务端一般划分为两个独立的反应器,一个负责新连接的监听和接收,一个负责IO事件的轮询和分发。
但是有些handler处理时间较长我们可以独立划分一个EventLoopGroup
public class BootStrapServer {
public static void main(String[] args) {
//新的EventLoopGroup 处理耗时较长的请求
DefaultEventLoopGroup longTimeHandleGroup = new DefaultEventLoopGroup();
new ServerBootstrap()
//boos只负责NioServerSocketChannel的accept worker负责NioSocketChannel上的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info(byteBuf.toString(Charset.defaultCharset()));
//将消息传递到流水线的下一个Handler
ctx.fireChannelRead(msg);
}
}).addLast(longTimeHandleGroup,"longTimeHandler",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
log.info("longTimeHandler" + byteBuf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8989);
}
}
2、Handler是如何实现EventLoop切换的?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程,返回下一个Handler的EventLoop
EventExecutor executor = next.executor();
// 判断当前线程是否和下一个Handler的EventLoop为同一个线程,如果是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用