Netty服务端
Netty服务端的网络数据传输依靠的还是IO多路复用模型以及reactor反应器模式,其中主要包括四个部分:
- 创建一个服务端启动器
- 创建反应器组,包括监听反应器组以及IO事件的反应器组
- 在启动器中设置通道的IO类型(NioServerSocketChannel),其中NIOServerSocketChannel负责服务器连接监听和接收称为父通道;而NioSocketChannel通道称之为子通道;
- 设置传输通道的配置选项(比如说心跳机制啊、Nagle算法等)
- 装配子通道的Pipeline流水线(实则是一个双向链表,需要将业务处理器实例加入到双向链表中)
- 开始绑定服务器新连接的监听端口 (主机名以及端口号都需要)
- 自我阻塞,等待通道关闭连接
最后需要释放资源,关闭反应器组,具体代码如下:
public void start() {
CustomShutdownHook.getCustomShutdownHook().clearAll();
String host = InetAddress.getLocalHost().getHostAddress();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
// 当客户端第一次进行请求的时候才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 30 秒之内没有收到客户端请求的话就关闭连接
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(host, PORT).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("occur exception when start server:", e);
} finally {
log.error("shutdown bossGroup and workerGroup");
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
Handler业务处理器
IO处理的操作包括:从通道读取数据包、数据包解码、业务处理、目标数据编码、将数据包写入到通道、由通道发送到对端;
Netty中主要依靠继承ChannelInBoundHandlerAdapter通道入站处理器,主要是重写channelRead方法public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final RpcRequestHandler rpcRequestHandler;
public NettyServerHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
log.info("server receive msg: [{}] ", msg);
RpcRequest rpcRequest = (RpcRequest) msg;
if (rpcRequest.getRpcMessageType() == RpcMessageType.HEART_BEAT) {
log.info("receive heat beat msg from client");
return;
}
// Execute the target method (the method the client needs to execute) and return the method result
Object result = rpcRequestHandler.handle(rpcRequest);
log.info(String.format("server get result: %s", result.toString()));
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCode.FAIL);
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
log.error("not writable now, message dropped");
}
} finally {
// 确保bytebuf已经释放,反之可能会出现内存泄漏等问题
ReferenceCountUtil.release(msg);
}
}
}
Netty客户端
客户端的创建与服务端比较类似,代码如下: ```java public final class NettyClient { private final Bootstrap bootstrap; private final EventLoopGroup eventLoopGroup;
// initialize resources such as EventLoopGroup, Bootstrap public NettyClient() {
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
Serializer kryoSerializer = ExtensionLoader.getExtensionLoader(Serializer.class).getExtension("kyro");
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// The timeout period of the connection.
// If this time is exceeded or the connection cannot be established, the connection fails.
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// If no data is sent to the server within 15 seconds, a heartbeat request is sent
ch.pipeline().addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
/*
config custom serialization codec
*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
// ByteBuf -> RpcRequest
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyClientHandler());
}
});
}
/**
* connect server and get the channel ,so that you can send rpc message to server
*
* @param inetSocketAddress server address
* @return the channel
*/
@SneakyThrows
public Channel doConnect(InetSocketAddress inetSocketAddress) {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("The client has connected [{}] successful!", inetSocketAddress.toString());
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}
public void close() {
eventLoopGroup.shutdownGracefully();
}
} ```