1. 优化
1.1 序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理 ```java public interface Serializer {
// 反序列化方法
T deserialize(Class clazz, byte[] bytes); // 序列化方法
byte[] serialize(T object);
}
```java
public enum SerializerAlgorithm implements Serializer {
JAVA {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
return (T) new ObjectInputStream(byteArrayInputStream).readObject();
} catch (Exception e) {
throw new RuntimeException("JAVA deserialize error", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
new ObjectOutputStream(buffer).writeObject(object);
return buffer.toByteArray();
} catch (IOException e) {
throw new RuntimeException("JAVA serializer error", e);
}
}
},
GSON {
private final Gson gson = new Gson();
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
}
@Override
public <T> byte[] serialize(T object) {
return gson.toJson(object).getBytes(StandardCharsets.UTF_8);
}
};
public static SerializerAlgorithm getByType(Byte type) {
return SerializerAlgorithm.values()[type];
}
}
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
log.info(new String(bytes));
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
SerializerAlgorithm algorithm = SerializerAlgorithm.getByType(serializerType);
Class<? extends Message> messageClass = Message.getMessageClass(messageType);
Message message = algorithm.deserialize(messageClass, bytes);
log.info("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.info("{}", message);
out.add(message);
}
}
1.2 参数调优
1.2.1 CONNECT_TIMEOUT_MILLIS
属于 SocketChannal 参数
用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync(); // 断点1
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}
1.2.2 SO_BACKLOG
属于 ServerSocketChannal 参数
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
- sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
netty 中可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
1.2.3 TCP_NODELAY
1.2.4 SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
1.2.5 ALLOCATOR
属于 SocketChannal 参数
1.2.6 RCVBUF_ALLOCATOR
属于 SocketChannal 参数
控制 netty 接收缓冲区大小
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
2. RPC 案例
@Slf4j
public class ClientRequestHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
// 注意这里需要从缓存中移除
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
if (promise != null) {
Object returnValue = msg.getReturnValue();
Throwable exceptionValue = msg.getExceptionValue();
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
}
}
@Slf4j
public class RpcClient {
private static final Channel channel;
static {
channel = initChannel();
}
private static final AtomicInteger REQUEST_COUNT = new AtomicInteger(0);
@SuppressWarnings("all")
public static <T> T getProxyService(Class<T> serviceInterface) {
return (T) Proxy.newProxyInstance(
serviceInterface.getClassLoader(),
new Class[]{serviceInterface},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1. 构造 rpc 请求
int sequenceId = REQUEST_COUNT.incrementAndGet();
RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(
sequenceId,
serviceInterface.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2. 发送 rpc 请求
channel.writeAndFlush(rpcRequestMessage);
// 3. 准备一个空 Promise 对象,来接收结果, 指定 promise 对象异步接收结果线程
DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());
ClientRequestHandler.PROMISES.put(sequenceId, promise);
// 4. 阻塞知道有结果
promise.await();
// 5. 此时已经获取到结果, 解析
if (promise.isSuccess()) {
// 正常返回
return promise.getNow();
} else {
// 包装异常
throw new RuntimeException(promise.cause());
}
}
});
}
private static Channel initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(MESSAGE_CODEC);
// 业务处理
ch.pipeline().addLast(new ClientRequestHandler());
// 用来判断是不是[读空闲时间过长],或[写空闲时间过长]
// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了写空闲事件
if (event.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
});
}
});
// 这里需要 sync 同步确保连接建立
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
// 这里需要异步监听 channel 关闭事件
channel.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
group.shutdownGracefully();
if (!future.isSuccess()) {
Throwable cause = future.cause();
log.error("客户端 channel 关闭异常", cause);
}
}
});
return channel;
} catch (Exception e) {
log.error("client init error", e);
}
return null;
}
}
public class RpcClientTest {
public static void main(String[] args) {
HelloService helloService = RpcClient.getProxyService(HelloService.class);
System.out.println(helloService.echoName("张三"));
System.out.println(helloService.getAge());
}
}