1. 优化
1.1 序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理 ```java public interface Serializer {
// 反序列化方法
T deserialize(Class clazz, byte[] bytes); // 序列化方法
byte[] serialize(T object);
}
```javapublic enum SerializerAlgorithm implements Serializer {JAVA {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);return (T) new ObjectInputStream(byteArrayInputStream).readObject();} catch (Exception e) {throw new RuntimeException("JAVA deserialize error", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream buffer = new ByteArrayOutputStream();new ObjectOutputStream(buffer).writeObject(object);return buffer.toByteArray();} catch (IOException e) {throw new RuntimeException("JAVA serializer error", e);}}},GSON {private final Gson gson = new Gson();@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return gson.toJson(object).getBytes(StandardCharsets.UTF_8);}};public static SerializerAlgorithm getByType(Byte type) {return SerializerAlgorithm.values()[type];}}
@Slf4j@ChannelHandler.Sharablepublic class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);log.info(new String(bytes));// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);SerializerAlgorithm algorithm = SerializerAlgorithm.getByType(serializerType);Class<? extends Message> messageClass = Message.getMessageClass(messageType);Message message = algorithm.deserialize(messageClass, bytes);log.info("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.info("{}", message);out.add(message);}}
1.2 参数调优
1.2.1 CONNECT_TIMEOUT_MILLIS
属于 SocketChannal 参数
用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
@Slf4jpublic class TestConnectionTimeout {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300).channel(NioSocketChannel.class).handler(new LoggingHandler());ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);future.sync().channel().closeFuture().sync(); // 断点1} catch (Exception e) {e.printStackTrace();log.debug("timeout");} finally {group.shutdownGracefully();}}}
1.2.2 SO_BACKLOG
属于 ServerSocketChannal 参数
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
- sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
netty 中可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
1.2.3 TCP_NODELAY
1.2.4 SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
1.2.5 ALLOCATOR
属于 SocketChannal 参数
1.2.6 RCVBUF_ALLOCATOR
属于 SocketChannal 参数
控制 netty 接收缓冲区大小
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
2. RPC 案例
@Slf4jpublic class ClientRequestHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {// 注意这里需要从缓存中移除Promise<Object> promise = PROMISES.remove(msg.getSequenceId());if (promise != null) {Object returnValue = msg.getReturnValue();Throwable exceptionValue = msg.getExceptionValue();if (exceptionValue != null) {promise.setFailure(exceptionValue);} else {promise.setSuccess(returnValue);}}}}
@Slf4jpublic class RpcClient {private static final Channel channel;static {channel = initChannel();}private static final AtomicInteger REQUEST_COUNT = new AtomicInteger(0);@SuppressWarnings("all")public static <T> T getProxyService(Class<T> serviceInterface) {return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),new Class[]{serviceInterface},new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 1. 构造 rpc 请求int sequenceId = REQUEST_COUNT.incrementAndGet();RpcRequestMessage rpcRequestMessage = new RpcRequestMessage(sequenceId,serviceInterface.getName(),method.getName(),method.getReturnType(),method.getParameterTypes(),args);// 2. 发送 rpc 请求channel.writeAndFlush(rpcRequestMessage);// 3. 准备一个空 Promise 对象,来接收结果, 指定 promise 对象异步接收结果线程DefaultPromise<Object> promise = new DefaultPromise<>(channel.eventLoop());ClientRequestHandler.PROMISES.put(sequenceId, promise);// 4. 阻塞知道有结果promise.await();// 5. 此时已经获取到结果, 解析if (promise.isSuccess()) {// 正常返回return promise.getNow();} else {// 包装异常throw new RuntimeException(promise.cause());}}});}private static Channel initChannel() {NioEventLoopGroup group = new NioEventLoopGroup();MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProcotolFrameDecoder());ch.pipeline().addLast(MESSAGE_CODEC);// 业务处理ch.pipeline().addLast(new ClientRequestHandler());// 用来判断是不是[读空闲时间过长],或[写空闲时间过长]// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() {// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;// 触发了写空闲事件if (event.state() == IdleState.WRITER_IDLE) {ctx.writeAndFlush(new PingMessage());}}});}});// 这里需要 sync 同步确保连接建立Channel channel = bootstrap.connect("localhost", 8080).sync().channel();// 这里需要异步监听 channel 关闭事件channel.closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {group.shutdownGracefully();if (!future.isSuccess()) {Throwable cause = future.cause();log.error("客户端 channel 关闭异常", cause);}}});return channel;} catch (Exception e) {log.error("client init error", e);}return null;}}
public class RpcClientTest {public static void main(String[] args) {HelloService helloService = RpcClient.getProxyService(HelloService.class);System.out.println(helloService.echoName("张三"));System.out.println(helloService.getAge());}}
