四. 优化与源码

1. 优化

1.1 扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下

  1. // 反序列化
  2. byte[] body = new byte[bodyLength];
  3. byteBuf.readBytes(body);
  4. ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
  5. Message message = (Message) in.readObject();
  6. message.setSequenceId(sequenceId);
  7. // 序列化
  8. ByteArrayOutputStream out = new ByteArrayOutputStream();
  9. new ObjectOutputStream(out).writeObject(message);
  10. byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口

  1. public interface Serializer {
  2. // 反序列化方法
  3. <T> T deserialize(Class<T> clazz, byte[] bytes);
  4. // 序列化方法
  5. <T> byte[] serialize(T object);
  6. }

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中(下面示例代码以独立枚举 SerializerAlgorithm 演示,实际项目中它是 Serializer 接口的内部枚举 Algorithm,后文配置类与解码器中均以 Serializer.Algorithm 引用,两者实现一致)

  1. enum SerializerAlgorithm implements Serializer {
  2. // Java 实现
  3. Java {
  4. @Override
  5. public <T> T deserialize(Class<T> clazz, byte[] bytes) {
  6. try {
  7. ObjectInputStream in =
  8. new ObjectInputStream(new ByteArrayInputStream(bytes));
  9. Object object = in.readObject();
  10. return (T) object;
  11. } catch (IOException | ClassNotFoundException e) {
  12. throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);
  13. }
  14. }
  15. @Override
  16. public <T> byte[] serialize(T object) {
  17. try {
  18. ByteArrayOutputStream out = new ByteArrayOutputStream();
  19. new ObjectOutputStream(out).writeObject(object);
  20. return out.toByteArray();
  21. } catch (IOException e) {
  22. throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);
  23. }
  24. }
  25. },
  26. // Json 实现(引入了 Gson 依赖)
  27. Json {
  28. @Override
  29. public <T> T deserialize(Class<T> clazz, byte[] bytes) {
  30. return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
  31. }
  32. @Override
  33. public <T> byte[] serialize(T object) {
  34. return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
  35. }
  36. };
  37. // 需要从协议的字节中得到是哪种序列化算法
  38. public static SerializerAlgorithm getByInt(int type) {
  39. SerializerAlgorithm[] array = SerializerAlgorithm.values();
  40. if (type < 0 || type > array.length - 1) {
  41. throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
  42. }
  43. return array[type];
  44. }
  45. }

增加配置类和配置文件

  1. public abstract class Config {
  2. static Properties properties;
  3. static {
  4. try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
  5. properties = new Properties();
  6. properties.load(in);
  7. } catch (IOException e) {
  8. throw new ExceptionInInitializerError(e);
  9. }
  10. }
  11. public static int getServerPort() {
  12. String value = properties.getProperty("server.port");
  13. if(value == null) {
  14. return 8080;
  15. } else {
  16. return Integer.parseInt(value);
  17. }
  18. }
  19. public static Serializer.Algorithm getSerializerAlgorithm() {
  20. String value = properties.getProperty("serializer.algorithm");
  21. if(value == null) {
  22. return Serializer.Algorithm.Java;
  23. } else {
  24. return Serializer.Algorithm.valueOf(value);
  25. }
  26. }
  27. }

配置文件

  1. serializer.algorithm=Json

修改编解码器

  1. /**
  2. * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
  3. */
  4. public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
  5. @Override
  6. public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
  7. ByteBuf out = ctx.alloc().buffer();
  8. // 1. 4 字节的魔数
  9. out.writeBytes(new byte[]{1, 2, 3, 4});
  10. // 2. 1 字节的版本,
  11. out.writeByte(1);
  12. // 3. 1 字节的序列化方式 jdk 0 , json 1
  13. out.writeByte(Config.getSerializerAlgorithm().ordinal());
  14. // 4. 1 字节的指令类型
  15. out.writeByte(msg.getMessageType());
  16. // 5. 4 个字节
  17. out.writeInt(msg.getSequenceId());
  18. // 无意义,对齐填充
  19. out.writeByte(0xff);
  20. // 6. 获取内容的字节数组
  21. byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
  22. // 7. 长度
  23. out.writeInt(bytes.length);
  24. // 8. 写入内容
  25. out.writeBytes(bytes);
  26. outList.add(out);
  27. }
  28. @Override
  29. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  30. int magicNum = in.readInt();
  31. byte version = in.readByte();
  32. byte serializerAlgorithm = in.readByte(); // 0 或 1
  33. byte messageType = in.readByte(); // 0,1,2...
  34. int sequenceId = in.readInt();
  35. in.readByte();
  36. int length = in.readInt();
  37. byte[] bytes = new byte[length];
  38. in.readBytes(bytes, 0, length);
  39. // 找到反序列化算法
  40. Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
  41. // 确定具体消息类型
  42. Class<? extends Message> messageClass = Message.getMessageClass(messageType);
  43. Message message = algorithm.deserialize(messageClass, bytes);
  44. // log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
  45. // log.debug("{}", message);
  46. out.add(message);
  47. }
  48. }

其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class

  1. @Data
  2. public abstract class Message implements Serializable {
  3. /**
  4. * 根据消息类型字节,获得对应的消息 class
  5. * @param messageType 消息类型字节
  6. * @return 消息 class
  7. */
  8. public static Class<? extends Message> getMessageClass(int messageType) {
  9. return messageClasses.get(messageType);
  10. }
  11. private int sequenceId;
  12. private int messageType;
  13. public abstract int getMessageType();
  14. public static final int LoginRequestMessage = 0;
  15. public static final int LoginResponseMessage = 1;
  16. public static final int ChatRequestMessage = 2;
  17. public static final int ChatResponseMessage = 3;
  18. public static final int GroupCreateRequestMessage = 4;
  19. public static final int GroupCreateResponseMessage = 5;
  20. public static final int GroupJoinRequestMessage = 6;
  21. public static final int GroupJoinResponseMessage = 7;
  22. public static final int GroupQuitRequestMessage = 8;
  23. public static final int GroupQuitResponseMessage = 9;
  24. public static final int GroupChatRequestMessage = 10;
  25. public static final int GroupChatResponseMessage = 11;
  26. public static final int GroupMembersRequestMessage = 12;
  27. public static final int GroupMembersResponseMessage = 13;
  28. public static final int PingMessage = 14;
  29. public static final int PongMessage = 15;
  30. private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
  31. static {
  32. messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
  33. messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
  34. messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
  35. messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
  36. messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
  37. messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
  38. messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
  39. messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
  40. messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
  41. messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
  42. messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
  43. messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
  44. messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
  45. messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
  46. }
  47. }

1.2 参数调优

1)CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannel 参数

  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常

  • 注意与之区分:SO_TIMEOUT 是另一个参数,主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

  1. @Slf4j
  2. public class TestConnectionTimeout {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup group = new NioEventLoopGroup();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap()
  7. .group(group)
  8. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
  9. .channel(NioSocketChannel.class)
  10. .handler(new LoggingHandler());
  11. ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
  12. future.sync().channel().closeFuture().sync(); // 断点1
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. log.debug("timeout");
  16. } finally {
  17. group.shutdownGracefully();
  18. }
  19. }
  20. }

另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

  1. @Override
  2. public final void connect(
  3. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
  4. // ...
  5. // Schedule connect timeout.
  6. int connectTimeoutMillis = config().getConnectTimeoutMillis();
  7. if (connectTimeoutMillis > 0) {
  8. connectTimeoutFuture = eventLoop().schedule(new Runnable() {
  9. @Override
  10. public void run() {
  11. ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
  12. ConnectTimeoutException cause =
  13. new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
  14. if (connectPromise != null && connectPromise.tryFailure(cause)) {
  15. close(voidPromise());
  16. }
  17. }
  18. }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
  19. }
  20. // ...
  21. }

2)SO_BACKLOG

  • 属于 ServerSocketChannel 参数
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SENT,server 收到,状态改变为 SYN_RCVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中

可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

可以通过下面源码查看默认大小

  1. public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
  2. implements ServerSocketChannelConfig {
  3. private volatile int backlog = NetUtil.SOMAXCONN;
  4. // ...
  5. }

课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey

oio 中更容易说明,不用 debug 模式

  1. public class Server {
  2. public static void main(String[] args) throws IOException {
  3. ServerSocket ss = new ServerSocket(8888, 2);
  4. Socket accept = ss.accept();
  5. System.out.println(accept);
  6. System.in.read();
  7. }
  8. }

客户端启动 4 个

  1. public class Client {
  2. public static void main(String[] args) throws IOException {
  3. try {
  4. Socket s = new Socket();
  5. System.out.println(new Date()+" connecting...");
  6. s.connect(new InetSocketAddress("localhost", 8888),1000);
  7. System.out.println(new Date()+" connected...");
  8. s.getOutputStream().write(1);
  9. System.in.read();
  10. } catch (IOException e) {
  11. System.out.println(new Date()+" connecting timeout...");
  12. e.printStackTrace();
  13. }
  14. }
  15. }

第 1,2,3 个客户端都打印连接成功,但除了第一个已被 accept 外,其它两个都处于 accept queue 中

  1. Tue Apr 21 20:30:28 CST 2020 connecting...
  2. Tue Apr 21 20:30:28 CST 2020 connected...

第 4 个客户端连接时

  1. Tue Apr 21 20:53:58 CST 2020 connecting...
  2. Tue Apr 21 20:53:59 CST 2020 connecting timeout...
  3. java.net.SocketTimeoutException: connect timed out

3)ulimit -n

  • 属于操作系统参数

4)TCP_NODELAY

  • 属于 SocketChannel 参数

5)SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannel 参数
  • SO_RCVBUF 既可用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上)

6)ALLOCATOR

  • 属于 SocketChannel 参数
  • 用来分配 ByteBuf, ctx.alloc()

7)RCVBUF_ALLOCATOR

  • 属于 SocketChannel 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

1.3 RPC 框架

1)准备工作

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

  1. @Data
  2. public abstract class Message implements Serializable {
  3. // 省略旧的代码
  4. public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
  5. public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
  6. static {
  7. // ...
  8. messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
  9. messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
  10. }
  11. }

请求消息

  1. @Getter
  2. @ToString(callSuper = true)
  3. public class RpcRequestMessage extends Message {
  4. /**
  5. * 调用的接口全限定名,服务端根据它找到实现
  6. */
  7. private String interfaceName;
  8. /**
  9. * 调用接口中的方法名
  10. */
  11. private String methodName;
  12. /**
  13. * 方法返回类型
  14. */
  15. private Class<?> returnType;
  16. /**
  17. * 方法参数类型数组
  18. */
  19. private Class[] parameterTypes;
  20. /**
  21. * 方法参数值数组
  22. */
  23. private Object[] parameterValue;
  24. public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
  25. super.setSequenceId(sequenceId);
  26. this.interfaceName = interfaceName;
  27. this.methodName = methodName;
  28. this.returnType = returnType;
  29. this.parameterTypes = parameterTypes;
  30. this.parameterValue = parameterValue;
  31. }
  32. @Override
  33. public int getMessageType() {
  34. return RPC_MESSAGE_TYPE_REQUEST;
  35. }
  36. }

响应消息

  1. @Data
  2. @ToString(callSuper = true)
  3. public class RpcResponseMessage extends Message {
  4. /**
  5. * 返回值
  6. */
  7. private Object returnValue;
  8. /**
  9. * 异常值
  10. */
  11. private Exception exceptionValue;
  12. @Override
  13. public int getMessageType() {
  14. return RPC_MESSAGE_TYPE_RESPONSE;
  15. }
  16. }

服务器架子

  1. @Slf4j
  2. public class RpcServer {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup boss = new NioEventLoopGroup();
  5. NioEventLoopGroup worker = new NioEventLoopGroup();
  6. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  7. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  8. // rpc 请求消息处理器,待实现
  9. RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
  10. try {
  11. ServerBootstrap serverBootstrap = new ServerBootstrap();
  12. serverBootstrap.channel(NioServerSocketChannel.class);
  13. serverBootstrap.group(boss, worker);
  14. serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
  15. @Override
  16. protected void initChannel(SocketChannel ch) throws Exception {
  17. ch.pipeline().addLast(new ProcotolFrameDecoder());
  18. ch.pipeline().addLast(LOGGING_HANDLER);
  19. ch.pipeline().addLast(MESSAGE_CODEC);
  20. ch.pipeline().addLast(RPC_HANDLER);
  21. }
  22. });
  23. Channel channel = serverBootstrap.bind(8080).sync().channel();
  24. channel.closeFuture().sync();
  25. } catch (InterruptedException e) {
  26. log.error("server error", e);
  27. } finally {
  28. boss.shutdownGracefully();
  29. worker.shutdownGracefully();
  30. }
  31. }
  32. }

客户端架子

  1. public class RpcClient {
  2. public static void main(String[] args) {
  3. NioEventLoopGroup group = new NioEventLoopGroup();
  4. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  5. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  6. // rpc 响应消息处理器,待实现
  7. RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
  8. try {
  9. Bootstrap bootstrap = new Bootstrap();
  10. bootstrap.channel(NioSocketChannel.class);
  11. bootstrap.group(group);
  12. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ch.pipeline().addLast(new ProcotolFrameDecoder());
  16. ch.pipeline().addLast(LOGGING_HANDLER);
  17. ch.pipeline().addLast(MESSAGE_CODEC);
  18. ch.pipeline().addLast(RPC_HANDLER);
  19. }
  20. });
  21. Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
  22. channel.closeFuture().sync();
  23. } catch (Exception e) {
  24. log.error("client error", e);
  25. } finally {
  26. group.shutdownGracefully();
  27. }
  28. }
  29. }

服务器端的 service 获取

  1. public class ServicesFactory {
  2. static Properties properties;
  3. static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
  4. static {
  5. try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
  6. properties = new Properties();
  7. properties.load(in);
  8. Set<String> names = properties.stringPropertyNames();
  9. for (String name : names) {
  10. if (name.endsWith("Service")) {
  11. Class<?> interfaceClass = Class.forName(name);
  12. Class<?> instanceClass = Class.forName(properties.getProperty(name));
  13. map.put(interfaceClass, instanceClass.newInstance());
  14. }
  15. }
  16. } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
  17. throw new ExceptionInInitializerError(e);
  18. }
  19. }
  20. public static <T> T getService(Class<T> interfaceClass) {
  21. return (T) map.get(interfaceClass);
  22. }
  23. }

相关配置 application.properties

  1. serializer.algorithm=Json
  2. cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

2)服务器 handler

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
  6. RpcResponseMessage response = new RpcResponseMessage();
  7. response.setSequenceId(message.getSequenceId());
  8. try {
  9. // 获取真正的实现对象
  10. HelloService service = (HelloService)
  11. ServicesFactory.getService(Class.forName(message.getInterfaceName()));
  12. // 获取要调用的方法
  13. Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
  14. // 调用方法
  15. Object invoke = method.invoke(service, message.getParameterValue());
  16. // 调用成功
  17. response.setReturnValue(invoke);
  18. } catch (Exception e) {
  19. e.printStackTrace();
  20. // 调用异常
  21. response.setExceptionValue(e);
  22. }
  23. // 返回结果
  24. ctx.writeAndFlush(response);
  25. }
  26. }

3)客户端代码第一版

只发消息

  1. @Slf4j
  2. public class RpcClient {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup group = new NioEventLoopGroup();
  5. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  6. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  7. RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
  8. try {
  9. Bootstrap bootstrap = new Bootstrap();
  10. bootstrap.channel(NioSocketChannel.class);
  11. bootstrap.group(group);
  12. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. protected void initChannel(SocketChannel ch) throws Exception {
  15. ch.pipeline().addLast(new ProcotolFrameDecoder());
  16. ch.pipeline().addLast(LOGGING_HANDLER);
  17. ch.pipeline().addLast(MESSAGE_CODEC);
  18. ch.pipeline().addLast(RPC_HANDLER);
  19. }
  20. });
  21. Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
  22. ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
  23. 1,
  24. "cn.itcast.server.service.HelloService",
  25. "sayHello",
  26. String.class,
  27. new Class[]{String.class},
  28. new Object[]{"张三"}
  29. )).addListener(promise -> {
  30. if (!promise.isSuccess()) {
  31. Throwable cause = promise.cause();
  32. log.error("error", cause);
  33. }
  34. });
  35. channel.closeFuture().sync();
  36. } catch (Exception e) {
  37. log.error("client error", e);
  38. } finally {
  39. group.shutdownGracefully();
  40. }
  41. }
  42. }

4)客户端 handler 第一版

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
  6. log.debug("{}", msg);
  7. }
  8. }

5)客户端代码 第二版

包括 channel 管理,代理,接收结果

  1. @Slf4j
  2. public class RpcClientManager {
  3. public static void main(String[] args) {
  4. HelloService service = getProxyService(HelloService.class);
  5. System.out.println(service.sayHello("zhangsan"));
  6. // System.out.println(service.sayHello("lisi"));
  7. // System.out.println(service.sayHello("wangwu"));
  8. }
  9. // 创建代理类
  10. public static <T> T getProxyService(Class<T> serviceClass) {
  11. ClassLoader loader = serviceClass.getClassLoader();
  12. Class<?>[] interfaces = new Class[]{serviceClass};
  13. // sayHello "张三"
  14. Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
  15. // 1. 将方法调用转换为 消息对象
  16. int sequenceId = SequenceIdGenerator.nextId();
  17. RpcRequestMessage msg = new RpcRequestMessage(
  18. sequenceId,
  19. serviceClass.getName(),
  20. method.getName(),
  21. method.getReturnType(),
  22. method.getParameterTypes(),
  23. args
  24. );
  25. // 2. 将消息对象发送出去
  26. getChannel().writeAndFlush(msg);
  27. // 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象异步接收结果线程
  28. DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
  29. RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);
  30. // promise.addListener(future -> {
  31. // // 线程
  32. // });
  33. // 4. 等待 promise 结果
  34. promise.await();
  35. if(promise.isSuccess()) {
  36. // 调用正常
  37. return promise.getNow();
  38. } else {
  39. // 调用失败
  40. throw new RuntimeException(promise.cause());
  41. }
  42. });
  43. return (T) o;
  44. }
  45. private static Channel channel = null;
  46. private static final Object LOCK = new Object();
  47. // 获取唯一的 channel 对象
  48. public static Channel getChannel() {
  49. if (channel != null) {
  50. return channel;
  51. }
  52. synchronized (LOCK) { // t2
  53. if (channel != null) { // t1
  54. return channel;
  55. }
  56. initChannel();
  57. return channel;
  58. }
  59. }
  60. // 初始化 channel 方法
  61. private static void initChannel() {
  62. NioEventLoopGroup group = new NioEventLoopGroup();
  63. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
  64. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  65. RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
  66. Bootstrap bootstrap = new Bootstrap();
  67. bootstrap.channel(NioSocketChannel.class);
  68. bootstrap.group(group);
  69. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  70. @Override
  71. protected void initChannel(SocketChannel ch) throws Exception {
  72. ch.pipeline().addLast(new ProcotolFrameDecoder());
  73. ch.pipeline().addLast(LOGGING_HANDLER);
  74. ch.pipeline().addLast(MESSAGE_CODEC);
  75. ch.pipeline().addLast(RPC_HANDLER);
  76. }
  77. });
  78. try {
  79. channel = bootstrap.connect("localhost", 8080).sync().channel();
  80. channel.closeFuture().addListener(future -> {
  81. group.shutdownGracefully();
  82. });
  83. } catch (Exception e) {
  84. log.error("client error", e);
  85. }
  86. }
  87. }

6)客户端 handler 第二版

  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
  4. // 序号 用来接收结果的 promise 对象
  5. public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
  6. @Override
  7. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
  8. log.debug("{}", msg);
  9. // 拿到空的 promise
  10. Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
  11. if (promise != null) {
  12. Object returnValue = msg.getReturnValue();
  13. Exception exceptionValue = msg.getExceptionValue();
  14. if(exceptionValue != null) {
  15. promise.setFailure(exceptionValue);
  16. } else {
  17. promise.setSuccess(returnValue);
  18. }
  19. }
  20. }
  21. }

2. 源码分析

2.1 启动剖析

我们就来看看 netty 中对下面的代码是怎样进行处理的

  1. //1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
  2. Selector selector = Selector.open();
  3. //2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
  4. NioServerSocketChannel attachment = new NioServerSocketChannel();
  5. //3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
  6. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  7. serverSocketChannel.configureBlocking(false);
  8. //4 启动 nio boss 线程执行接下来的操作
  9. //5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
  10. SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
  11. //6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
  12. //7 绑定端口
  13. serverSocketChannel.bind(new InetSocketAddress(8080));
  14. //8 触发 channel active 事件,在 head 中关注 op_accept 事件
  15. selectionKey.interestOps(SelectionKey.OP_ACCEPT);

入口 io.netty.bootstrap.ServerBootstrap#bind

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

  1. private ChannelFuture doBind(final SocketAddress localAddress) {
  2. // 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
  3. final ChannelFuture regFuture = initAndRegister();
  4. final Channel channel = regFuture.channel();
  5. if (regFuture.cause() != null) {
  6. return regFuture;
  7. }
  8. // 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
  9. // 2.1 如果已经完成
  10. if (regFuture.isDone()) {
  11. ChannelPromise promise = channel.newPromise();
  12. // 3.1 立刻调用 doBind0
  13. doBind0(regFuture, channel, localAddress, promise);
  14. return promise;
  15. }
  16. // 2.2 还没有完成
  17. else {
  18. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
  19. // 3.2 回调 doBind0
  20. regFuture.addListener(new ChannelFutureListener() {
  21. @Override
  22. public void operationComplete(ChannelFuture future) throws Exception {
  23. Throwable cause = future.cause();
  24. if (cause != null) {
  25. // 处理异常...
  26. promise.setFailure(cause);
  27. } else {
  28. promise.registered();
  29. // 3. 由注册线程去执行 doBind0
  30. doBind0(regFuture, channel, localAddress, promise);
  31. }
  32. }
  33. });
  34. return promise;
  35. }
  36. }

关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

  1. final ChannelFuture initAndRegister() {
  2. Channel channel = null;
  3. try {
  4. channel = channelFactory.newChannel();
  5. // 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
  6. init(channel);
  7. } catch (Throwable t) {
  8. // 处理异常...
  9. return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
  10. }
  11. // 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
  12. ChannelFuture regFuture = config().group().register(channel);
  13. if (regFuture.cause() != null) {
  14. // 处理异常...
  15. }
  16. return regFuture;
  17. }

关键代码 io.netty.bootstrap.ServerBootstrap#init

  1. // 这里 channel 实际上是 NioServerSocketChannel
  2. void init(Channel channel) throws Exception {
  3. final Map<ChannelOption<?>, Object> options = options0();
  4. synchronized (options) {
  5. setChannelOptions(channel, options, logger);
  6. }
  7. final Map<AttributeKey<?>, Object> attrs = attrs0();
  8. synchronized (attrs) {
  9. for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
  10. @SuppressWarnings("unchecked")
  11. AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
  12. channel.attr(key).set(e.getValue());
  13. }
  14. }
  15. ChannelPipeline p = channel.pipeline();
  16. final EventLoopGroup currentChildGroup = childGroup;
  17. final ChannelHandler currentChildHandler = childHandler;
  18. final Entry<ChannelOption<?>, Object>[] currentChildOptions;
  19. final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
  20. synchronized (childOptions) {
  21. currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
  22. }
  23. synchronized (childAttrs) {
  24. currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
  25. }
  26. // 为 NioServerSocketChannel 添加初始化器
  27. p.addLast(new ChannelInitializer<Channel>() {
  28. @Override
  29. public void initChannel(final Channel ch) throws Exception {
  30. final ChannelPipeline pipeline = ch.pipeline();
  31. ChannelHandler handler = config.handler();
  32. if (handler != null) {
  33. pipeline.addLast(handler);
  34. }
  35. // 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
  36. ch.eventLoop().execute(new Runnable() {
  37. @Override
  38. public void run() {
  39. pipeline.addLast(new ServerBootstrapAcceptor(
  40. ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  41. }
  42. });
  43. }
  44. });
  45. }

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

  1. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  2. // 一些检查,略...
  3. AbstractChannel.this.eventLoop = eventLoop;
  4. if (eventLoop.inEventLoop()) {
  5. register0(promise);
  6. } else {
  7. try {
  8. // 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行
  9. // 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
  10. // 这行代码完成的是 main 线程 -> nio boss 线程的切换
  11. eventLoop.execute(new Runnable() {
  12. @Override
  13. public void run() {
  14. register0(promise);
  15. }
  16. });
  17. } catch (Throwable t) {
  18. // 日志记录...
  19. closeForcibly();
  20. closeFuture.setClosed();
  21. safeSetFailure(promise, t);
  22. }
  23. }
  24. }

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

  1. private void register0(ChannelPromise promise) {
  2. try {
  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  4. return;
  5. }
  6. boolean firstRegistration = neverRegistered;
  7. // 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
  8. doRegister();
  9. neverRegistered = false;
  10. registered = true;
  11. // 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
  12. pipeline.invokeHandlerAddedIfNeeded();
  13. // 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
  14. safeSetSuccess(promise);
  15. pipeline.fireChannelRegistered();
  16. // 对应 server socket channel 还未绑定,isActive 为 false
  17. if (isActive()) {
  18. if (firstRegistration) {
  19. pipeline.fireChannelActive();
  20. } else if (config().isAutoRead()) {
  21. beginRead();
  22. }
  23. }
  24. } catch (Throwable t) {
  25. // Close the channel directly to avoid FD leak.
  26. closeForcibly();
  27. closeFuture.setClosed();
  28. safeSetFailure(promise, t);
  29. }
  30. }

关键代码 io.netty.channel.ChannelInitializer#initChannel

  1. private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
  2. if (initMap.add(ctx)) { // Guard against re-entrance.
  3. try {
  4. // 1.2.2.1 执行初始化
  5. initChannel((C) ctx.channel());
  6. } catch (Throwable cause) {
  7. exceptionCaught(ctx, cause);
  8. } finally {
  9. // 1.2.2.2 移除初始化器
  10. ChannelPipeline pipeline = ctx.pipeline();
  11. if (pipeline.context(this) != null) {
  12. pipeline.remove(this);
  13. }
  14. }
  15. return true;
  16. }
  17. return false;
  18. }

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

  1. // 3.1 或 3.2 执行 doBind0
  2. private static void doBind0(
  3. final ChannelFuture regFuture, final Channel channel,
  4. final SocketAddress localAddress, final ChannelPromise promise) {
  5. channel.eventLoop().execute(new Runnable() {
  6. @Override
  7. public void run() {
  8. if (regFuture.isSuccess()) {
  9. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  10. } else {
  11. promise.setFailure(regFuture.cause());
  12. }
  13. }
  14. });
  15. }

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

  1. public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
  2. assertEventLoop();
  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  4. return;
  5. }
  6. if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
  7. localAddress instanceof InetSocketAddress &&
  8. !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
  9. !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
  10. // 记录日志...
  11. }
  12. boolean wasActive = isActive();
  13. try {
  14. // 3.3 执行端口绑定
  15. doBind(localAddress);
  16. } catch (Throwable t) {
  17. safeSetFailure(promise, t);
  18. closeIfClosed();
  19. return;
  20. }
  21. if (!wasActive && isActive()) {
  22. invokeLater(new Runnable() {
  23. @Override
  24. public void run() {
  25. // 3.4 触发 active 事件
  26. pipeline.fireChannelActive();
  27. }
  28. });
  29. }
  30. safeSetSuccess(promise);
  31. }

3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

  1. protected void doBind(SocketAddress localAddress) throws Exception {
  2. if (PlatformDependent.javaVersion() >= 7) {
  3. javaChannel().bind(localAddress, config.getBacklog());
  4. } else {
  5. javaChannel().socket().bind(localAddress, config.getBacklog());
  6. }
  7. }

3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

  1. public void channelActive(ChannelHandlerContext ctx) {
  2. ctx.fireChannelActive();
  3. // 触发 read (NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
  4. readIfIsAutoRead();
  5. }

关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

  1. protected void doBeginRead() throws Exception {
  2. // Channel.read() or ChannelHandlerContext.read() was called
  3. final SelectionKey selectionKey = this.selectionKey;
  4. if (!selectionKey.isValid()) {
  5. return;
  6. }
  7. readPending = true;
  8. final int interestOps = selectionKey.interestOps();
  9. // readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
  10. if ((interestOps & readInterestOp) == 0) {
  11. selectionKey.interestOps(interestOps | readInterestOp);
  12. }
  13. }

2.2 NioEventLoop 剖析

NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务)。

提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute

  1. public void execute(Runnable task) {
  2. if (task == null) {
  3. throw new NullPointerException("task");
  4. }
  5. boolean inEventLoop = inEventLoop();
  6. // 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
  7. addTask(task);
  8. if (!inEventLoop) {
  9. // inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
  10. startThread();
  11. if (isShutdown()) {
  12. // 如果已经 shutdown,做拒绝逻辑,代码略...
  13. }
  14. }
  15. if (!addTaskWakesUp && wakesUpForTask(task)) {
  16. // 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
  17. wakeup(inEventLoop);
  18. }
  19. }

唤醒 select 阻塞线程 io.netty.channel.nio.NioEventLoop#wakeup

  1. @Override
  2. protected void wakeup(boolean inEventLoop) {
  3. if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
  4. selector.wakeup();
  5. }
  6. }

启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

  1. private void doStartThread() {
  2. assert thread == null;
  3. executor.execute(new Runnable() {
  4. @Override
  5. public void run() {
  6. // 将线程池的当前线程保存在成员变量中,以便后续使用
  7. thread = Thread.currentThread();
  8. if (interrupted) {
  9. thread.interrupt();
  10. }
  11. boolean success = false;
  12. updateLastExecutionTime();
  13. try {
  14. // 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
  15. SingleThreadEventExecutor.this.run();
  16. success = true;
  17. } catch (Throwable t) {
  18. logger.warn("Unexpected exception from an event executor: ", t);
  19. } finally {
  20. // 清理工作,代码略...
  21. }
  22. }
  23. });
  24. }

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件

  1. protected void run() {
  2. for (;;) {
  3. try {
  4. try {
  5. // calculateStrategy 的逻辑如下:
  6. // 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch
  7. // 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
  8. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
  9. case SelectStrategy.CONTINUE:
  10. continue;
  11. case SelectStrategy.BUSY_WAIT:
  12. case SelectStrategy.SELECT:
  13. // 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
  14. // 进行 select 阻塞,并设置唤醒状态为 false
  15. boolean oldWakenUp = wakenUp.getAndSet(false);
  16. // 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
  17. // 下面的 select 方法不会阻塞
  18. // 等 runAllTasks 处理完成后,再次循环进入这个阶段时,新增的任务会不会及时执行呢?
  19. // 因为 oldWakenUp 为 true,下面的 select 方法就会阻塞,直到超时
  20. // 新增的任务才能执行,造成 select 方法无谓阻塞
  21. select(oldWakenUp);
  22. if (wakenUp.get()) {
  23. selector.wakeup();
  24. }
  25. default:
  26. }
  27. } catch (IOException e) {
  28. rebuildSelector0();
  29. handleLoopException(e);
  30. continue;
  31. }
  32. cancelledKeys = 0;
  33. needsToSelectAgain = false;
  34. // ioRatio 默认是 50
  35. final int ioRatio = this.ioRatio;
  36. if (ioRatio == 100) {
  37. try {
  38. processSelectedKeys();
  39. } finally {
  40. // ioRatio 为 100 时,总是运行完所有非 IO 任务
  41. runAllTasks();
  42. }
  43. } else {
  44. final long ioStartTime = System.nanoTime();
  45. try {
  46. processSelectedKeys();
  47. } finally {
  48. // 记录 io 事件处理耗时
  49. final long ioTime = System.nanoTime() - ioStartTime;
  50. // 运行非 IO 任务,一旦超时会退出 runAllTasks
  51. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  52. }
  53. }
  54. } catch (Throwable t) {
  55. handleLoopException(t);
  56. }
  57. try {
  58. if (isShuttingDown()) {
  59. closeAll();
  60. if (confirmShutdown()) {
  61. return;
  62. }
  63. }
  64. } catch (Throwable t) {
  65. handleLoopException(t);
  66. }
  67. }
  68. }

⚠️ 注意

这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:

  • 由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
  • 由 EventLoop 自己调用,本次的 wakeup 会取消下一次的 select 操作

参考下图

Netty04-优化与源码 - 图1

io.netty.channel.nio.NioEventLoop#select

  1. private void select(boolean oldWakenUp) throws IOException {
  2. Selector selector = this.selector;
  3. try {
  4. int selectCnt = 0;
  5. long currentTimeNanos = System.nanoTime();
  6. // 计算等待时间
  7. // * 没有 scheduledTask,超时时间为 1s
  8. // * 有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`
  9. long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
  10. for (;;) {
  11. long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
  12. // 如果超时,退出循环
  13. if (timeoutMillis <= 0) {
  14. if (selectCnt == 0) {
  15. selector.selectNow();
  16. selectCnt = 1;
  17. }
  18. break;
  19. }
  20. // 如果期间又有 task 加入,退出循环;如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
  21. // wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
  22. if (hasTasks() && wakenUp.compareAndSet(false, true)) {
  23. selector.selectNow();
  24. selectCnt = 1;
  25. break;
  26. }
  27. // select 有限时阻塞
  28. // 注意 nio 有 bug,当 bug 出现时,select 方法即使没有事件发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%
  29. int selectedKeys = selector.select(timeoutMillis);
  30. // 计数加 1
  31. selectCnt ++;
  32. // 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
  33. if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
  34. break;
  35. }
  36. if (Thread.interrupted()) {
  37. // 线程被打断,退出循环
  38. // 记录日志
  39. selectCnt = 1;
  40. break;
  41. }
  42. long time = System.nanoTime();
  43. if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
  44. // 如果超时,计数重置为 1,下次循环就会 break
  45. selectCnt = 1;
  46. }
  47. // 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
  48. // 这是为了解决 nio 空轮询 bug
  49. else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
  50. selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
  51. // 重建 selector
  52. selector = selectRebuildSelector(selectCnt);
  53. selectCnt = 1;
  54. break;
  55. }
  56. currentTimeNanos = time;
  57. }
  58. if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
  59. // 记录日志
  60. }
  61. } catch (CancelledKeyException e) {
  62. // 记录日志
  63. }
  64. }

处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys

  1. private void processSelectedKeys() {
  2. if (selectedKeys != null) {
  3. // 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
  4. // SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
  5. processSelectedKeysOptimized();
  6. } else {
  7. processSelectedKeysPlain(selector.selectedKeys());
  8. }
  9. }

io.netty.channel.nio.NioEventLoop#processSelectedKey

  1. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
  2. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
  3. // 当 key 取消或关闭时会导致这个 key 无效
  4. if (!k.isValid()) {
  5. // 无效时处理...
  6. return;
  7. }
  8. try {
  9. int readyOps = k.readyOps();
  10. // 连接事件
  11. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
  12. int ops = k.interestOps();
  13. ops &= ~SelectionKey.OP_CONNECT;
  14. k.interestOps(ops);
  15. unsafe.finishConnect();
  16. }
  17. // 可写事件
  18. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
  19. ch.unsafe().forceFlush();
  20. }
  21. // 可读或可接入事件
  22. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
  23. // 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
  24. // 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
  25. unsafe.read();
  26. }
  27. } catch (CancelledKeyException ignored) {
  28. unsafe.close(unsafe.voidPromise());
  29. }
  30. }

2.3 accept 剖析

nio 中如下代码,在 netty 中对应的执行流程

  1. //1 阻塞直到事件发生
  2. selector.select();
  3. Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  4. while (iter.hasNext()) {
  5. //2 拿到一个事件
  6. SelectionKey key = iter.next();
  7. //3 如果是 accept 事件
  8. if (key.isAcceptable()) {
  9. //4 执行 accept
  10. SocketChannel channel = serverSocketChannel.accept();
  11. channel.configureBlocking(false);
  12. //5 关注 read 事件
  13. channel.register(selector, SelectionKey.OP_READ);
  14. }
  15. // ...
  16. }

先来看可接入事件处理(accept)

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

  1. public void read() {
  2. assert eventLoop().inEventLoop();
  3. final ChannelConfig config = config();
  4. final ChannelPipeline pipeline = pipeline();
  5. final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
  6. allocHandle.reset(config);
  7. boolean closed = false;
  8. Throwable exception = null;
  9. try {
  10. try {
  11. do {
  12. // doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf
  13. // readBuf 是一个 ArrayList 用来缓存消息
  14. int localRead = doReadMessages(readBuf);
  15. if (localRead == 0) {
  16. break;
  17. }
  18. if (localRead < 0) {
  19. closed = true;
  20. break;
  21. }
  22. // localRead 为 1,就一条消息,即接收一个客户端连接
  23. allocHandle.incMessagesRead(localRead);
  24. } while (allocHandle.continueReading());
  25. } catch (Throwable t) {
  26. exception = t;
  27. }
  28. int size = readBuf.size();
  29. for (int i = 0; i < size; i ++) {
  30. readPending = false;
  31. // 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理
  32. // io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
  33. pipeline.fireChannelRead(readBuf.get(i));
  34. }
  35. readBuf.clear();
  36. allocHandle.readComplete();
  37. pipeline.fireChannelReadComplete();
  38. if (exception != null) {
  39. closed = closeOnReadError(exception);
  40. pipeline.fireExceptionCaught(exception);
  41. }
  42. if (closed) {
  43. inputShutdown = true;
  44. if (isOpen()) {
  45. close(voidPromise());
  46. }
  47. }
  48. } finally {
  49. if (!readPending && !config.isAutoRead()) {
  50. removeReadOp();
  51. }
  52. }
  53. }

关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) {
  2. // 这时的 msg 是 NioSocketChannel
  3. final Channel child = (Channel) msg;
  4. // NioSocketChannel 添加 childHandler 即初始化器
  5. child.pipeline().addLast(childHandler);
  6. // 设置选项
  7. setChannelOptions(child, childOptions, logger);
  8. for (Entry<AttributeKey<?>, Object> e: childAttrs) {
  9. child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
  10. }
  11. try {
  12. // 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
  13. childGroup.register(child).addListener(new ChannelFutureListener() {
  14. @Override
  15. public void operationComplete(ChannelFuture future) throws Exception {
  16. if (!future.isSuccess()) {
  17. forceClose(child, future.cause());
  18. }
  19. }
  20. });
  21. } catch (Throwable t) {
  22. forceClose(child, t);
  23. }
  24. }

又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

  1. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  2. // 一些检查,略...
  3. AbstractChannel.this.eventLoop = eventLoop;
  4. if (eventLoop.inEventLoop()) {
  5. register0(promise);
  6. } else {
  7. try {
  8. // 这行代码完成的事情是 nio boss -> nio worker 线程的切换
  9. eventLoop.execute(new Runnable() {
  10. @Override
  11. public void run() {
  12. register0(promise);
  13. }
  14. });
  15. } catch (Throwable t) {
  16. // 日志记录...
  17. closeForcibly();
  18. closeFuture.setClosed();
  19. safeSetFailure(promise, t);
  20. }
  21. }
  22. }

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

  1. private void register0(ChannelPromise promise) {
  2. try {
  3. if (!promise.setUncancellable() || !ensureOpen(promise)) {
  4. return;
  5. }
  6. boolean firstRegistration = neverRegistered;
  7. doRegister();
  8. neverRegistered = false;
  9. registered = true;
  10. // 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
  11. pipeline.invokeHandlerAddedIfNeeded();
  12. // 执行后就是 head -> logging handler -> my handler -> tail
  13. safeSetSuccess(promise);
  14. pipeline.fireChannelRegistered();
  15. if (isActive()) {
  16. if (firstRegistration) {
  17. // 触发 pipeline 上 active 事件
  18. pipeline.fireChannelActive();
  19. } else if (config().isAutoRead()) {
  20. beginRead();
  21. }
  22. }
  23. } catch (Throwable t) {
  24. closeForcibly();
  25. closeFuture.setClosed();
  26. safeSetFailure(promise, t);
  27. }
  28. }

回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

  1. public void channelActive(ChannelHandlerContext ctx) {
  2. ctx.fireChannelActive();
  3. // 触发 read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
  4. readIfIsAutoRead();
  5. }

io.netty.channel.nio.AbstractNioChannel#doBeginRead

  1. protected void doBeginRead() throws Exception {
  2. // Channel.read() or ChannelHandlerContext.read() was called
  3. final SelectionKey selectionKey = this.selectionKey;
  4. if (!selectionKey.isValid()) {
  5. return;
  6. }
  7. readPending = true;
  8. // 这时候 interestOps 是 0
  9. final int interestOps = selectionKey.interestOps();
  10. if ((interestOps & readInterestOp) == 0) {
  11. // 关注 read 事件
  12. selectionKey.interestOps(interestOps | readInterestOp);
  13. }
  14. }

2.4 read 剖析

再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

  1. public final void read() {
  2. final ChannelConfig config = config();
  3. if (shouldBreakReadReady(config)) {
  4. clearReadPending();
  5. return;
  6. }
  7. final ChannelPipeline pipeline = pipeline();
  8. // io.netty.allocator.type 决定 allocator 的实现
  9. final ByteBufAllocator allocator = config.getAllocator();
  10. // 用来分配 byteBuf,确定单次读取大小
  11. final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
  12. allocHandle.reset(config);
  13. ByteBuf byteBuf = null;
  14. boolean close = false;
  15. try {
  16. do {
  17. byteBuf = allocHandle.allocate(allocator);
  18. // 读取
  19. allocHandle.lastBytesRead(doReadBytes(byteBuf));
  20. if (allocHandle.lastBytesRead() <= 0) {
  21. byteBuf.release();
  22. byteBuf = null;
  23. close = allocHandle.lastBytesRead() < 0;
  24. if (close) {
  25. readPending = false;
  26. }
  27. break;
  28. }
  29. allocHandle.incMessagesRead(1);
  30. readPending = false;
  31. // 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handler
  32. pipeline.fireChannelRead(byteBuf);
  33. byteBuf = null;
  34. }
  35. // 是否要继续循环
  36. while (allocHandle.continueReading());
  37. allocHandle.readComplete();
  38. // 触发 read complete 事件
  39. pipeline.fireChannelReadComplete();
  40. if (close) {
  41. closeOnRead(pipeline);
  42. }
  43. } catch (Throwable t) {
  44. handleReadException(pipeline, byteBuf, t, close, allocHandle);
  45. } finally {
  46. if (!readPending && !config.isAutoRead()) {
  47. removeReadOp();
  48. }
  49. }
  50. }

io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

  1. public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
  2. return
  3. // 一般为 true
  4. config.isAutoRead() &&
  5. // respectMaybeMoreData 默认为 true
  6. // maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
  7. (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
  8. // 小于最大次数,maxMessagePerRead 默认 16
  9. totalMessages < maxMessagePerRead &&
  10. // 实际读到了数据
  11. totalBytesRead > 0;
  12. }