1、Reactor三步曲

1.1、注册

将通道就绪事件注册到选择器Selector

AbstractBootstrap.initAndRegister() -> MultithreadEventLoopGroup.register() -> SingleThreadEventLoop.register()
-> AbstractChannel.AbstractUnsafe.register() -> AbstractChannel.register0() -> AbstractNioChannel.doRegister()

AbstractNioChannel.doRegister()代码

  1. protected void doRegister() throws Exception {
  2. boolean selected = false;
  3. while(true) {
  4. try {
  5. //把通道作为附件绑定在选择器key
  6. this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
  7. return;
  8. } catch (CancelledKeyException var3) {
  9. if (selected) {
  10. throw var3;
  11. }
  12. this.eventLoop().selectNow();
  13. selected = true;
  14. }
  15. }
  16. }

1.2、轮询

EventLoop拥有一个轮询线程和选择器,那么轮询线程是什么时候启动的呢?

时机:SocketChannel注册到eventLoop的selector之前

AbstractChannel.AbstractUnsafe.register() 代码

image.png

1、多个两个独立反应器

一般来说服务端一般划分为两个独立的反应器,一个负责新连接的监听和接收,一个负责IO事件的轮询和分发。
但是有些handler处理时间较长我们可以独立划分一个EventLoopGroup

image.png

  1. public class BootStrapServer {
  2. public static void main(String[] args) {
  3. //新的EventLoopGroup 处理耗时较长的请求
  4. DefaultEventLoopGroup longTimeHandleGroup = new DefaultEventLoopGroup();
  5. new ServerBootstrap()
  6. //boos只负责NioServerSocketChannelaccept worker负责NioSocketChannel上的读写
  7. .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
  8. .channel(NioServerSocketChannel.class)
  9. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  10. @Override
  11. protected void initChannel(NioSocketChannel ch) throws Exception {
  12. ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
  13. @Override
  14. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  15. ByteBuf byteBuf = (ByteBuf) msg;
  16. log.info(byteBuf.toString(Charset.defaultCharset()));
  17. //将消息传递到流水线的下一个Handler
  18. ctx.fireChannelRead(msg);
  19. }
  20. }).addLast(longTimeHandleGroup,"longTimeHandler",new ChannelInboundHandlerAdapter(){
  21. @Override
  22. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  23. ByteBuf byteBuf = (ByteBuf) msg;
  24. log.info("longTimeHandler" + byteBuf.toString(Charset.defaultCharset()));
  25. }
  26. });
  27. }
  28. })
  29. .bind(8989);
  30. }
  31. }

2、Handler是如何实现EventLoop切换的?


关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

  1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  2. final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
  3. // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程,返回下一个HandlerEventLoop
  4. EventExecutor executor = next.executor();
  5. // 判断当前线程是否和下一个HandlerEventLoop为同一个线程,如果是,直接调用
  6. if (executor.inEventLoop()) {
  7. next.invokeChannelRead(m);
  8. }
  9. // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
  10. else {
  11. executor.execute(new Runnable() {
  12. @Override
  13. public void run() {
  14. next.invokeChannelRead(m);
  15. }
  16. });
  17. }
  18. }
  • 如果两个 handler 绑定的是同一个线程,那么就直接调用
  • 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用