1. 优化

1.1 序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理 ```java public interface Serializer {

    // 反序列化方法 T deserialize(Class clazz, byte[] bytes);

    // 序列化方法 byte[] serialize(T object);

}

  1. ```java
  2. public enum SerializerAlgorithm implements Serializer {
  3. JAVA {
  4. @Override
  5. public <T> T deserialize(Class<T> clazz, byte[] bytes) {
  6. try {
  7. ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
  8. return (T) new ObjectInputStream(byteArrayInputStream).readObject();
  9. } catch (Exception e) {
  10. throw new RuntimeException("JAVA deserialize error", e);
  11. }
  12. }
  13. @Override
  14. public <T> byte[] serialize(T object) {
  15. try {
  16. ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  17. new ObjectOutputStream(buffer).writeObject(object);
  18. return buffer.toByteArray();
  19. } catch (IOException e) {
  20. throw new RuntimeException("JAVA serializer error", e);
  21. }
  22. }
  23. },
  24. GSON {
  25. private final Gson gson = new Gson();
  26. @Override
  27. public <T> T deserialize(Class<T> clazz, byte[] bytes) {
  28. return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
  29. }
  30. @Override
  31. public <T> byte[] serialize(T object) {
  32. return gson.toJson(object).getBytes(StandardCharsets.UTF_8);
  33. }
  34. };
  35. public static SerializerAlgorithm getByType(Byte type) {
  36. return SerializerAlgorithm.values()[type];
  37. }
  38. }
  1. @Slf4j
  2. @ChannelHandler.Sharable
  3. public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
  4. @Override
  5. protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
  6. ByteBuf out = ctx.alloc().buffer();
  7. // 1. 4 字节的魔数
  8. out.writeBytes(new byte[]{1, 2, 3, 4});
  9. // 2. 1 字节的版本,
  10. out.writeByte(1);
  11. // 3. 1 字节的序列化方式 jdk 0 , json 1
  12. out.writeByte(Config.getSerializerAlgorithm().ordinal());
  13. // 4. 1 字节的指令类型
  14. out.writeByte(msg.getMessageType());
  15. // 5. 4 个字节
  16. out.writeInt(msg.getSequenceId());
  17. // 无意义,对齐填充
  18. out.writeByte(0xff);
  19. // 6. 获取内容的字节数组
  20. byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
  21. log.info(new String(bytes));
  22. // 7. 长度
  23. out.writeInt(bytes.length);
  24. // 8. 写入内容
  25. out.writeBytes(bytes);
  26. outList.add(out);
  27. }
  28. @Override
  29. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  30. int magicNum = in.readInt();
  31. byte version = in.readByte();
  32. byte serializerType = in.readByte();
  33. byte messageType = in.readByte();
  34. int sequenceId = in.readInt();
  35. in.readByte();
  36. int length = in.readInt();
  37. byte[] bytes = new byte[length];
  38. in.readBytes(bytes, 0, length);
  39. SerializerAlgorithm algorithm = SerializerAlgorithm.getByType(serializerType);
  40. Class<? extends Message> messageClass = Message.getMessageClass(messageType);
  41. Message message = algorithm.deserialize(messageClass, bytes);
  42. log.info("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
  43. log.info("{}", message);
  44. out.add(message);
  45. }
  46. }

1.2 参数调优

1.2.1 CONNECT_TIMEOUT_MILLIS

属于 SocketChannal 参数

用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常

SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

  1. @Slf4j
  2. public class TestConnectionTimeout {
  3. public static void main(String[] args) {
  4. NioEventLoopGroup group = new NioEventLoopGroup();
  5. try {
  6. Bootstrap bootstrap = new Bootstrap()
  7. .group(group)
  8. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
  9. .channel(NioSocketChannel.class)
  10. .handler(new LoggingHandler());
  11. ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
  12. future.sync().channel().closeFuture().sync(); // 断点1
  13. } catch (Exception e) {
  14. e.printStackTrace();
  15. log.debug("timeout");
  16. } finally {
  17. group.shutdownGracefully();
  18. }
  19. }
  20. }

1.2.2 SO_BACKLOG

属于 ServerSocketChannal 参数
image.png

  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列
    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列
    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

1.2.3 TCP_NODELAY

属于 SocketChannal 参数

1.2.4 SO_SNDBUF & SO_RCVBUF

SO_SNDBUF 属于 SocketChannal 参数

SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

1.2.5 ALLOCATOR

属于 SocketChannal 参数

用来分配 ByteBuf, ctx.alloc()

1.2.6 RCVBUF_ALLOCATOR

属于 SocketChannal 参数

控制 netty 接收缓冲区大小

负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

2. RPC 案例

  1. @Slf4j
  2. public class ClientRequestHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
  3. public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
  4. @Override
  5. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
  6. // 注意这里需要从缓存中移除
  7. Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
  8. if (promise != null) {
  9. Object returnValue = msg.getReturnValue();
  10. Throwable exceptionValue = msg.getExceptionValue();
  11. if (exceptionValue != null) {
  12. promise.setFailure(exceptionValue);
  13. } else {
  14. promise.setSuccess(returnValue);
  15. }
  16. }
  17. }
  18. }
  1. @Slf4j
  2. public class RpcClient {
  3. private static final Channel channel;
  4. static {
  5. channel = initChannel();
  6. }
  7. private static final AtomicInteger REQUEST_COUNT = new AtomicInteger(0);
  8. @SuppressWarnings("all")
  9. public static <T> T getProxyService(Class<T> serviceInterface) {
  10. return (T) Proxy.newProxyInstance(
  11. serviceInterface.getClassLoader(),
  12. new Class[]{serviceInterface},
  13. new InvocationHandler() {
  14. @Override
  15. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  16. // 1. 构造 rpc 请求
  17. int sequenceId = REQUEST_COUNT.incrementAndGet();
  18. RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(
  19. sequenceId,
  20. serviceInterface.getName(),
  21. method.getName(),
  22. method.getReturnType(),
  23. method.getParameterTypes(),
  24. args
  25. );
  26. // 2. 发送 rpc 请求
  27. channel.writeAndFlush(rpcRequestMessage);
  28. // 3. 准备一个空 Promise 对象,来接收结果, 指定 promise 对象异步接收结果线程
  29. DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
  30. ClientRequestHandler.PROMISES.put(sequenceId, promise);
  31. // 4. 阻塞知道有结果
  32. promise.await();
  33. // 5. 此时已经获取到结果, 解析
  34. if (promise.isSuccess()) {
  35. // 正常返回
  36. return promise.getNow();
  37. } else {
  38. // 包装异常
  39. throw new RuntimeException(promise.cause());
  40. }
  41. }
  42. });
  43. }
  44. private static Channel initChannel() {
  45. NioEventLoopGroup group = new NioEventLoopGroup();
  46. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
  47. try {
  48. Bootstrap bootstrap = new Bootstrap();
  49. bootstrap.channel(NioSocketChannel.class);
  50. bootstrap.group(group);
  51. bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  52. @Override
  53. protected void initChannel(SocketChannel ch) throws Exception {
  54. ch.pipeline().addLast(new ProcotolFrameDecoder());
  55. ch.pipeline().addLast(MESSAGE_CODEC);
  56. // 业务处理
  57. ch.pipeline().addLast(new ClientRequestHandler());
  58. // 用来判断是不是[读空闲时间过长],或[写空闲时间过长]
  59. // 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
  60. ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
  61. // ChannelDuplexHandler 可以同时作为入站和出站处理器
  62. ch.pipeline().addLast(new ChannelDuplexHandler() {
  63. // 用来触发特殊事件
  64. @Override
  65. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  66. IdleStateEvent event = (IdleStateEvent) evt;
  67. // 触发了写空闲事件
  68. if (event.state() == IdleState.WRITER_IDLE) {
  69. ctx.writeAndFlush(new PingMessage());
  70. }
  71. }
  72. });
  73. }
  74. });
  75. // 这里需要 sync 同步确保连接建立
  76. Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
  77. // 这里需要异步监听 channel 关闭事件
  78. channel.closeFuture().addListener(new ChannelFutureListener() {
  79. @Override
  80. public void operationComplete(ChannelFuture future) throws Exception {
  81. group.shutdownGracefully();
  82. if (!future.isSuccess()) {
  83. Throwable cause = future.cause();
  84. log.error("客户端 channel 关闭异常", cause);
  85. }
  86. }
  87. });
  88. return channel;
  89. } catch (Exception e) {
  90. log.error("client init error", e);
  91. }
  92. return null;
  93. }
  94. }
  1. public class RpcClientTest {
  2. public static void main(String[] args) {
  3. HelloService helloService = RpcClient.getProxyService(HelloService.class);
  4. System.out.println(helloService.echoName("张三"));
  5. System.out.println(helloService.getAge());
  6. }
  7. }