Netty服务端
Netty服务端的网络数据传输依靠的还是IO多路复用模型以及reactor反应器模式,其中主要包括四个部分:
- 创建一个服务端启动器
- 创建反应器组,包括监听反应器组以及IO事件的反应器组
- 在启动器中设置通道的IO类型(NioServerSocketChannel),其中NIOServerSocketChannel负责服务器连接监听和接收称为父通道;而NioSocketChannel通道称之为子通道;
- 设置传输通道的配置选项(比如说心跳机制啊、Nagle算法等)
- 装配子通道的Pipeline流水线(实则是一个双向链表,需要将业务处理器实例加入到双向链表中)
- 开始绑定服务器新连接的监听端口 (主机名以及端口号都需要)
- 自我阻塞,等待通道关闭连接
最后需要释放资源,关闭反应器组,具体代码如下:
public void start() {CustomShutdownHook.getCustomShutdownHook().clearAll();String host = InetAddress.getLocalHost().getHostAddress();EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。.childOption(ChannelOption.TCP_NODELAY, true)// 是否开启 TCP 底层心跳机制.childOption(ChannelOption.SO_KEEPALIVE, true)//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数.option(ChannelOption.SO_BACKLOG, 128).handler(new LoggingHandler(LogLevel.INFO))// 当客户端第一次进行请求的时候才会进行初始化.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// 30 秒之内没有收到客户端请求的话就关闭连接ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));ch.pipeline().addLast(new NettyServerHandler());}});// 绑定端口,同步等待绑定成功ChannelFuture f = b.bind(host, PORT).sync();// 等待服务端监听端口关闭f.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("occur exception when start server:", e);} finally {log.error("shutdown bossGroup and workerGroup");bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
Handler业务处理器

IO处理的操作包括:从通道读取数据包、数据包解码、业务处理、目标数据编码、将数据包写入到通道、由通道发送到对端;
Netty中主要依靠继承ChannelInBoundHandlerAdapter通道入站处理器,主要是重写channelRead方法public class NettyServerHandler extends ChannelInboundHandlerAdapter {private final RpcRequestHandler rpcRequestHandler;public NettyServerHandler() {this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {try {log.info("server receive msg: [{}] ", msg);RpcRequest rpcRequest = (RpcRequest) msg;if (rpcRequest.getRpcMessageType() == RpcMessageType.HEART_BEAT) {log.info("receive heat beat msg from client");return;}// Execute the target method (the method the client needs to execute) and return the method resultObject result = rpcRequestHandler.handle(rpcRequest);log.info(String.format("server get result: %s", result.toString()));if (ctx.channel().isActive() && ctx.channel().isWritable()) {RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCode.FAIL);ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);log.error("not writable now, message dropped");}} finally {// 确保bytebuf已经释放,反之可能会出现内存泄漏等问题ReferenceCountUtil.release(msg);}}}
Netty客户端
客户端的创建与服务端比较类似,代码如下: ```java public final class NettyClient { private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup;
// initialize resources such as EventLoopGroup, Bootstrap public NettyClient() {
eventLoopGroup = new NioEventLoopGroup();bootstrap = new Bootstrap();Serializer kryoSerializer = ExtensionLoader.getExtensionLoader(Serializer.class).getExtension("kyro");bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))// The timeout period of the connection.// If this time is exceeded or the connection cannot be established, the connection fails..option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {// If no data is sent to the server within 15 seconds, a heartbeat request is sentch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));/*config custom serialization codec*/// RpcResponse -> ByteBufch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));// ByteBuf -> RpcRequestch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));ch.pipeline().addLast(new NettyClientHandler());}});
}
/**
* connect server and get the channel ,so that you can send rpc message to server
*
* @param inetSocketAddress server address
* @return the channel
*/
@SneakyThrows
public Channel doConnect(InetSocketAddress inetSocketAddress) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}
public void close() {
eventLoopGroup.shutdownGracefully();
}
} ```
