一、NettyServer是什么?有什么作用?
NettyServer是管理netty服务生命周期的类,负责Netty服务的创建、关闭、通道管理
二、NettyServer类的继承关系
三、重要属性
属性列表1:这是NettyServer本身属性
//缓存活着的工作通道,key: ip:port,value: channel
private Map<String, Channel> channels;
//netty的ServerBootstrap实例
private ServerBootstrap bootstrap;
//the boss channel that receive connections and dispatch these to worker channel
//接收连接并且成功连接的通道转发给工作组通道去处理的boss组通道
private io.netty.channel.Channel channel;
//boss线程组,处理连接事件
private EventLoopGroup bossGroup;
//worker线程组,处理读写事件
private EventLoopGroup workerGroup;
四、重要方法
1、构造方法:构造通道事件处理器
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
2、doOpen:初始化并且启动netty服务
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
//netty的boss/worker线程组,前者负责socket的连接事件,后者负责socket的读写事件
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
//netty通道的处理器,通道上的读写连接事件均由该类处理
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //选择io模式,这里选择Nio
//tcp消息延迟设置,为true表示应用层一有消息便发送出去,而不必等到socket的输出缓冲区满了之后再发送
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
//??含义记不清了,待查证
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
//内存使用策略,池化的直接内存,池化:相当于缓存池,内存使用之后不会立即销毁,提高性能
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
//消息编解码
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
//设置业务处理器,nettyServerHandler是实际处理socket事件的地方
.addLast("handler", nettyServerHandler);
}
});
//绑定接口,服务端开启了监听
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
3、doClose:服务关闭
@Override
protected void doClose() throws Throwable {
try {
if (channel != null) {
//关闭boss组连接通道
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
//遍历工作组通道,并关闭
Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (org.apache.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
//优雅关停netty服务
if (bootstrap != null) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
4、getChannels:获取有效的通道
public Collection<Channel> getChannels() {
Collection<Channel> chs = new HashSet<Channel>();
for (Channel channel : this.channels.values()) {
if (channel.isConnected()) {
chs.add(channel);
} else {
channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
}
}
return chs;
}
5、getChannel:获取指定地址的通道实例
public Channel getChannel(InetSocketAddress remoteAddress) {
return channels.get(NetUtils.toAddressString(remoteAddress));
}