JavaNetty

多实例来监听多端口

如果用多个实例分别监听多个端口,就会有多个bossGroup线程池(workerGroup也是多个)。这种方式通常用于海量客户端接入的场景;如果客户端数量不多,其实没有必要使用这种方式。

单实例监听多端口

利用Reactor主从模式,可以充分利用bossGroup线程池。

  1. import javax.annotation.PreDestroy;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import io.netty.bootstrap.ServerBootstrap;
  5. import io.netty.channel.ChannelFuture;
  6. import io.netty.channel.ChannelFutureListener;
  7. import io.netty.channel.ChannelOption;
  8. import io.netty.channel.EventLoopGroup;
  9. import io.netty.channel.epoll.EpollChannelOption;
  10. import io.netty.channel.epoll.EpollEventLoopGroup;
  11. import io.netty.channel.epoll.EpollServerSocketChannel;
  12. import io.netty.channel.nio.NioEventLoopGroup;
  13. import io.netty.channel.socket.nio.NioServerSocketChannel;
  14. import io.netty.handler.logging.LogLevel;
  15. import io.netty.handler.logging.LoggingHandler;
  16. /**
  17. * NioEventLoopGroup → EpollEventLoopGroup
  18. NioEventLoop → EpollEventLoop
  19. NioServerSocketChannel → EpollServerSocketChannel
  20. NioSocketChannel → EpollSocketChannel
  21. @Component
  22. */
  23. public class NettyServer{
  24. private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
  25. ServerBootstrap serverBootstrap = new ServerBootstrap();
  26. EventLoopGroup boss =null;
  27. EventLoopGroup worker =null;
  28. ChannelFuture future = null;
  29. //厂商编码
  30. Integer factoryCode=null;
  31. boolean epoll=true;
  32. int port;
  33. public NettyServer(Integer fc,int port){
  34. this.factoryCode=fc;
  35. this.port=port;
  36. }
  37. @PreDestroy
  38. public void stop(){
  39. if(future!=null){
  40. future.channel().close().addListener(ChannelFutureListener.CLOSE);
  41. future.awaitUninterruptibly();
  42. boss.shutdownGracefully();
  43. worker.shutdownGracefully();
  44. future=null;
  45. logger.info(" 服务关闭 ");
  46. }
  47. }
  48. public void start(){
  49. logger.info(" nettyServer 正在启动");
  50. if(epoll){
  51. logger.info(" nettyServer 使用epoll模式");
  52. boss = new EpollEventLoopGroup();
  53. worker = new EpollEventLoopGroup();
  54. }
  55. else{
  56. logger.info(" nettyServer 使用nio模式");
  57. boss = new NioEventLoopGroup();
  58. worker = new NioEventLoopGroup();
  59. }
  60. logger.info("netty服务器在["+this.port+"]端口启动监听");
  61. serverBootstrap.group(boss,worker)
  62. .option(ChannelOption.SO_BACKLOG,1024)
  63. .option(EpollChannelOption.SO_REUSEPORT, true)
  64. .handler(new LoggingHandler(LogLevel.INFO))
  65. .option(ChannelOption.TCP_NODELAY,true)
  66. .childOption(ChannelOption.SO_KEEPALIVE,true)
  67. .childHandler(new NettyServerInitializer());
  68. if(epoll){
  69. serverBootstrap.channel(EpollServerSocketChannel.class);
  70. }else{
  71. serverBootstrap.channel(NioServerSocketChannel.class);
  72. }
  73. try{
  74. future = serverBootstrap.bind(8765).sync();
  75. future2 = serverBootstrap.bind(8766).sync();
  76. future.channel().closeFuture().addListener(new ChannelFutureListener()
  77. {
  78. @Override public void operationComplete(ChannelFuture future) throws Exception
  79. { //通过回调只关闭自己监听的channel
  80. future.channel().close();
  81. }
  82. });
  83. future2.channel().closeFuture().addListener(new ChannelFutureListener()
  84. {
  85. @Override public void operationComplete(ChannelFuture future) throws Exception
  86. {
  87. future.channel().close();
  88. }
  89. });
  90. // 等待服务端监听端口关闭
  91. // future.channel().closeFuture().sync();
  92. }catch (Exception e){
  93. logger.info("nettyServer 启动时发生异常---------------{}",e);
  94. logger.info(e.getMessage());
  95. }finally {
  96. //这里一定要注释掉,因为上面没有阻塞了,不注释的话,这里会直接关闭的
  97. //boss.shutdownGracefully();
  98. //worker.shutdownGracefully();
  99. }
  100. }
  101. }

监听多个端口后,每个端口对应一种数据协议。在客户端channel初始化时,可以根据channel绑定的本地端口动态地选择解码器。

  1. import java.net.InetAddress;
  2. import java.net.InetSocketAddress;
  3. import java.util.concurrent.TimeUnit;
  4. import org.xxx.android.netty.NettyConstants;
  5. import io.netty.channel.ChannelInitializer;
  6. import io.netty.channel.ChannelPipeline;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.handler.codec.bytes.ByteArrayEncoder;
  9. import io.netty.handler.codec.string.StringEncoder;
  10. import io.netty.handler.timeout.IdleStateHandler;
  11. public class NettyServerInitializer extends ChannelInitializer<SocketChannel>{
  12. @Override
  13. protected void initChannel(SocketChannel socketChannel) throws Exception {
  14. ChannelPipeline pipeline = socketChannel.pipeline();
  15. pipeline.addLast(new IdleStateHandler(
  16. NettyConstants.SERVER_READ_IDEL_TIME_OUT,
  17. NettyConstants.SERVER_WRITE_IDEL_TIME_OUT,
  18. NettyConstants.SERVER_ALL_IDEL_TIME_OUT,
  19. TimeUnit.SECONDS));
  20. pipeline.addLast(new AcceptorIdleStateTrigger());
  21. pipeline.addLast(new StringEncoder());
  22. pipeline.addLast(new ByteArrayEncoder());
  23. int localPort = socketChannel.localAddress().getPort();
  24. //根据端口动态的选择解码器
  25. pipeline.addLast(new NettyServerDecoder(localPort));
  26. pipeline.addLast(new NettyServerHandler());
  27. }
  28. }