1、Reactor三步曲
1.1、注册
将通道就绪事件注册到选择器Selector
AbstractBootstrap.initAndRegister() -> MultithreadEventLoopGroup.register() -> SingleThreadEventLoop.register()
-> AbstractChannel.AbstractUnsafe.register() -> AbstractChannel.register0() -> AbstractNioChannel.doRegister()
AbstractNioChannel.doRegister()代码
protected void doRegister() throws Exception {boolean selected = false;while(true) {try {//把通道作为附件绑定在选择器key上this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException var3) {if (selected) {throw var3;}this.eventLoop().selectNow();selected = true;}}}
1.2、轮询
EventLoop拥有一个轮询线程和选择器,那么轮询线程是什么时候启动的呢?
时机:SocketChannel注册到eventLoop的selector之前
AbstractChannel.AbstractUnsafe.register() 代码
1、多个两个独立反应器
一般来说服务端一般划分为两个独立的反应器,一个负责新连接的监听和接收,一个负责IO事件的轮询和分发。
但是有些handler处理时间较长我们可以独立划分一个EventLoopGroup

public class BootStrapServer {public static void main(String[] args) {//新的EventLoopGroup 处理耗时较长的请求DefaultEventLoopGroup longTimeHandleGroup = new DefaultEventLoopGroup();new ServerBootstrap()//boos只负责NioServerSocketChannel的accept worker负责NioSocketChannel上的读写.group(new NioEventLoopGroup(),new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info(byteBuf.toString(Charset.defaultCharset()));//将消息传递到流水线的下一个Handlerctx.fireChannelRead(msg);}}).addLast(longTimeHandleGroup,"longTimeHandler",new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info("longTimeHandler" + byteBuf.toString(Charset.defaultCharset()));}});}}).bind(8989);}}
2、Handler是如何实现EventLoop切换的?
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程,返回下一个Handler的EventLoopEventExecutor executor = next.executor();// 判断当前线程是否和下一个Handler的EventLoop为同一个线程,如果是,直接调用if (executor.inEventLoop()) {next.invokeChannelRead(m);}// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)else {executor.execute(new Runnable() {@Overridepublic void run() {next.invokeChannelRead(m);}});}}
- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用

