在 server 端收到请求请求并且进入 unsafe.read 后,开始对channel进行处理,这里主要描述对 OP_ACCEEPT 的处理.
<br />这里的 `unsafe` 还是 `NioServerSocketChannel` 中引用的 `unsafe` . <br />`unsafe` 对应关系
- NioServerSocketChannel ==> NioMessageUnsafe
- NioSocketChannel ==> NioSocketChannelUnsafe(NioByteUnsafe)
说明,服务端启动时,accept到的数据是在NioMessageUnsafe中进行处理的,而对应的客户端channel则是由NioSocketChannelUnsafe(NioByteUnsafe) 来进行读取处理.
读取细节
private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();@Overridepublic void read() {//确保是在同一个线程进行处理assert eventLoop().inEventLoop();//获取NioServerSocketChannel启动的配置,配置也很简单final ChannelConfig config = AbstractNioMessageChannel.this.config();//获取NioServerSocketChannel对应的pipelinefinal ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();//一个配置信息(动态调节buffer的大小)final RecvByteBufAllocator.Handle allocHandle =AbstractNioMessageChannel.this.unsafe().recvBufAllocHandle();//跳过allocHandle.reset(config);boolean closed = false;Throwable exception = null;try {try {do {//这里建立OP_ACCEPT阶段,所以实现比较简单int localRead = doReadMessages(readBuf);if (localRead == 0) {break;}if (localRead < 0) {closed = true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception = t;}int size = readBuf.size();for (int i = 0; i < size; i ++) {readPending = false;//对于每一个NioSocketChannel 都触发一下read事件.. 经由pipeline传播该事件pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();//调节buffer是增是减allocHandle.readComplete();//触发读取完成事件pipeline.fireChannelReadComplete();if (exception != null) {closed = closeOnReadError(exception);//触发异常事件pipeline.fireExceptionCaught(exception);}if (closed) {inputShutdown = true;if (isOpen()) {close(voidPromise());}}} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}
- fireChannelRead:每一次读取都会触发一次
- fireChannelReadComplete: 最终完成的时候读取一次
如果一次TCP传输不能够传输完成,那么fireChannelRead中的数据是不能直接作用于业务的. 因为数据不全.
可以在fireChannelReadComplete进行处理.
doReadMessages
初始化一个NioSocketChannel 用来代表 服务端对端的socket (即客户端socket)
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);try {ch.close();} catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);}}return 0;}
pipeline 传播读取事件
pipeline.fireChannelRead(readBuf.get(i));
同理是从 head —-> tail …
那么现在pipeline上除去head和tail之外还有有多少个Context呢.
@Overridepublic void bind(Integer port) {boss = new NioEventLoopGroup(1);work = new NioEventLoopGroup(1);try {ServerBootstrap serverBootstrap = new ServerBootstrap();final ServerTransactionHandler serverTransactionHandler = new ServerTransactionHandler(this.applicationContext);serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new MessageDecoder(serialization)).addLast(new MessageEncoder(serialization)).addLast(new IdleStateHandler(20, 40, 60)).addLast(serverTransactionHandler);}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);channelFuture = serverBootstrap.bind(port).sync();} catch (Exception e) {e.printStackTrace();}}

ServerBootstrapAcceptor: 客户端链接的channel进来时通过这里的 read 配置 childOption ,随后使用 work 线程组将客户端的channel绑定到 work 的一个线程上,这里的过程和服务端的注册过程一致,少有的几个区别分别是
- 线程组由boss变成了work(意味着绑定的线程由work替换了boss)
- channel由server变成了clinet
- unsafe由NioMessageUnsafe变成了NioSocketChannelUnsafe
- pipeline的实例话过程依旧,但是流水线的节点变成了配置的以下4个.
- MessageDecoder
- MessageEncoder
- IdleStateHandler
- serverTransactionHandler
核心关注的莫过于 initChannel 方法在何时被调用.
服务启动
pipeline 传播读取完成事件
到这一步其实没有什么区别了. 只不过触发的事件不一致而已. 可能出现的错误也在上边进行了描述.
