Netty Server

Netty's server-side network data transmission still relies on the IO multiplexing model and the Reactor pattern, which involves four main parts:

  1. Network channels
  2. Selectors
  3. Reactors: one reactor corresponds to one selector and is used to capture channel events
  4. Handlers
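These four parts come straight from plain Java NIO. A minimal single-threaded reactor sketch using only the JDK may make the roles concrete (the `MiniReactor` class, echo behavior, and port choice here are illustrative, not part of Netty):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Minimal single-threaded reactor: one Selector polls all channels and
// dispatches ready events (accept / read) to inline handler code (an echo).
public class MiniReactor implements Runnable {
    private final Selector selector = Selector.open();
    private final ServerSocketChannel server = ServerSocketChannel.open();

    MiniReactor() throws IOException {
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);  // reactor watches for new connections
    }

    int port() throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select(200); // block until a channel is ready (or timeout)
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        // Accept the "child" channel and register it for reads
                        SocketChannel child = server.accept();
                        child.configureBlocking(false);
                        child.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        // Handler: echo whatever arrives back to the client
                        SocketChannel ch = (SocketChannel) key.channel();
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        if (ch.read(buf) < 0) { ch.close(); continue; }
                        buf.flip();
                        while (buf.hasRemaining()) ch.write(buf);
                    }
                }
            }
        } catch (IOException | ClosedSelectorException ignored) {
            // selector closed: event loop ends
        }
    }

    public static void main(String[] args) throws Exception {
        MiniReactor reactor = new MiniReactor();
        new Thread(reactor).start();
        try (Socket client = new Socket("127.0.0.1", reactor.port())) {
            client.getOutputStream().write("ping".getBytes(StandardCharsets.UTF_8));
            byte[] reply = new byte[4];
            int off = 0;
            while (off < 4) { // read until the full echo arrives
                int n = client.getInputStream().read(reply, off, 4 - off);
                if (n < 0) break;
                off += n;
            }
            System.out.println("echoed: " + new String(reply, 0, off, StandardCharsets.UTF_8));
        }
        reactor.selector.close(); // stop the event loop
    }
}
```

Netty's NioEventLoop plays roughly the role of this loop, one per thread, with many channels registered to each selector.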

To wire these individual components together, Netty provides a bootstrap factory class: ServerBootstrap (the client side uses Bootstrap).

The startup flow is as follows:
  • Create a server bootstrap instance
  • Create the reactor groups: one group that listens for new connections and one that handles IO events
  • Set the channel IO type on the bootstrap (NioServerSocketChannel). NioServerSocketChannel listens for and accepts server connections and is called the parent channel; each accepted NioSocketChannel is called a child channel
  • Set the channel options (e.g. TCP keep-alive, the Nagle algorithm)
  • Assemble the child channel's Pipeline (in effect a doubly linked list into which the business handler instances are added)
  • Bind the listening port for new server connections (both host name and port are required)
  • Block, waiting for the channel to close
  • Finally, release resources by shutting down the reactor groups. The code:

```java
public void start() {
    CustomShutdownHook.getCustomShutdownHook().clearAll();
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        String host = InetAddress.getLocalHost().getHostAddress();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // TCP enables the Nagle algorithm by default, which batches small writes
                // into larger segments to reduce network traffic. TCP_NODELAY controls
                // whether the Nagle algorithm is disabled.
                .childOption(ChannelOption.TCP_NODELAY, true)
                // Whether to enable TCP-level keep-alive probing
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Maximum length of the queue of connections that have completed the
                // three-way handshake and are waiting to be accepted. If connections are
                // created frequently and the server is slow to accept them, increase it.
                .option(ChannelOption.SO_BACKLOG, 128)
                .handler(new LoggingHandler(LogLevel.INFO))
                // Runs when a client connection is first established
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // Close the connection if no client request arrives within 30 seconds
                        ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                        ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
                        ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });
        // Bind the port and wait synchronously for the bind to succeed
        ChannelFuture f = b.bind(host, PORT).sync();
        // Wait until the server's listening channel is closed
        f.channel().closeFuture().sync();
    } catch (InterruptedException | UnknownHostException e) {
        log.error("occur exception when start server:", e);
    } finally {
        log.info("shutdown bossGroup and workerGroup");
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
```

Handler (business processor)

IO processing consists of: reading packets from the channel, decoding the packets, business processing, encoding the resulting data, writing the packets back to the channel, and sending them to the peer.
In Netty this is done mainly by extending the inbound handler ChannelInboundHandlerAdapter and overriding its channelRead method:

```java
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final RpcRequestHandler rpcRequestHandler;

    public NettyServerHandler() {
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            log.info("server receive msg: [{}] ", msg);
            RpcRequest rpcRequest = (RpcRequest) msg;
            if (rpcRequest.getRpcMessageType() == RpcMessageType.HEART_BEAT) {
                log.info("receive heart beat msg from client");
                return;
            }
            // Execute the target method (the method the client needs to execute) and return the result
            Object result = rpcRequestHandler.handle(rpcRequest);
            log.info(String.format("server get result: %s", result.toString()));
            if (ctx.channel().isActive() && ctx.channel().isWritable()) {
                RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
                ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCode.FAIL);
                ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                log.error("not writable now, message dropped");
            }
        } finally {
            // Make sure the ByteBuf is released, otherwise it may leak memory
            ReferenceCountUtil.release(msg);
        }
    }
}
```
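The decode → business handling → encode flow that this handler sits in can be pictured as a chain of transformations applied to each inbound message. A toy model of that pipeline idea, with all names (`MiniPipeline`, the lambda "handlers") invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of a channel pipeline: inbound data flows through the handlers
// in order, each handler's output becoming the next handler's input.
public class MiniPipeline {
    private final List<Function<Object, Object>> handlers = new ArrayList<>();

    MiniPipeline addLast(Function<Object, Object> handler) {
        handlers.add(handler);
        return this;
    }

    Object fireRead(Object msg) {
        for (Function<Object, Object> h : handlers) {
            msg = h.apply(msg);
        }
        return msg;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast(bytes -> new String((byte[]) bytes))     // "decoder": bytes -> request
                .addLast(req -> "result-of(" + req + ")")         // "business handler"
                .addLast(resp -> ((String) resp).getBytes());     // "encoder": response -> bytes

        byte[] out = (byte[]) pipeline.fireRead("ping".getBytes());
        System.out.println(new String(out));
    }
}
```

The real ChannelPipeline is richer: it is a doubly linked list of handler contexts with separate inbound and outbound directions, which is why the decoder and encoder above can live in the same pipeline without interfering.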

Netty Client

Creating the client is quite similar to the server:

```java
public final class NettyClient {

    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;

    // Initialize resources such as EventLoopGroup and Bootstrap
    public NettyClient() {
        eventLoopGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        Serializer kryoSerializer = ExtensionLoader.getExtensionLoader(Serializer.class).getExtension("kyro");
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                // The timeout period of the connection: if the connection cannot be
                // established within this time, the attempt fails.
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // A second .handler(...) call on the bootstrap would replace the
                        // first, so the LoggingHandler goes into the pipeline instead
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        // If no data is sent to the server within 5 seconds, a heartbeat request is sent
                        ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
                        // Custom serialization codec
                        // ByteBuf -> RpcResponse
                        ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
                        // RpcRequest -> ByteBuf
                        ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });
    }

    /**
     * Connect to the server and get the channel, so that rpc messages can be sent to the server
     *
     * @param inetSocketAddress server address
     * @return the channel
     */
    @SneakyThrows
    public Channel doConnect(InetSocketAddress inetSocketAddress) {
        CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
        bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
                completableFuture.complete(future.channel());
            } else {
                // Complete exceptionally instead of throwing: an exception thrown inside
                // the listener stays on the event loop thread and would leave the caller
                // blocked in get() forever
                completableFuture.completeExceptionally(new IllegalStateException("connect failed: " + inetSocketAddress));
            }
        });
        return completableFuture.get();
    }

    public void close() {
        eventLoopGroup.shutdownGracefully();
    }
}
```
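The doConnect method illustrates a general pattern worth naming: bridging a callback-style async API (Netty's ChannelFuture listener) into a JDK CompletableFuture so the caller can block on, or compose with, the result. A stripped-down sketch of the same pattern without Netty (the `AsyncConnector` API and its addresses are invented stand-ins for `bootstrap.connect(...)`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical callback-style API, standing in for Netty's ChannelFuture/addListener.
class AsyncConnector {
    void connect(String address, Consumer<String> onSuccess, Consumer<Throwable> onFailure) {
        new Thread(() -> {
            if (address.startsWith("valid:")) {
                onSuccess.accept("channel-to-" + address);
            } else {
                onFailure.accept(new IllegalStateException("connect failed: " + address));
            }
        }).start();
    }
}

public class CallbackBridge {
    // Same shape as doConnect: callbacks complete a CompletableFuture,
    // then the caller blocks on it to get a plain result.
    static String doConnect(AsyncConnector connector, String address) {
        CompletableFuture<String> future = new CompletableFuture<>();
        connector.connect(address,
                future::complete,
                // Completing exceptionally propagates the failure to the waiting
                // caller; throwing inside the callback would only kill the
                // callback thread and leave join() blocked forever
                future::completeExceptionally);
        return future.join();
    }

    public static void main(String[] args) {
        AsyncConnector connector = new AsyncConnector();
        System.out.println(doConnect(connector, "valid:127.0.0.1:9999"));
        try {
            doConnect(connector, "unreachable-host");
        } catch (CompletionException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

The same bridging trick is used throughout the client side of the framework, e.g. to wait for an RpcResponse matched by request id.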