1、initAndRegister()

  1. final ChannelFuture regFuture = initAndRegister();

2、反射创建 NioServerSocketChannel

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. // channelFactory = new ReflectiveChannelFactory()
  5. // channel = new NioServerSocketChannel()
  6. // channelFactory 里面保存了 NioServerSocketChannel 的构造器对象
  7. // ★★★ 当执行 channelFactory.newChannel() 会执行 constructor.newInstance(),相当于 new NioServerSocketChannel()
  8. // ★★★ 所以会执行 new NioServerSocketChannel() 的构造方法
  9. // ★★★ 在 new NioServerSocketChannel() 中创建了 pipeline 及数据结构
  10. // ★★★ 在 new NioServerSocketChannel() 的构造过程中指定了 this.readInterestOp = SelectionKey.OP_ACCEPT
  11. channel = channelFactory.newChannel();
  12. // ★★★ 开始进行初始化,ServerBootstrap.init(channel)
  13. // ★★★ init 方法主要是将 DefaultChannelHandlerContext 添加到 pipeline 中
  14. init(channel);
  15. } catch (Throwable t) {
  16. if (channel != null) {
  17. // channel can be null if newChannel crashed (eg SocketException("too many open files"))
  18. channel.unsafe().closeForcibly();
  19. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
  20. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
  21. }
  22. // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
  23. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  24. }
  25. // 省略下面的代码...
  26. }

3、调用反射工厂创建 channel

  1. channel = channelFactory.newChannel();
  2. @Override
  3. public T newChannel() {
  4. try {
  5. // ★★★ 反射的方式:实例化 NioServerSocketChannel
  6. // ★★★ 指定了 this.readInterestOp = SelectionKey.OP_ACCEPT
  7. return constructor.newInstance();
  8. } catch (Throwable t) {
  9. throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
  10. }
  11. }

4、反射创建 NioServerSocketChannel 实例

  1. constructor.newInstance();
  2. @Override
  3. public T newChannel() {
  4. try {
  5. // ★★★ 反射的方式:实例化 NioServerSocketChannel
  6. // ★★★ 指定了 this.readInterestOp = SelectionKey.OP_ACCEPT
  7. return constructor.newInstance();
  8. } catch (Throwable t) {
  9. throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
  10. }
  11. }

5、给 this.readInterestOp 赋值为 SelectionKey.OP_ACCEPT

  1. public NioServerSocketChannel() {
  2. // ★★★ 当 AbstractBootstrap 执行 channelFactory.newChannel() 时
  3. // 就会通过反射调用这个无参构造方法,相当于 new NioServerSocketChannel()
  4. // DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
  5. // newSocket(DEFAULT_SELECTOR_PROVIDER) 内部调用 provider.openServerSocketChannel(),返回 JDK 的 ServerSocketChannel 实例(ServerSocketChannel 是抽象类,不能直接 new)
  6. this(newSocket(DEFAULT_SELECTOR_PROVIDER));
  7. }
  8. public NioServerSocketChannel(ServerSocketChannel channel) {
  9. // ★★★ ServerSocketChannel 只关心 OP_ACCEPT 事件
  10. super(null, channel, SelectionKey.OP_ACCEPT);
  11. config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  12. }
  13. protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  14. // parent = null
  15. // ch = ServerSocketChannel
  16. // readInterestOp = SelectionKey.OP_ACCEPT
  17. super(parent, ch, readInterestOp);
  18. }
  19. protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  20. // 对于服务端来说 parent = null
  21. // ch = ServerSocketChannel
  22. // readInterestOp = SelectionKey.OP_ACCEPT = 1 << 4 = 16
  23. // ★★★ 创建了 pipeline 的数据结构
  24. super(parent);
  25. this.ch = ch;
  26. this.readInterestOp = readInterestOp; // SelectionKey.OP_ACCEPT = 1 << 4 = 16
  27. try {
  28. // serverSocketChannel.configureBlocking(false); 设置非阻塞
  29. ch.configureBlocking(false);
  30. } catch (IOException e) {
  31. try {
  32. ch.close();
  33. } catch (IOException e2) {
  34. logger.warn(
  35. "Failed to close a partially initialized socket.", e2);
  36. }
  37. throw new ChannelException("Failed to enter non-blocking mode.", e);
  38. }
  39. }

6、启动线程 startThread();


  1. private void startThread() {
  2. // 第一次 state = ST_NOT_STARTED = 1
  3. if (state == ST_NOT_STARTED) {
  4. // CAS 判断,并将 state 设置为 2
  5. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
  6. boolean success = false;
  7. try {
  8. // ★★★ 启动线程
  9. doStartThread();
  10. success = true;
  11. } finally {
  12. if (!success) {
  13. STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
  14. }
  15. }
  16. }
  17. }
  18. }
  1. // doStartThread() 会向 executor 提交一个任务,该任务在新线程中最终调用下面这行(对 NioEventLoop 来说即其 run() 事件循环)
  2. SingleThreadEventExecutor.this.run();

7、处理 IO 事件

  1. processSelectedKeys();
  1. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  2. // ★★★ unsafe = NioMessageUnsafe
  3. unsafe.read();
  4. }

8、pipeline 完成事件传播

  1. pipeline.fireChannelReadComplete();
  2. @Override
  3. public final ChannelPipeline fireChannelReadComplete() {
  4. AbstractChannelHandlerContext.invokeChannelReadComplete(head);
  5. return this;
  6. }
  7. static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
  8. EventExecutor executor = next.executor();
  9. if (executor.inEventLoop()) {
  10. next.invokeChannelReadComplete();
  11. } else {
  12. Tasks tasks = next.invokeTasks;
  13. if (tasks == null) {
  14. next.invokeTasks = tasks = new Tasks(next);
  15. }
  16. executor.execute(tasks.invokeChannelReadCompleteTask);
  17. }
  18. }
  19. @Override
  20. public void channelReadComplete(ChannelHandlerContext ctx) {
  21. ctx.fireChannelReadComplete();
  22. // 如果开启了自动读(autoRead)
  23. readIfIsAutoRead();
  24. }
  25. private void readIfIsAutoRead() {
  26. // isAutoRead() 返回 boolean:autoRead 字段默认值为 1,所以默认返回 true
  27. if (channel.config().isAutoRead()) {
  28. // ★★★ 开启读事件,如果是服务端就注册 OP_ACCEPT 事件
  29. channel.read();
  30. }
  31. }
  32. @Override
  33. public ChannelHandlerContext read() {
  34. // next = head
  35. final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
  36. EventExecutor executor = next.executor();
  37. if (executor.inEventLoop()) {
  38. // ★★★ 执行读,如果是服务端就注册 OP_ACCEPT
  39. next.invokeRead();
  40. } else {
  41. Tasks tasks = next.invokeTasks;
  42. if (tasks == null) {
  43. next.invokeTasks = tasks = new Tasks(next);
  44. }
  45. executor.execute(tasks.invokeReadTask);
  46. }
  47. return this;
  48. }
  49. private void invokeRead() {
  50. if (invokeHandler()) {
  51. try {
  52. // ★★★ handler() = head
  53. ((ChannelOutboundHandler) handler()).read(this);
  54. } catch (Throwable t) {
  55. notifyHandlerException(t);
  56. }
  57. } else {
  58. read();
  59. }
  60. }

9、调用 unsafe 的 beginRead() 方法

  1. @Override
  2. public void read(ChannelHandlerContext ctx) {
  3. unsafe.beginRead();
  4. }
  5. @Override
  6. public final void beginRead() {
  7. assertEventLoop();
  8. if (!isActive()) {
  9. return;
  10. }
  11. try {
  12. // ★★★ 注册 OP_ACCEPT,可以接受客户端连接请求了
  13. doBeginRead();
  14. } catch (final Exception e) {
  15. invokeLater(new Runnable() {
  16. @Override
  17. public void run() {
  18. pipeline.fireExceptionCaught(e);
  19. }
  20. });
  21. close(voidPromise());
  22. }
  23. }

10、通过 AbstractNioChannel 的 doBeginRead(),将 selectionKey 的 interestOps 改为 OP_ACCEPT

  1. @Override
  2. protected void doBeginRead() throws Exception {
  3. // Channel.read() or ChannelHandlerContext.read() was called
  4. // 在 doRegister() 的时候 给 this.selectionKey 赋值
  5. // this.selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
  6. final SelectionKey selectionKey = this.selectionKey;
  7. if (!selectionKey.isValid()) {
  8. return;
  9. }
  10. readPending = true;
  11. // interestOps = 0
  12. final int interestOps = selectionKey.interestOps();
  13. // readInterestOp = SelectionKey.OP_ACCEPT = 1 << 4 = 16
  14. // readInterestOp 是在创建 new NioServerSocketChannel() 的时候给定的值
  15. // (0 & 16) == 0
  16. if ((interestOps & readInterestOp) == 0) {
  17. // 0 | 16 == 16
  18. // ★★★ 指定 selectionKey.interestOps(SelectionKey.OP_ACCEPT) 接受客户端连接
  19. selectionKey.interestOps(interestOps | readInterestOp);
  20. }
  21. }