title: Netty04 - Optimization and Source Code
top: false
cover: true
author: 张文军
date: 2021-04-17 05:36:46
tags:
  - networking
  - netty
category:
  - netty
  - networking
summary: Netty04 - Optimization and Source Code


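# IV. Optimization and Source Code

## 1. Optimization

### 1.1 Extending the serialization algorithm

Serialization and deserialization are mainly used to convert the message body.

- When serializing, the Java object is turned into the data to transmit (a byte[], JSON, and so on; in the end everything has to become a byte[]).
- When deserializing, the incoming body data is restored to a Java object so that it can be processed.

The current code supports only Java's built-in serialization and deserialization. The core code is: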

```java
// Deserialization
byte[] body = new byte[bodyLength];
byteBuf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// Serialization
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
```

To support more serialization algorithms, abstract a Serializer interface:

```java
public interface Serializer {

    // Deserialization method
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    // Serialization method
    <T> byte[] serialize(T object);
}
```

Two implementations are provided. Here I put them directly into an enum, named `SerializerAlgorithm` in this listing and referred to as `Serializer.Algorithm` in the later listings:

```java
import com.google.gson.Gson;
import java.io.*;
import java.nio.charset.StandardCharsets;

enum SerializerAlgorithm implements Serializer {
    // Java implementation
    Java {
        @Override
        @SuppressWarnings("unchecked")
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            try {
                ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes));
                Object object = in.readObject();
                return (T) object;
            } catch (IOException | ClassNotFoundException e) {
                throw new RuntimeException("SerializerAlgorithm.Java deserialization failed", e);
            }
        }

        @Override
        public <T> byte[] serialize(T object) {
            try {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                new ObjectOutputStream(out).writeObject(object);
                return out.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("SerializerAlgorithm.Java serialization failed", e);
            }
        }
    },
    // JSON implementation (requires the Gson dependency)
    Json {
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
        }

        @Override
        public <T> byte[] serialize(T object) {
            return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
        }
    };

    // The algorithm has to be recovered from a byte in the protocol header
    public static SerializerAlgorithm getByInt(int type) {
        SerializerAlgorithm[] array = SerializerAlgorithm.values();
        if (type < 0 || type > array.length - 1) {
            throw new IllegalArgumentException("value is outside the SerializerAlgorithm range");
        }
        return array[type];
    }
}
```
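The strategy-enum idea can be tried without Netty or Gson. The following is a minimal sketch that uses only the JDK; `DemoSerializer`, `DemoAlgorithm`, `Greeting` and `SerializerRoundTrip` are hypothetical names introduced for illustration, and only the JDK strategy is included so that no extra dependency is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Serializer interface described above.
interface DemoSerializer {
    <T> T deserialize(Class<T> clazz, byte[] bytes);
    <T> byte[] serialize(T object);
}

// Hypothetical strategy enum; each constant is one serialization algorithm.
enum DemoAlgorithm implements DemoSerializer {
    Java {
        @Override
        public <T> T deserialize(Class<T> clazz, byte[] bytes) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return clazz.cast(in.readObject());
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("deserialization failed", e);
            }
        }

        @Override
        public <T> byte[] serialize(T object) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
                oos.writeObject(object);
            } catch (IOException e) {
                throw new IllegalStateException("serialization failed", e);
            }
            return out.toByteArray();
        }
    };

    // Maps the id stored in the protocol header back to a constant.
    static DemoAlgorithm getByInt(int type) {
        DemoAlgorithm[] all = values();
        if (type < 0 || type >= all.length) {
            throw new IllegalArgumentException("unknown serializer id: " + type);
        }
        return all[type];
    }
}

class SerializerRoundTrip {
    // Stand-in for a Message subclass.
    static class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;
        Greeting(String text) { this.text = text; }
    }

    // Serializes a Greeting, looks the algorithm up by its ordinal, and reads the object back.
    static String roundTrip(String text) {
        int idOnTheWire = DemoAlgorithm.Java.ordinal();
        DemoAlgorithm algorithm = DemoAlgorithm.getByInt(idOnTheWire);
        byte[] bytes = algorithm.serialize(new Greeting(text));
        return algorithm.deserialize(Greeting.class, bytes).text;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Because the id on the wire is the enum ordinal, reordering the constants changes the protocol; a fixed id per constant would avoid that, at the cost of a little more code.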

Add a configuration class and a configuration file:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public abstract class Config {
    static Properties properties;

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Falls back to 8080 when server.port is not configured
    public static int getServerPort() {
        String value = properties.getProperty("server.port");
        if (value == null) {
            return 8080;
        } else {
            return Integer.parseInt(value);
        }
    }

    // Falls back to Java serialization when serializer.algorithm is not configured
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        if (value == null) {
            return Serializer.Algorithm.Java;
        } else {
            return Serializer.Algorithm.valueOf(value);
        }
    }
}
```

Configuration file:

```properties
serializer.algorithm=Json
```
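The fallback behaviour of the configuration class can be checked in isolation. This is a small sketch under the assumption that the properties text is passed in as a string rather than read from the classpath; `ConfigDefaultsDemo` and its methods are hypothetical names, and the algorithm is returned as a plain string instead of an enum constant.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class ConfigDefaultsDemo {
    // Parses properties text; in the article the text comes from /application.properties.
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return properties;
    }

    // Uses 8080 when server.port is absent.
    static int serverPort(Properties properties) {
        return Integer.parseInt(properties.getProperty("server.port", "8080"));
    }

    // Uses "Java" when serializer.algorithm is absent.
    static String serializerAlgorithm(Properties properties) {
        return properties.getProperty("serializer.algorithm", "Java");
    }

    public static void main(String[] args) {
        Properties p = parse("serializer.algorithm=Json\n");
        System.out.println(serverPort(p) + " " + serializerAlgorithm(p));
    }
}
```

`Properties.getProperty(key, defaultValue)` expresses the same null check as the if/else in the configuration class in a single call.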

Modify the codec:

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;

/**
 * Must be used together with LengthFieldBasedFrameDecoder so that every ByteBuf received is a complete message
 */
@ChannelHandler.Sharable // one instance is added to several pipelines, so it must be marked sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4-byte magic number
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization algorithm: jdk 0, json 1
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        // 4. 1-byte message type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Meaningless padding byte for alignment (brings the header to 16 bytes)
        out.writeByte(0xff);
        // 6. Serialize the message body
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
        // 7. 4-byte body length
        out.writeInt(bytes.length);
        // 8. Write the body
        out.writeBytes(bytes);
        outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerAlgorithm = in.readByte(); // 0 or 1
        byte messageType = in.readByte(); // 0, 1, 2...
        int sequenceId = in.readInt();
        in.readByte(); // skip the padding byte
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);

        // Find the deserialization algorithm
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
        // Determine the concrete message type
        Class<? extends Message> messageClass = Message.getMessageClass(messageType);
        Message message = algorithm.deserialize(messageClass, bytes);
        // log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerAlgorithm, messageType, sequenceId, length);
        // log.debug("{}", message);
        out.add(message);
    }
}
```
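The byte layout written by the codec can be reproduced with plain `java.nio.ByteBuffer`, which makes the offsets easy to verify. This is a sketch only: `FrameLayoutDemo` and its method names are hypothetical, and the body is treated as a UTF-8 string instead of a serialized `Message`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class FrameLayoutDemo {
    // magic(4) + version(1) + serializer(1) + type(1) + sequenceId(4) + padding(1) = 12,
    // then the 4-byte length field, giving 16 header bytes in front of the body.
    static final int LENGTH_FIELD_OFFSET = 4 + 1 + 1 + 1 + 4 + 1;
    static final int HEADER_LENGTH = LENGTH_FIELD_OFFSET + 4;

    static byte[] encode(int serializerId, int messageType, int sequenceId, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + bodyBytes.length); // big-endian by default
        buf.put(new byte[]{1, 2, 3, 4}); // magic number
        buf.put((byte) 1);               // version
        buf.put((byte) serializerId);    // serialization algorithm
        buf.put((byte) messageType);     // message type
        buf.putInt(sequenceId);          // sequence id
        buf.put((byte) 0xff);            // padding
        buf.putInt(bodyBytes.length);    // body length
        buf.put(bodyBytes);              // body
        return buf.array();
    }

    static String decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getInt() != 0x01020304) {
            throw new IllegalArgumentException("bad magic number");
        }
        buf.position(LENGTH_FIELD_OFFSET); // skip version, serializer, type, sequence id, padding
        byte[] body = new byte[buf.getInt()];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = encode(1, 0, 7, "{}");
        System.out.println(frame.length + " " + decodeBody(frame));
    }
}
```

The length field sits at offset 12 and is 4 bytes wide, so a frame decoder placed in front of the codec would have to be configured with those two numbers for this layout.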

To determine the concrete message type, the message-type byte is used to look up the corresponding message class:

```java
@Data
public abstract class Message implements Serializable {

    /**
     * Returns the message class for a message-type byte
     * @param messageType the message-type byte
     * @return the message class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    private int sequenceId;

    private int messageType;

    public abstract int getMessageType();

    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;

    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
        messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
        messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
        messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
        messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
        messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
        messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
        messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
        messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
        messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
        messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
        messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
        messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
        messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
        messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
    }
}
```

### 1.2 Parameter tuning

#### 1) CONNECT_TIMEOUT_MILLIS

- A SocketChannel option.
- Used when the client establishes a connection: if the connection cannot be established within the given number of milliseconds, a timeout exception is thrown.
- Not to be confused with SO_TIMEOUT, which is mainly for blocking IO. In blocking IO, accept, read and similar calls wait indefinitely; SO_TIMEOUT sets how long they may block.

```java
@Slf4j
public class TestConnectionTimeout {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler());
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
            future.sync().channel().closeFuture().sync(); // breakpoint 1
        } catch (Exception e) {
            e.printStackTrace();
            log.debug("timeout");
        } finally {
            group.shutdownGracefully();
        }
    }
}
```

The relevant source is `io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect`:

```java
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    // Schedule connect timeout.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress); // breakpoint 2
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    // ...
}
```
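The source above implements the timeout as a scheduled task that tries to fail the connect promise; whichever of "connected" and "timed out" completes the promise first wins. The same pattern can be sketched with JDK classes only. `ConnectTimeoutSketch` is a hypothetical name, `CompletableFuture` stands in for Netty's `ChannelPromise`, and the connection itself is simulated with a second scheduled task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ConnectTimeoutSketch {
    // Returns "connected" if the simulated connect finishes first,
    // otherwise the simple class name of the failure cause.
    static String attempt(long connectDelayMillis, long timeoutMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> connectPromise = new CompletableFuture<>();
        try {
            // Timeout task: fails the promise unless it has already been completed.
            loop.schedule(() -> {
                connectPromise.completeExceptionally(new TimeoutException("connection timed out"));
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // Simulated connect: succeeds after connectDelayMillis.
            loop.schedule(() -> {
                connectPromise.complete("connected");
            }, connectDelayMillis, TimeUnit.MILLISECONDS);
            return connectPromise.get();
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(attempt(10, 1000));
        System.out.println(attempt(1000, 10));
    }
}
```

Both tasks run on the same single thread, which mirrors how the timeout task and the connect completion both run on the channel's event loop, so no extra locking is needed around the promise.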

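#### 2) SO_BACKLOG

- A ServerSocketChannel option.

```mermaid
sequenceDiagram

participant c as client
participant s as server
participant sq as syn queue
participant aq as accept queue

s ->> s : bind()
s ->> s : listen()
c ->> c : connect()
c ->> s : 1. SYN
Note left of c : SYN_SEND
s ->> sq : put
Note right of s : SYN_RCVD
s ->> c : 2. SYN + ACK
Note left of c : ESTABLISHED
c ->> s : 3. ACK
sq ->> aq : put
Note right of s : ESTABLISHED
aq -->> s : 
s ->> s : accept()
```

1. First handshake: the client sends SYN to the server and its state becomes SYN_SEND. The server receives it, changes state to SYN_RCVD, and puts the request into the syn queue.
2. Second handshake: the server replies SYN + ACK to the client. The client receives it, changes state to ESTABLISHED, and sends ACK to the server.
3. Third handshake: the server receives the ACK, changes state to ESTABLISHED, and moves the request from the syn queue to the accept queue.

Notes:

- Before Linux 2.2, the backlog size covered both queues. Since 2.2, each queue is controlled by its own parameter, described below.
- syn queue: the half-open connection queue
  - Its size is set by /proc/sys/net/ipv4/tcp_max_syn_backlog. When `syncookies` is enabled there is logically no upper limit, and this setting is ignored.
- accept queue: the fully established connection queue
  - Its size is set by /proc/sys/net/core/somaxconn. When listen is called, the kernel takes the smaller of the backlog argument and this system parameter.
  - If the accept queue is full, the new connection cannot be completed. Depending on the operating system and its settings, the server either rejects it or silently drops the handshake, so the client sees a refusal or a timeout.

In Netty, the size can be set with option(ChannelOption.SO_BACKLOG, value). The default can be found in the following source: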

```java
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    private volatile int backlog = NetUtil.SOMAXCONN;
    // ...
}
```
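The "smaller of the two" rule for the accept queue is simple arithmetic and can be written down directly. The sketch below is an illustration, not Netty code: `BacklogSketch` and its methods are hypothetical, and the fallback value passed to `readSomaxconn` is a placeholder chosen by the caller for systems where the Linux file does not exist.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class BacklogSketch {
    // Accept queue capacity = min(backlog passed to listen, system-wide somaxconn).
    static int effectiveBacklog(int requestedBacklog, int somaxconn) {
        return Math.min(requestedBacklog, somaxconn);
    }

    // Reads the Linux limit; returns the fallback when the file is missing or unreadable.
    static int readSomaxconn(int fallback) {
        Path path = Paths.get("/proc/sys/net/core/somaxconn");
        try {
            return Integer.parseInt(Files.readAllLines(path).get(0).trim());
        } catch (Exception e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        int somaxconn = readSomaxconn(128);
        System.out.println("somaxconn = " + somaxconn);
        System.out.println("effective backlog for 1024 = " + effectiveBacklog(1024, somaxconn));
    }
}
```

In other words, raising SO_BACKLOG above the system limit has no effect until the system parameter is raised as well.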

The key breakpoint used when debugging this in class is `io.netty.channel.nio.NioEventLoop#processSelectedKey`.

The effect is easier to demonstrate with blocking IO (OIO), where no debug mode is needed:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    public static void main(String[] args) throws IOException {
        // Second argument is the backlog
        ServerSocket ss = new ServerSocket(8888, 2);
        Socket accept = ss.accept();
        System.out.println(accept);
        System.in.read();
    }
}
```

Start 4 clients:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Date;

public class Client {
    public static void main(String[] args) throws IOException {
        try {
            Socket s = new Socket();
            System.out.println(new Date() + " connecting...");
            s.connect(new InetSocketAddress("localhost", 8888), 1000);
            System.out.println(new Date() + " connected...");
            s.getOutputStream().write(1);
            System.in.read();
        } catch (IOException e) {
            System.out.println(new Date() + " connecting timeout...");
            e.printStackTrace();
        }
    }
}
```

Clients 1, 2 and 3 all print the following. Only the first has been accepted; the other two sit in the accept queue:

```
Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...
```

When the 4th client connects:

```
Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out
```

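#### 3) ulimit -n

- An operating system parameter.

#### 4) TCP_NODELAY

- A SocketChannel option.

#### 5) SO_SNDBUF & SO_RCVBUF

- SO_SNDBUF is a SocketChannel option.
- SO_RCVBUF can be used as a SocketChannel option and also as a ServerSocketChannel option (setting it on the ServerSocketChannel is recommended).

#### 6) ALLOCATOR

- A SocketChannel option.
- Used to allocate ByteBuf instances, i.e. what `ctx.alloc()` hands out.

#### 7) RCVBUF_ALLOCATOR

- A SocketChannel option.
- Controls the size of Netty's receive buffer.
- Responsible for allocating buffers for inbound data. It decides the inbound buffer size (and can adjust it dynamically) and always uses direct memory; whether that memory is pooled or unpooled is decided by the allocator.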
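TCP_NODELAY, SO_SNDBUF and SO_RCVBUF are plain socket options, so they can also be seen on a `java.net.Socket` without Netty. The sketch below is only an illustration of what the options are: `SocketOptionsSketch` is a hypothetical name, the 64 KiB sizes are arbitrary, and the operating system treats buffer sizes as hints that it may round or cap.

```java
import java.io.IOException;
import java.net.Socket;

class SocketOptionsSketch {
    // Applies the three options to an unconnected socket and reports whether TCP_NODELAY is on.
    static boolean configure() {
        try (Socket socket = new Socket()) {
            socket.setTcpNoDelay(true);              // TCP_NODELAY: disable Nagle's algorithm
            socket.setSendBufferSize(64 * 1024);     // SO_SNDBUF hint
            socket.setReceiveBufferSize(64 * 1024);  // SO_RCVBUF hint
            return socket.getTcpNoDelay();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("TCP_NODELAY enabled: " + configure());
    }
}
```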
### 1.3 RPC framework

#### 1) Preparation

This code can be treated as ready-made; there is no need to write it from scratch as an exercise.

To keep things simple, RPC request and response messages are added on top of the earlier chat project:

```java
@Data
public abstract class Message implements Serializable {

    // Earlier code omitted

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // ...
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

}
```

Request message:

```java
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    /**
     * Fully qualified name of the interface to call; the server uses it to find the implementation
     */
    private String interfaceName;
    /**
     * Name of the method to call on the interface
     */
    private String methodName;
    /**
     * Return type of the method
     */
    private Class<?> returnType;
    /**
     * Array of parameter types
     */
    private Class[] parameterTypes;
    /**
     * Array of argument values
     */
    private Object[] parameterValue;

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}
```

Response message:

```java
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
    /**
     * Return value
     */
    private Object returnValue;
    /**
     * Exception, if the call failed
     */
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}
```

Server skeleton:

```java
@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

        // RPC request message handler, to be implemented
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("server error", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
```

Client skeleton:

```java
@Slf4j // required for the log.error call below
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

        // RPC response message handler, to be implemented
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
```

Looking up services on the server side:

```java
public class ServicesFactory {

    static Properties properties;
    // interface class -> implementation instance
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                // Keys ending in "Service" map an interface name to its implementation class name
                if (name.endsWith("Service")) {
                    Class<?> interfaceClass = Class.forName(name);
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.newInstance());
                }
            }
        } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}
```

The related configuration in application.properties:

```properties
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
```

#### 2) Server handler

```java
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
        RpcResponseMessage response = new RpcResponseMessage();
        response.setSequenceId(message.getSequenceId());
        try {
            // Get the actual implementation object
            HelloService service = (HelloService)
                    ServicesFactory.getService(Class.forName(message.getInterfaceName()));

            // Get the method to call
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());

            // Invoke the method
            Object invoke = method.invoke(service, message.getParameterValue());
            // The call succeeded
            response.setReturnValue(invoke);
        } catch (Exception e) {
            e.printStackTrace();
            // The call failed
            response.setExceptionValue(e);
        }
        // Send back the result
        ctx.writeAndFlush(response);
    }
}
```
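The core of the server handler is a reflective dispatch: look up the implementation by interface name, find the method by name and parameter types, then invoke it. The sketch below isolates that step with JDK classes only. All names in it (`ReflectiveDispatchSketch`, the nested `HelloService`, `HelloServiceImpl`, `SERVICES`) are hypothetical stand-ins, and the service registry is a hard-coded map instead of a properties file.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ReflectiveDispatchSketch {
    interface HelloService {
        String sayHello(String name);
    }

    static class HelloServiceImpl implements HelloService {
        @Override
        public String sayHello(String name) {
            return "hello, " + name;
        }
    }

    // interface class -> implementation instance, playing the role of the services factory
    static final Map<Class<?>, Object> SERVICES = new ConcurrentHashMap<>();

    static {
        SERVICES.put(HelloService.class, new HelloServiceImpl());
    }

    // Resolves the interface by name, then finds and invokes the requested method.
    static Object dispatch(String interfaceName, String methodName,
                           Class<?>[] parameterTypes, Object[] args) {
        try {
            Class<?> interfaceClass = Class.forName(interfaceName);
            Object service = SERVICES.get(interfaceClass);
            if (service == null) {
                throw new IllegalArgumentException("no service registered for " + interfaceName);
            }
            Method method = interfaceClass.getMethod(methodName, parameterTypes);
            return method.invoke(service, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("dispatch failed", e);
        }
    }

    // Convenience wrapper that builds the same data an RPC request would carry.
    static String callSayHello(String name) {
        return (String) dispatch(HelloService.class.getName(), "sayHello",
                new Class<?>[]{String.class}, new Object[]{name});
    }

    public static void main(String[] args) {
        System.out.println(callSayHello("zhangsan"));
    }
}
```

Working with `Object` and the interface's `Method` means the dispatcher does not need to know any concrete service type, which is one way to lift the `HelloService` cast out of a handler of this kind.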

#### 3) Client code, first version

This version only sends a message:

```java
@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ProcotolFrameDecoder());
                    ch.pipeline().addLast(LOGGING_HANDLER);
                    ch.pipeline().addLast(MESSAGE_CODEC);
                    ch.pipeline().addLast(RPC_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

            ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
                    1,
                    "cn.itcast.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            )).addListener(promise -> {
                // Log the cause if the write failed
                if (!promise.isSuccess()) {
                    Throwable cause = promise.cause();
                    log.error("error", cause);
                }
            });

            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("client error", e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
```

#### 4) Client handler, first version

```java
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
    }
}
```

#### 5) Client code, second version

This version adds channel management, a proxy, and receiving the result:

```java
@Slf4j
public class RpcClientManager {

    public static void main(String[] args) {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
//        System.out.println(service.sayHello("lisi"));
//        System.out.println(service.sayHello("wangwu"));
    }

    // Create the proxy
    @SuppressWarnings("unchecked")
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};
        //                                                            sayHello  "张三"
        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. Convert the method call into a message object
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );

            // 2. Prepare an empty Promise to receive the result; the constructor argument
            //    specifies the thread that runs the promise's asynchronous listeners.
            //    It is registered before sending so that a fast response cannot arrive first.
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

            // 3. Send the message object
            getChannel().writeAndFlush(msg);

//            promise.addListener(future -> {
//                // runs on the thread given above
//            });

            // 4. Wait for the promise result
            promise.await();
            if (promise.isSuccess()) {
                // The call succeeded
                return promise.getNow();
            } else {
                // The call failed
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    // volatile so that the unsynchronized first check sees a fully initialized channel
    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();

    // Get the single shared channel (double-checked locking)
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) { //  t2
            if (channel != null) { // t1
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // Initialize the channel
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            // Release the event loop group once the channel is closed
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
        }
    }
}
```

#### 6) Client handler, second version

```java
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    //                  sequence id   promise that receives the result
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);
        // Take the empty promise (and remove it from the map)
        Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
        if (promise != null) {
            Object returnValue = msg.getReturnValue();
            Exception exceptionValue = msg.getExceptionValue();
            if (exceptionValue != null) {
                promise.setFailure(exceptionValue);
            } else {
                promise.setSuccess(returnValue);
            }
        }
    }
}
```
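The interplay between the proxy and the response handler (a sequence id, a shared map of pending promises, and a caller blocked until the matching response arrives) can be tried without a network. This is a sketch under stated assumptions: `RpcProxySketch` and everything in it are hypothetical, `CompletableFuture` replaces Netty's `Promise`, and a single-thread executor named `fake-server` stands in for the remote side and simply echoes a greeting.

```java
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class RpcProxySketch {
    interface HelloService {
        String sayHello(String name);
    }

    // sequence id -> promise waiting for the response with that id
    static final Map<Integer, CompletableFuture<Object>> PROMISES = new ConcurrentHashMap<>();
    static final AtomicInteger SEQUENCE = new AtomicInteger();

    // Stands in for the network plus the remote server.
    static final ExecutorService FAKE_SERVER = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-server");
        t.setDaemon(true);
        return t;
    });

    @SuppressWarnings("unchecked")
    static <T> T getProxyService(Class<T> serviceClass) {
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    int sequenceId = SEQUENCE.incrementAndGet();
                    CompletableFuture<Object> promise = new CompletableFuture<>();
                    PROMISES.put(sequenceId, promise);        // register before sending
                    send(sequenceId, method.getName(), args); // "write" the request
                    return promise.get();                     // block until the response arrives
                });
    }

    // Simulated request: the fake server answers asynchronously on its own thread.
    static void send(int sequenceId, String methodName, Object[] args) {
        FAKE_SERVER.execute(() -> onResponse(sequenceId, "hello, " + args[0]));
    }

    // Plays the role of the response handler: complete and forget the matching promise.
    static void onResponse(int sequenceId, Object returnValue) {
        CompletableFuture<Object> promise = PROMISES.remove(sequenceId);
        if (promise != null) {
            promise.complete(returnValue);
        }
    }

    static String demo(String name) {
        return getProxyService(HelloService.class).sayHello(name);
    }

    public static void main(String[] args) {
        System.out.println(demo("zhangsan"));
    }
}
```

Removing the promise from the map when the response is handled keeps the map from growing with every call; a production client would also need a timeout so that lost responses do not leave entries behind.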

## 2. Source code analysis

### 2.1 Startup analysis

![image-20210417010353009](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702025800.png)

![image-20210417011111756](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702025801.png)

Let's look at how Netty handles the following code:

```java
//1 Netty uses NioEventLoopGroup (the "nio boss" thread for short) to wrap the thread and the selector
Selector selector = Selector.open();

//2 Create the NioServerSocketChannel; this also initializes its associated handlers and stores the config for the native ssc
NioServerSocketChannel attachment = new NioServerSocketChannel();

//3 Creating the NioServerSocketChannel also creates the native Java ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

//4 Start the nio boss thread, which performs the following steps

//5 Register (this only associates the selector with the NioServerSocketChannel); no events are of interest yet
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

//6 head -> initializer -> ServerBootstrapAcceptor -> tail; the initializer is one-shot and exists only to add the acceptor

//7 Bind the port
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 Fire the channel active event; the head handler registers interest in OP_ACCEPT
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
```
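The register-first, subscribe-later order in steps 5, 7 and 8 can be observed with the JDK alone. In this sketch `NioStartupSketch` is a hypothetical name, a plain `Object` stands in for the NioServerSocketChannel attachment, and the socket is bound to an ephemeral loopback port (port 0) instead of 8080 so that it does not clash with anything already running.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class NioStartupSketch {
    // Returns {interestOps right after register, interestOps after the "channel active" step}.
    static int[] start() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);

            // Step 5: register with interest set 0; the attachment is a placeholder object.
            Object attachment = new Object();
            SelectionKey key = ssc.register(selector, 0, attachment);
            int afterRegister = key.interestOps();

            // Step 7: bind; port 0 asks the OS for any free port.
            ssc.bind(new InetSocketAddress("127.0.0.1", 0));

            // Step 8: only now start listening for accept events.
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            int afterActive = key.interestOps();

            return new int[]{afterRegister, afterActive};
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = start();
        System.out.println("after register: " + ops[0] + ", after active: " + ops[1]);
    }
}
```

`SelectionKey.OP_ACCEPT` is the constant `1 << 4`, so the second value is 16.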

Entry point: `io.netty.bootstrap.ServerBootstrap#bind`

Key code: `io.netty.bootstrap.AbstractBootstrap#doBind`

```java
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Perform initialization and registration. initAndRegister decides when regFuture completes, which triggers the callback at 3.2
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 2. initAndRegister runs asynchronously, so there are two cases; when debugging, tell them apart with the breakpoint's suspend policy
    // 2.1 Registration has already completed
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        // 3.1 Call doBind0 immediately
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    }
    // 2.2 Registration has not completed yet
    else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 3.2 Call doBind0 from a callback
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Handle the exception...
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    // 3. doBind0 is executed by the registration thread
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
```

Key code: `io.netty.bootstrap.AbstractBootstrap#initAndRegister`

```java
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        // 1.1 Initialize: all this does is add an initializer, a ChannelInitializer
        init(channel);
    } catch (Throwable t) {
        // Handle the exception...
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 1.2 Register: all this does is register the native channel with the selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // Handle the exception...
    }
    return regFuture;
}
```

Key code: `io.netty.bootstrap.ServerBootstrap#init`

```java
// The channel here is actually a NioServerSocketChannel
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // Add an initializer to the NioServerSocketChannel
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // The initializer's job is to add the ServerBootstrapAcceptor to the NioServerSocketChannel
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
```

Key code: `io.netty.channel.AbstractChannel.AbstractUnsafe#register`

```java
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // Some checks, omitted...

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // The first call to execute starts the nio thread; registration and later operations run on that nio thread
            // There is only one NioServerSocketChannel, so there is only one boss nio thread
            // What this line really does is switch from the main thread to the nio boss thread
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            // Logging...
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
```
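The `inEventLoop()` check followed by `execute` is a general hand-off pattern: run inline when already on the loop thread, otherwise submit the work so it runs there. A JDK-only sketch of that shape follows. `ThreadHandoffSketch` and the thread name `nio-boss-sketch` are hypothetical, and a single-thread executor stands in for the event loop; like the event loop described in the comments above, it creates its thread lazily on the first `execute`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHandoffSketch {
    private static volatile Thread loopThread;

    // Single-thread "event loop"; the thread is created on the first execute() call.
    private static final ExecutorService LOOP = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "nio-boss-sketch");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    static boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same shape as register(): inline on the loop thread, otherwise hand off to it.
    static CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            register0(promise);
        } else {
            LOOP.execute(() -> register0(promise)); // main thread -> loop thread switch
        }
        return promise;
    }

    // Completes the promise with the name of the thread that did the work.
    private static void register0(CompletableFuture<String> promise) {
        promise.complete(Thread.currentThread().getName());
    }

    static String registeredOn() {
        return register().join();
    }

    public static void main(String[] args) {
        System.out.println("register0 ran on: " + registeredOn());
    }
}
```

Calling `registeredOn()` from the main thread reports the loop thread's name, which is the thread switch that the comments in `register` describe.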

`io.netty.channel.AbstractChannel.AbstractUnsafe#register0`

```java
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // 1.2.1 Bind the native nio channel to the selector. Note that no selector events are registered at this point; the attachment is the NioServerSocketChannel
        doRegister();
        neverRegistered = false;
        registered = true;

        // 1.2.2 Run initChannel of the NioServerSocketChannel's initializer
        pipeline.invokeHandlerAddedIfNeeded();

        // Triggers the callback at 3.2, io.netty.bootstrap.AbstractBootstrap#doBind0
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();

        // The server socket channel is not bound yet, so isActive is false
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
```

Key code: `io.netty.channel.ChannelInitializer#initChannel`

```java
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // 1.2.2.1 Run the initialization
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            // 1.2.2.2 Remove the initializer
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
```

Key code: `io.netty.bootstrap.AbstractBootstrap#doBind0`

```java
// doBind0 is executed from 3.1 or 3.2
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
```

Key code: `io.netty.channel.AbstractChannel.AbstractUnsafe#bind`

```java
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Logging...
    }

    boolean wasActive = isActive();
    try {
        // 3.3 Bind the port
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 3.4 Fire the active event
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
```

3.3 Key code: `io.netty.channel.socket.nio.NioServerSocketChannel#doBind`

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

3.4 Key code: `io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive`

    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
        // Trigger read. On a NioServerSocketChannel this does not read data;
        // it only triggers registration of the channel's interest ops
        readIfIsAutoRead();
    }

Key code: `io.netty.channel.nio.AbstractNioChannel#doBeginRead`

    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        // readInterestOp is 16 here. It was set when the NioServerSocketChannel was
        // created and means "interested in accept events"
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
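The interest-op check in `doBeginRead` is plain bit arithmetic on the `SelectionKey` constants. The sketch below isolates it; `InterestOpsDemo` and `addInterest` are hypothetical names used only for illustration.

```java
import java.nio.channels.SelectionKey;

class InterestOpsDemo {
    // Mirrors the check in doBeginRead: set readInterestOp only if it is not set yet.
    static int addInterest(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        // A freshly registered server channel starts with interestOps == 0.
        int ops = addInterest(0, SelectionKey.OP_ACCEPT);
        System.out.println(ops); // 16, i.e. SelectionKey.OP_ACCEPT (1 << 4)

        // Calling it again changes nothing because the bit is already set.
        System.out.println(addInterest(ops, SelectionKey.OP_ACCEPT)); // 16

        // Bits combine: OP_WRITE (4) plus OP_READ (1) gives 5.
        System.out.println(addInterest(SelectionKey.OP_WRITE, SelectionKey.OP_READ)); // 5
    }
}
```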

### 2.2 NioEventLoop in depth

![image-20210417104638916](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702025802.png)

A NioEventLoop thread handles not only IO events but also tasks (both ordinary and scheduled tasks).

Task submission code: `io.netty.util.concurrent.SingleThreadEventExecutor#execute`

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        // Add the task; the queue is the lock-free mpsc queue provided by jctools
        addTask(task);
        if (!inEventLoop) {
            // inEventLoop == false means execute was called from another thread.
            // On the first such call the event loop thread is not running yet, so
            // startThread() starts it and enters the endless loop (see doStartThread below)
            startThread();
            if (isShutdown()) {
                // Already shut down: run the rejection logic, code omitted...
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            // If the loop thread is blocked in an IO select, the thread that added
            // the task is responsible for waking up the NioEventLoop thread
            wakeup(inEventLoop);
        }
    }

Waking up a thread blocked in select: `io.netty.channel.nio.NioEventLoop#wakeup`

    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
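The `compareAndSet(false, true)` guard means that however many threads submit tasks concurrently, at most one of them pays for `selector.wakeup()` until the flag is reset. A minimal sketch of that behavior follows; `WakeupGuardDemo` is a hypothetical class, and a counter stands in for the real selector call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class WakeupGuardDemo {
    private final AtomicBoolean wakenUp = new AtomicBoolean(false);
    private final AtomicInteger selectorWakeups = new AtomicInteger();

    // Same shape as NioEventLoop#wakeup, with a counter instead of selector.wakeup().
    void wakeup(boolean inEventLoop) {
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            selectorWakeups.incrementAndGet();
        }
    }

    int selectorWakeups() {
        return selectorWakeups.get();
    }

    // Lets `callers` threads race on wakeup and returns how many "selector wakeups" happened.
    static int run(int callers) {
        WakeupGuardDemo demo = new WakeupGuardDemo();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> demo.wakeup(false));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return demo.selectorWakeups();
    }

    public static void main(String[] args) {
        System.out.println(run(8)); // 1: only the first successful CAS triggers a wakeup
    }
}
```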

Starting the EventLoop main loop: `io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread`

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Save the executor's current thread in a field for later use
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Call run() of the outer class SingleThreadEventExecutor and enter
                    // the endless loop; the run method is shown below
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // Cleanup, code omitted...
                }
            }
        });
    }
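The interplay of `execute`, lazy thread start and `inEventLoop()` can be sketched with JDK classes only. `MiniEventLoop` below is a hypothetical, heavily simplified model (a `LinkedBlockingQueue` instead of the mpsc queue, no selector, no shutdown logic), not Netty's implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        tasks.add(task);
        // Only a call from outside the loop may need to start the thread,
        // and the CAS makes sure it is started at most once.
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true); // lets the JVM exit without explicit shutdown in this demo
            t.start();
        }
    }

    private void run() {
        thread = Thread.currentThread(); // remember the loop thread, as doStartThread does
        for (;;) {
            try {
                tasks.take().run();
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Submits one task from the calling thread and reports whether it ran on the loop thread.
    static boolean taskRunsOnLoopThread() {
        MiniEventLoop loop = new MiniEventLoop();
        AtomicBoolean onLoop = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            onLoop.set(loop.inEventLoop());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return onLoop.get();
    }

    public static void main(String[] args) {
        System.out.println(new MiniEventLoop().isStarted()); // false: no thread until first execute
        System.out.println(taskRunsOnLoopThread());          // true
    }
}
```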

`io.netty.channel.nio.NioEventLoop#run` runs an endless loop that keeps checking for new tasks and IO events.

    protected void run() {
        for (;;) {
            try {
                try {
                    // calculateStrategy works as follows:
                    // - with pending tasks: run selectNow once, which clears the previous
                    //   wakeup result; whether or not IO events exist, the switch is skipped
                    // - without tasks: match SelectStrategy.SELECT and decide whether to block
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;

                        case SelectStrategy.BUSY_WAIT:

                        case SelectStrategy.SELECT:
                            // Both the IO thread and task-submitting threads may call wakeup,
                            // and wakeup is fairly expensive, so an atomic boolean wakenUp is
                            // used: only the thread that flips it to true performs the wakeup.
                            // Reset the wakeup state to false, then do the blocking select
                            boolean oldWakenUp = wakenUp.getAndSet(false);

                            // If a non-EventLoop thread sets wakenUp to true and calls wakeup
                            // right here, the select below returns without blocking.
                            // wakenUp then stays true until the next reset, so later
                            // compareAndSet(false, true) calls by task submitters fail and no
                            // wakeup is issued; a following select could block needlessly
                            // until it times out, delaying newly added tasks.
                            // This is why selector.wakeup() is called again after select
                            // whenever wakenUp is true
                            select(oldWakenUp);

                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                // ioRatio defaults to 50
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // With ioRatio == 100, always run all non-IO tasks to completion
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Time spent handling IO events
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // Run non-IO tasks; runAllTasks exits once the time budget is used up
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
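The time budget passed to `runAllTasks` follows directly from the formula `ioTime * (100 - ioRatio) / ioRatio`. A small worked example follows; `IoRatioDemo` and `taskBudgetNanos` are hypothetical names for illustration.

```java
class IoRatioDemo {
    // Same arithmetic as in NioEventLoop#run: how long non-IO tasks may run
    // after IO handling took ioTimeNanos.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // assume IO handling took 8 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000: tasks get as long as IO took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000: IO gets 80%, tasks 20%
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000: tasks get four times the IO time
    }
}
```

With the default `ioRatio` of 50 the task budget equals the IO time. A higher ratio shrinks the share left for tasks.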

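#### ⚠️ Note

> The puzzling part here is wakeup. It can be called by the thread that submits a task (easy to understand), but also by the EventLoop thread itself (harder to understand). What matters is the effect of the wakeup method:
>
> - Called from a non-EventLoop thread, it wakes up the EventLoop thread that is currently blocked in select
> - Called by the EventLoop itself, it makes the next select return immediately instead of blocking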
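The second effect can be observed with a plain JDK `Selector`: a `wakeup()` issued while no select is in progress makes the next select return immediately. The sketch below measures this; `WakeupBeforeSelectDemo` is a hypothetical class, and the timings it returns depend on the machine.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class WakeupBeforeSelectDemo {
    // Returns roughly how long select(timeoutMillis) blocked, in milliseconds.
    static long blockedMillis(boolean wakeupFirst, long timeoutMillis) {
        try (Selector selector = Selector.open()) {
            if (wakeupFirst) {
                // No select is in progress, so this affects the NEXT select call.
                selector.wakeup();
            }
            long start = System.nanoTime();
            selector.select(timeoutMillis);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // With a prior wakeup the call returns almost at once, far below the 2 s timeout.
        System.out.println("after wakeup: " + blockedMillis(true, 2000) + " ms");
        // Without it, and with no channels registered, select waits for the timeout.
        System.out.println("no wakeup:    " + blockedMillis(false, 300) + " ms");
    }
}
```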
See the figure below:

![](../../../../JAVA_STUDY_FILES/百度网盘/Netty教程源码资料/讲义/Netty-讲义/img/0032.png)

`io.netty.channel.nio.NioEventLoop#select`

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            // Compute the wait time:
            // - no scheduledTask: the timeout is 1s
            // - with a scheduledTask: the timeout is (next scheduled task's time - now)
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                // Timed out: exit the loop
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task arrived in the meantime, exit the loop. Without this check
                // the task would only run after the next select times out.
                // wakenUp.compareAndSet(false, true) spares non-NioEventLoop threads
                // from having to call wakeup again
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                // Blocking select with a timeout.
                // Note the NIO bug: when it occurs, select does not block even though no
                // event has occurred, causing endless empty polling and 100% CPU usage
                int selectedKeys = selector.select(timeoutMillis);
                // Increment the counter
                selectCnt ++;

                // After waking up, exit the loop if there are IO events, if a
                // non-EventLoop thread woke us up, or if there are tasks
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    // The thread was interrupted: exit the loop
                    // Log...
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // The timeout elapsed: reset the counter to 1; the next iteration will break
                    selectCnt = 1;
                }
                // The counter exceeded the threshold set by io.netty.selectorAutoRebuildThreshold
                // (default 512). This works around the NIO empty-polling bug
                else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // Rebuild the selector
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                // Log...
            }
        } catch (CancelledKeyException e) {
            // Log...
        }
    }
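The `+ 500000L` in the timeout computation rounds the remaining nanoseconds to the nearest millisecond instead of truncating. Anything under half a millisecond therefore counts as already timed out. A short worked example follows; `SelectTimeoutDemo` is a hypothetical helper for illustration.

```java
class SelectTimeoutDemo {
    // Same arithmetic as in NioEventLoop#select.
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long now = 0L;
        System.out.println(timeoutMillis(1_499_999L, now)); // 1: 1.499999 ms rounds down
        System.out.println(timeoutMillis(1_500_000L, now)); // 2: 1.5 ms rounds up
        System.out.println(timeoutMillis(499_999L, now));   // 0: treated as timed out
        System.out.println(timeoutMillis(500_000L, now));   // 1
    }
}
```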

Processing keys: `io.netty.channel.nio.NioEventLoop#processSelectedKeys`

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            // Via reflection, the ready-key set inside the Selector implementation was
            // replaced with a SelectedSelectionKeySet, which is backed by an array and
            // is faster to iterate (the original is a HashSet)
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

`io.netty.channel.nio.NioEventLoop#processSelectedKey`

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        // A key becomes invalid when it is cancelled or its channel is closed
        if (!k.isValid()) {
            // Handle the invalid key...
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Connect event
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Writable event
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            // Readable or acceptable event
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // Acceptable: io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
                // Readable:   io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

![image-20210417111246131](https://myblog-1258908231.cos.ap-shanghai.myqcloud.com/hexo/20210702025803.png)

### 2.3 accept in depth

This section traces how the following NIO code maps onto Netty's flow:

    // 1. Block until an event occurs
    selector.select();

    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
        // 2. Take one event
        SelectionKey key = iter.next();

        // 3. If it is an accept event
        if (key.isAcceptable()) {
            // 4. Perform the accept
            SocketChannel channel = serverSocketChannel.accept();
            channel.configureBlocking(false);

            // 5. Register interest in read events
            channel.register(selector, SelectionKey.OP_READ);
        }
        // ...
    }
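For reference, the five steps can be exercised end to end with the JDK alone. The sketch below is a hypothetical demo (`NioAcceptDemo`), assuming a loopback connection on an ephemeral port and a bounded `select` so it cannot hang. It accepts a single client and returns the interest ops registered for the accepted channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class NioAcceptDemo {
    // Accepts exactly one loopback connection; returns the accepted channel's
    // interest ops, or -1 if no accept event arrived within the timeout.
    static int acceptOnce() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(loopback, 0)); // port 0: let the OS pick one
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, port))) {
                // 1. wait for an event (bounded to 5 s in this demo)
                selector.select(5000);
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    // 2. take one event and remove it from the selected set
                    SelectionKey key = iter.next();
                    iter.remove();
                    // 3. accept event?
                    if (key.isAcceptable()) {
                        // 4. perform the accept
                        SocketChannel channel = server.accept();
                        channel.configureBlocking(false);
                        // 5. register interest in read events
                        SelectionKey childKey = channel.register(selector, SelectionKey.OP_READ);
                        int ops = childKey.interestOps();
                        channel.close();
                        return ops;
                    }
                }
                return -1;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptOnce() == SelectionKey.OP_READ);
    }
}
```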

First, the handling of the acceptable event (accept):

`io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read`

    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    // doReadMessages performs the accept, creates a NioSocketChannel
                    // and puts it into readBuf as a message.
                    // readBuf is an ArrayList used to buffer messages
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    // localRead is 1: a single message, i.e. one accepted client connection
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // Fire the read event so that handlers on the pipeline process it; here that is
                // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }

Key code: `io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead`

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // At this point msg is a NioSocketChannel
        final Channel child = (Channel) msg;

        // Add childHandler, i.e. the initializer, to the NioSocketChannel
        child.pipeline().addLast(childHandler);

        // Set the options
        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // Register the NioSocketChannel with an nio worker thread;
            // further processing is also handed over to that worker thread
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

This brings us back to the familiar `io.netty.channel.AbstractChannel.AbstractUnsafe#register` method:

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // Some checks, omitted...

        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // This line performs the switch from the nio boss thread to the nio worker thread
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                // Log...
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

`io.netty.channel.AbstractChannel.AbstractUnsafe#register0`

    private void register0(ChannelPromise promise) {
        try {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;

            // Run the initializer. Before it runs the pipeline is: head -> initializer -> tail
            pipeline.invokeHandlerAddedIfNeeded();
            // Afterwards it is: head -> logging handler -> my handler -> tail

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();

            if (isActive()) {
                if (firstRegistration) {
                    // Fire the active event on the pipeline
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }

Back to the familiar code `io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive`:

    public void channelActive(ChannelHandlerContext ctx) {
        ctx.fireChannelActive();
        // Trigger read. For the NioSocketChannel this read only triggers registration
        // of the channel's interest ops; no data is read yet
        readIfIsAutoRead();
    }

`io.netty.channel.nio.AbstractNioChannel#doBeginRead`

    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;
        // interestOps is 0 at this point
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // Register interest in read events
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

### 2.4 read in depth

Now for the readable event, `io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read`. Note that the data sent cannot always be read in one pass, so several NIO read events may fire. Within a single event, pipeline read may be triggered multiple times, while pipeline read complete is triggered once per event.

    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        // io.netty.allocator.type decides which allocator implementation is used
        final ByteBufAllocator allocator = config.getAllocator();
        // Used to allocate the byteBuf and to decide the size of a single read
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                // Read
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                // Fire the read event so that handlers on the pipeline process it;
                // here these are the handlers of the NioSocketChannel
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            }
            // Decide whether to keep looping
            while (allocHandle.continueReading());

            allocHandle.readComplete();
            // Fire the read complete event
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }

`io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)`

    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
        return
               // Usually true
               config.isAutoRead() &&
               // respectMaybeMoreData defaults to true.
               // maybeMoreDataSupplier returns true if the number of bytes actually read
               // equals the number of bytes the read attempted to get
               (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
               // Below the maximum number of reads; maxMessagePerRead defaults to 16
               totalMessages < maxMessagePerRead &&
               // Some data was actually read
               totalBytesRead > 0;
    }
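To see how these four conditions decide whether the read loop goes around again, the predicate can be rewritten as a pure function. This is only a sketch: `ContinueReadingDemo` and its parameter list are hypothetical, and the `maybeMoreData` rule is modelled as "attempted bytes equal bytes actually read", as described in the comment above.

```java
class ContinueReadingDemo {
    static boolean continueReading(boolean autoRead, boolean respectMaybeMoreData,
                                   int attemptedBytesRead, int lastBytesRead,
                                   int totalMessages, int maxMessagePerRead,
                                   long totalBytesRead) {
        // The buffer was filled completely, so more data may be waiting in the socket.
        boolean maybeMoreData = attemptedBytesRead == lastBytesRead;
        return autoRead
                && (!respectMaybeMoreData || maybeMoreData)
                && totalMessages < maxMessagePerRead
                && totalBytesRead > 0;
    }

    public static void main(String[] args) {
        // Buffer of 1024 bytes filled completely on the first read: keep reading.
        System.out.println(continueReading(true, true, 1024, 1024, 1, 16, 1024));   // true
        // Only 300 of 1024 bytes arrived: the socket is probably drained, stop.
        System.out.println(continueReading(true, true, 1024, 300, 1, 16, 300));     // false
        // Sixteen reads already done in this event: stop and let other channels run.
        System.out.println(continueReading(true, true, 1024, 1024, 16, 16, 16384)); // false
    }
}
```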

```