1、initAndRegister()
// Step 1: call initAndRegister() and keep the ChannelFuture it returns as regFuture.
final ChannelFuture regFuture = initAndRegister();
2、反射创建 NioServerSocketChannel
final ChannelFuture initAndRegister() { Channel channel = null; try { // channelFactory = new ReflectiveChannelFactory() // channel = new NioServerSocketChannel() // channelFactory 里面保存了 NioServerSocketChannel 的构造器对象 // ★★★ 当执行 channelFactory.newChannel() 会执行 constructor.newInstance(),相当于 new NioServerSocketChannel() // ★★★ 所以会执行 new NioServerSocketChannel() 的构造方法 // ★★★ 在 new NioServerSocketChannel() 中创建了 pipeline 及数据结构 // ★★★ 在 new NioServerSocketChannel() this.readInterestOp = SelectionKey.OP_ACCEPT channel = channelFactory.newChannel(); // ★★★ 开始进行初始化,ServerBootstrap.init(channel) // ★★★ init 方法主要是将 DefaultChannelHandlerContext 添加到 pipeline 中 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 省略下面的代码... }
3、调用反射工厂创建 channel
// Call site (inside initAndRegister):
channel = channelFactory.newChannel();

/**
 * Creates a new channel instance by invoking the stored constructor reflectively.
 *
 * @return the newly constructed channel
 * @throws ChannelException if the constructor call fails for any reason; the original
 *                          throwable is preserved as the cause
 */
@Override
public T newChannel() {
    try {
        // Reflective instantiation; per these notes T is NioServerSocketChannel here,
        // whose constructor sets this.readInterestOp = SelectionKey.OP_ACCEPT.
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
4、反射创建 NioServerSocketChannel 实例
// The reflective call that actually builds the instance:
constructor.newInstance();

/**
 * Factory method that wraps the reflective constructor call shown above.
 *
 * @return the newly constructed channel
 * @throws ChannelException wrapping any throwable raised while instantiating
 */
@Override
public T newChannel() {
    try {
        // Instantiate via reflection; per these notes this creates a
        // NioServerSocketChannel with this.readInterestOp = SelectionKey.OP_ACCEPT.
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
5、给 this.readInterestOp 赋值为 SelectionKey.OP_ACCEPT
/**
 * No-arg constructor; this is what runs when AbstractBootstrap calls
 * channelFactory.newChannel().
 */
public NioServerSocketChannel() {
    // Per these notes: DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(), and
    // newSocket(DEFAULT_SELECTOR_PROVIDER) yields a new JDK ServerSocketChannel.
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

/**
 * Wraps the given JDK server socket channel.
 *
 * @param channel the JDK channel to wrap
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    // A server socket channel is only interested in OP_ACCEPT; there is no parent (null).
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

/**
 * Passes its arguments straight through to the super constructor.
 *
 * @param parent         parent channel (null on this server path)
 * @param ch             underlying JDK channel (the ServerSocketChannel on this path)
 * @param readInterestOp interest op for reads (SelectionKey.OP_ACCEPT on this path)
 */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

/**
 * Stores the JDK channel and its read interest op, then switches the channel to
 * non-blocking mode.
 *
 * @param parent         parent channel (null for the server side)
 * @param ch             underlying JDK channel
 * @param readInterestOp interest op remembered for later use; on this path
 *                       SelectionKey.OP_ACCEPT = 1 << 4 = 16
 * @throws ChannelException if non-blocking mode cannot be enabled
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // Per these notes, the super constructor is where the pipeline data structure is created.
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Put the channel into non-blocking mode.
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            // Best-effort cleanup of the half-initialized channel before reporting the failure.
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                    "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
6、启动线程 startThread();
private void startThread() { // 第一次 state = ST_NOT_STARTED = 1 if (state == ST_NOT_STARTED) { // CAS 判断,并将 state 设置为 2 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // ★★★ 启动线程 doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } }}
// Qualified-this call: invokes run() on the enclosing SingleThreadEventExecutor instance.
SingleThreadEventExecutor.this.run();
7、处理 IO 事件
// Process the keys that the selector reported as ready.
processSelectedKeys();

// Excerpt from the per-key handling: when the key is ready for OP_READ or OP_ACCEPT
// (or reports no ready ops at all), the read is delegated to unsafe.
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    // Per these notes, unsafe is a NioMessageUnsafe here.
    unsafe.read();
}
8、pipeline 传播 channelReadComplete 事件
// Call site:
pipeline.fireChannelReadComplete();

/**
 * Fires channelReadComplete through the pipeline, starting at the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

/**
 * Invokes channelReadComplete on {@code next}: directly when the caller is already on
 * that context's executor, otherwise by submitting the cached task to that executor.
 *
 * @param next the context whose handler should receive the event
 */
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelReadComplete();
    } else {
        // Lazily create the per-context task holder, then reuse it.
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeChannelReadCompleteTask);
    }
}

/**
 * Forwards the event to the next handler, then triggers another read when auto-read is on.
 *
 * @param ctx the context of this handler
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.fireChannelReadComplete();

    // If auto-read is enabled, request the next read.
    readIfIsAutoRead();
}

/** Calls channel.read() when the channel config reports auto-read as enabled. */
private void readIfIsAutoRead() {
    // Per these notes the auto-read flag is 1 (enabled) here.
    if (channel.config().isAutoRead()) {
        // Start reading; per these notes, for a server channel this ends up registering OP_ACCEPT.
        channel.read();
    }
}

/**
 * Requests a read by dispatching to the outbound context selected for MASK_READ.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext read() {
    // Per these notes, next resolves to the head context here.
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Perform the read; per these notes, on the server side this registers OP_ACCEPT.
        next.invokeRead();
    } else {
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeReadTask);
    }

    return this;
}

/**
 * Calls read(this) on this context's handler when invokeHandler() allows it; otherwise
 * falls back to read(). Any throwable from the handler goes to notifyHandlerException.
 */
private void invokeRead() {
    if (invokeHandler()) {
        try {
            // Per these notes, handler() is the head handler here.
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        read();
    }
}
9、调用 unsafe 的 beginRead() 方法
@Overridepublic void read(ChannelHandlerContext ctx) { unsafe.beginRead();}@Overridepublic final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { // ★★★ 注册 OP_ACCEPT,可以接受客户端连接请求了 doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); }}
10、通过 AbstractNioChannel 的 doBeginRead(),将 selectionKey 的 interestOps 设置为 OP_ACCEPT
@Overrideprotected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called // 在 doRegister() 的时候 给 this.selectionKey 赋值 // this.selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; // interestOps = 0 final int interestOps = selectionKey.interestOps(); // readInterestOp = SelectionKey.OP_ACCEPT = 1 << 4 = 16 // readInterestOp 是在创建 new NioServerSocket() 的时候给定的值 // (0 & 16) == 0 if ((interestOps & readInterestOp) == 0) { // 0 | 16 == 16 // ★★★ 指定 selectionKey.interestOps(SelectionKey.OP_ACCEPT) 接受客户端连接 selectionKey.interestOps(interestOps | readInterestOp); }}