在 server
端收到请求请求并且进入 unsafe.read
后,开始对channel进行处理,这里主要描述对 OP_ACCEEPT
的处理.
<br />这里的 `unsafe` 还是 `NioServerSocketChannel` 中引用的 `unsafe` . <br />`unsafe` 对应关系
- NioServerSocketChannel ==> NioMessageUnsafe
- NioSocketChannel ==> NioSocketChannelUnsafe(NioByteUnsafe)
说明,服务端启动时,accept到的数据是在NioMessageUnsafe中进行处理的,而对应的客户端channel则是由NioSocketChannelUnsafe(NioByteUnsafe) 来进行读取处理.
读取细节
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//确保是在同一个线程进行处理
assert eventLoop().inEventLoop();
//获取NioServerSocketChannel启动的配置,配置也很简单
final ChannelConfig config = AbstractNioMessageChannel.this.config();
//获取NioServerSocketChannel对应的pipeline
final ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();
//一个配置信息(动态调节buffer的大小)
final RecvByteBufAllocator.Handle allocHandle =
AbstractNioMessageChannel.this.unsafe().recvBufAllocHandle();
//跳过
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//这里建立OP_ACCEPT阶段,所以实现比较简单
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//对于每一个NioSocketChannel 都触发一下read事件.. 经由pipeline传播该事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
//调节buffer是增是减
allocHandle.readComplete();
//触发读取完成事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
//触发异常事件
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
- fireChannelRead:每一次读取都会触发一次
- fireChannelReadComplete: 最终完成的时候读取一次
如果一次TCP传输不能够传输完成,那么fireChannelRead中的数据是不能直接作用于业务的. 因为数据不全.
可以在fireChannelReadComplete进行处理.
doReadMessages
初始化一个NioSocketChannel 用来代表 服务端对端的socket
(即客户端socket)
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
pipeline 传播读取事件
pipeline.fireChannelRead(readBuf.get(i));
同理是从 head
—-> tail
…
那么现在pipeline上除去head和tail之外还有有多少个Context呢.
@Override
public void bind(Integer port) {
boss = new NioEventLoopGroup(1);
work = new NioEventLoopGroup(1);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
final ServerTransactionHandler serverTransactionHandler = new ServerTransactionHandler(this.applicationContext);
serverBootstrap.group(boss, work)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new MessageDecoder(serialization))
.addLast(new MessageEncoder(serialization))
.addLast(new IdleStateHandler(20, 40, 60))
.addLast(serverTransactionHandler);
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
channelFuture = serverBootstrap.bind(port).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
ServerBootstrapAcceptor: 客户端链接的channel进来时通过这里的 read
配置 childOption
,随后使用 work
线程组将客户端的channel绑定到 work
的一个线程上,这里的过程和服务端的注册过程一致,少有的几个区别分别是
- 线程组由boss变成了work(意味着绑定的线程由work替换了boss)
- channel由server变成了clinet
- unsafe由NioMessageUnsafe变成了NioSocketChannelUnsafe
- pipeline的实例话过程依旧,但是流水线的节点变成了配置的以下4个.
- MessageDecoder
- MessageEncoder
- IdleStateHandler
- serverTransactionHandler
核心关注的莫过于 initChannel
方法在何时被调用.
服务启动
pipeline 传播读取完成事件
到这一步其实没有什么区别了. 只不过触发的事件不一致而已. 可能出现的错误也在上边进行了描述.