一、NettyServer是什么?有什么作用?

NettyServer是管理netty服务生命周期的类,负责Netty服务的创建、关闭、通道管理

二、NettyServer类的继承关系

dubbo之NettyServer源码详解 - 图1

三、重要属性

  1. 属性列表1:这是NettyServer本身属性
  2. //缓存活着的工作通道,key: ip:port,value: channel
  3. private Map<String, Channel> channels;
  4. //netty的ServerBootstrap实例
  5. private ServerBootstrap bootstrap;
  6. //the boss channel that receive connections and dispatch these to worker channel
  7. //接收连接并且成功连接的通道转发给工作组通道去处理的boss组通道
  8. private io.netty.channel.Channel channel;
  9. //boss线程组,处理连接事件
  10. private EventLoopGroup bossGroup;
  11. //worker线程组,处理读写事件
  12. private EventLoopGroup workerGroup;

四、重要方法

1、构造方法:构造通道事件处理器

  1. public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
  2. // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
  3. // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
  4. super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
  5. }

2、doOpen:初始化并且启动netty服务

  1. protected void doOpen() throws Throwable {
  2. bootstrap = new ServerBootstrap();
  3. //netty的boss/worker线程组,前者负责socket的连接事件,后者负责socket的读写事件
  4. bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
  5. workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
  6. //netty通道的处理器,通道上的读写连接事件均由该类处理
  7. final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
  8. channels = nettyServerHandler.getChannels();
  9. bootstrap.group(bossGroup, workerGroup)
  10. .channel(NioServerSocketChannel.class) //选择io模式,这里选择Nio
  11. //tcp消息延迟设置,为true表示应用层一有消息便发送出去,而不必等到socket的输出缓冲区满了之后再发送
  12. .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
  13. //??含义记不清了,待查证
  14. .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
  15. //内存使用策略,池化的直接内存,池化:相当于缓存池,内存使用之后不会立即销毁,提高性能
  16. .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  17. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  18. @Override
  19. protected void initChannel(NioSocketChannel ch) throws Exception {
  20. // FIXME: should we use getTimeout()?
  21. int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
  22. //消息编解码
  23. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
  24. ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
  25. .addLast("decoder", adapter.getDecoder())
  26. .addLast("encoder", adapter.getEncoder())
  27. .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
  28. //设置业务处理器,nettyServerHandler是实际处理socket事件的地方
  29. .addLast("handler", nettyServerHandler);
  30. }
  31. });
  32. //绑定接口,服务端开启了监听
  33. ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
  34. channelFuture.syncUninterruptibly();
  35. channel = channelFuture.channel();
  36. }

3、doClose:服务关闭

  1. @Override
  2. protected void doClose() throws Throwable {
  3. try {
  4. if (channel != null) {
  5. //关闭boss组连接通道
  6. channel.close();
  7. }
  8. } catch (Throwable e) {
  9. logger.warn(e.getMessage(), e);
  10. }
  11. try {
  12. //遍历工作组通道,并关闭
  13. Collection<org.apache.dubbo.remoting.Channel> channels = getChannels();
  14. if (channels != null && channels.size() > 0) {
  15. for (org.apache.dubbo.remoting.Channel channel : channels) {
  16. try {
  17. channel.close();
  18. } catch (Throwable e) {
  19. logger.warn(e.getMessage(), e);
  20. }
  21. }
  22. }
  23. } catch (Throwable e) {
  24. logger.warn(e.getMessage(), e);
  25. }
  26. try {
  27. //优雅关停netty服务
  28. if (bootstrap != null) {
  29. bossGroup.shutdownGracefully();
  30. workerGroup.shutdownGracefully();
  31. }
  32. } catch (Throwable e) {
  33. logger.warn(e.getMessage(), e);
  34. }
  35. try {
  36. if (channels != null) {
  37. channels.clear();
  38. }
  39. } catch (Throwable e) {
  40. logger.warn(e.getMessage(), e);
  41. }
  42. }

4、getChannels:获取有效的通道

  1. public Collection<Channel> getChannels() {
  2. Collection<Channel> chs = new HashSet<Channel>();
  3. for (Channel channel : this.channels.values()) {
  4. if (channel.isConnected()) {
  5. chs.add(channel);
  6. } else {
  7. channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
  8. }
  9. }
  10. return chs;
  11. }

5、getChannel:获取指定地址的通道实例

  1. public Channel getChannel(InetSocketAddress remoteAddress) {
  2. return channels.get(NetUtils.toAddressString(remoteAddress));
  3. }