server 端收到请求请求并且进入 unsafe.read 后,开始对channel进行处理,这里主要描述对 OP_ACCEEPT 的处理.

  1. ![image.png](https://cdn.nlark.com/yuque/0/2020/png/191132/1608991963596-e9d78cb0-9093-41bf-9012-09578a3b159d.png#align=left&display=inline&height=541&margin=%5Bobject%20Object%5D&name=image.png&originHeight=1082&originWidth=1078&size=74441&status=done&style=none&width=539)<br />这里的 `unsafe` 还是 `NioServerSocketChannel` 中引用的 `unsafe` . <br />`unsafe` 对应关系
  • NioServerSocketChannel ==> NioMessageUnsafe
  • NioSocketChannel ==> NioSocketChannelUnsafe(NioByteUnsafe)

说明,服务端启动时,accept到的数据是在NioMessageUnsafe中进行处理的,而对应的客户端channel则是由NioSocketChannelUnsafe(NioByteUnsafe) 来进行读取处理.

读取细节

  1. private final class NioMessageUnsafe extends AbstractNioUnsafe {
  2. private final List<Object> readBuf = new ArrayList<Object>();
  3. @Override
  4. public void read() {
  5. //确保是在同一个线程进行处理
  6. assert eventLoop().inEventLoop();
  7. //获取NioServerSocketChannel启动的配置,配置也很简单
  8. final ChannelConfig config = AbstractNioMessageChannel.this.config();
  9. //获取NioServerSocketChannel对应的pipeline
  10. final ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline();
  11. //一个配置信息(动态调节buffer的大小)
  12. final RecvByteBufAllocator.Handle allocHandle =
  13. AbstractNioMessageChannel.this.unsafe().recvBufAllocHandle();
  14. //跳过
  15. allocHandle.reset(config);
  16. boolean closed = false;
  17. Throwable exception = null;
  18. try {
  19. try {
  20. do {
  21. //这里建立OP_ACCEPT阶段,所以实现比较简单
  22. int localRead = doReadMessages(readBuf);
  23. if (localRead == 0) {
  24. break;
  25. }
  26. if (localRead < 0) {
  27. closed = true;
  28. break;
  29. }
  30. allocHandle.incMessagesRead(localRead);
  31. } while (allocHandle.continueReading());
  32. } catch (Throwable t) {
  33. exception = t;
  34. }
  35. int size = readBuf.size();
  36. for (int i = 0; i < size; i ++) {
  37. readPending = false;
  38. //对于每一个NioSocketChannel 都触发一下read事件.. 经由pipeline传播该事件
  39. pipeline.fireChannelRead(readBuf.get(i));
  40. }
  41. readBuf.clear();
  42. //调节buffer是增是减
  43. allocHandle.readComplete();
  44. //触发读取完成事件
  45. pipeline.fireChannelReadComplete();
  46. if (exception != null) {
  47. closed = closeOnReadError(exception);
  48. //触发异常事件
  49. pipeline.fireExceptionCaught(exception);
  50. }
  51. if (closed) {
  52. inputShutdown = true;
  53. if (isOpen()) {
  54. close(voidPromise());
  55. }
  56. }
  57. } finally {
  58. // Check if there is a readPending which was not processed yet.
  59. // This could be for two reasons:
  60. // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
  61. // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
  62. //
  63. // See https://github.com/netty/netty/issues/2254
  64. if (!readPending && !config.isAutoRead()) {
  65. removeReadOp();
  66. }
  67. }
  68. }
  69. }
  • fireChannelRead:每一次读取都会触发一次
  • fireChannelReadComplete: 最终完成的时候读取一次

如果一次TCP传输不能够传输完成,那么fireChannelRead中的数据是不能直接作用于业务的. 因为数据不全.
可以在fireChannelReadComplete进行处理.

doReadMessages

初始化一个NioSocketChannel 用来代表 服务端对端的socket (即客户端socket)

  1. @Override
  2. protected int doReadMessages(List<Object> buf) throws Exception {
  3. SocketChannel ch = SocketUtils.accept(javaChannel());
  4. try {
  5. if (ch != null) {
  6. buf.add(new NioSocketChannel(this, ch));
  7. return 1;
  8. }
  9. } catch (Throwable t) {
  10. logger.warn("Failed to create a new channel from an accepted socket.", t);
  11. try {
  12. ch.close();
  13. } catch (Throwable t2) {
  14. logger.warn("Failed to close a socket.", t2);
  15. }
  16. }
  17. return 0;
  18. }

pipeline 传播读取事件

  1. pipeline.fireChannelRead(readBuf.get(i));

同理是从 head —-> tail
那么现在pipeline上除去head和tail之外还有有多少个Context呢.

  1. @Override
  2. public void bind(Integer port) {
  3. boss = new NioEventLoopGroup(1);
  4. work = new NioEventLoopGroup(1);
  5. try {
  6. ServerBootstrap serverBootstrap = new ServerBootstrap();
  7. final ServerTransactionHandler serverTransactionHandler = new ServerTransactionHandler(this.applicationContext);
  8. serverBootstrap.group(boss, work)
  9. .channel(NioServerSocketChannel.class)
  10. .handler(new LoggingHandler())
  11. .childHandler(new ChannelInitializer<SocketChannel>() {
  12. @Override
  13. public void initChannel(SocketChannel ch) {
  14. ch.pipeline()
  15. .addLast(new MessageDecoder(serialization))
  16. .addLast(new MessageEncoder(serialization))
  17. .addLast(new IdleStateHandler(20, 40, 60))
  18. .addLast(serverTransactionHandler);
  19. }
  20. }).option(ChannelOption.SO_BACKLOG, 128)
  21. .childOption(ChannelOption.SO_KEEPALIVE, true);
  22. channelFuture = serverBootstrap.bind(port).sync();
  23. } catch (Exception e) {
  24. e.printStackTrace();
  25. }
  26. }

image.png
ServerBootstrapAcceptor: 客户端链接的channel进来时通过这里的 read 配置 childOption ,随后使用 work 线程组将客户端的channel绑定到 work 的一个线程上,这里的过程和服务端的注册过程一致,少有的几个区别分别是

  • 线程组由boss变成了work(意味着绑定的线程由work替换了boss)
  • channel由server变成了clinet
  • unsafe由NioMessageUnsafe变成了NioSocketChannelUnsafe
  • pipeline的实例话过程依旧,但是流水线的节点变成了配置的以下4个.
    • MessageDecoder
    • MessageEncoder
    • IdleStateHandler
    • serverTransactionHandler

核心关注的莫过于 initChannel 方法在何时被调用.
服务启动

pipeline 传播读取完成事件

到这一步其实没有什么区别了. 只不过触发的事件不一致而已. 可能出现的错误也在上边进行了描述.