netty客户端

类:NettyRemotingClient
下面是rocketmq的通讯的发送端

  1. Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
  2. // 禁用小数据发送
  3. .option(ChannelOption.TCP_NODELAY, true)
  4. // 长连接标示,客户端和服务端要保持一致,否则有问题
  5. .option(ChannelOption.SO_KEEPALIVE, false)
  6. // 链接超时(毫秒,默认3秒)
  7. .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
  8. // 发送端缓冲区大小(单位Byte,默认 65535,即64k)
  9. .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
  10. // 接收端缓冲区大小(单位Byte,默认 65535,即64k)
  11. .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
  12. .handler(new ChannelInitializer<SocketChannel>() {
  13. @Override
  14. public void initChannel(SocketChannel ch) throws Exception {
  15. ChannelPipeline pipeline = ch.pipeline();
  16. // 安全套接字
  17. if (nettyClientConfig.isUseTLS()) {
  18. if (null != sslContext) {
  19. pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
  20. log.info("Prepend SSL handler");
  21. } else {
  22. log.warn("Connections are insecure as SSLContext is null!");
  23. }
  24. }
  25. pipeline.addLast(
  26. defaultEventExecutorGroup,
  27. // 自定义的编码器
  28. new NettyEncoder(),
  29. // 自定义的解码器
  30. new NettyDecoder(),
  31. // 读写心跳检测,(单位秒,默认120秒,2分钟),2分钟没有读写则发起心跳配置
  32. new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
  33. // 实现心跳超时后的处理
  34. new NettyConnectManageHandler(),
  35. new NettyClientHandler());
  36. }
  37. });
  1. class NettyConnectManageHandler extends ChannelDuplexHandler {
  2. @Override
  3. public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
  4. ChannelPromise promise) throws Exception {
  5. final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
  6. final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
  7. log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
  8. super.connect(ctx, remoteAddress, localAddress, promise);
  9. if (NettyRemotingClient.this.channelEventListener != null) {
  10. NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
  11. }
  12. }
  13. @Override
  14. public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
  15. final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
  16. log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
  17. closeChannel(ctx.channel());
  18. super.disconnect(ctx, promise);
  19. if (NettyRemotingClient.this.channelEventListener != null) {
  20. NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
  21. }
  22. }
  23. @Override
  24. public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
  25. final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
  26. log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
  27. closeChannel(ctx.channel());
  28. super.close(ctx, promise);
  29. NettyRemotingClient.this.failFast(ctx.channel());
  30. if (NettyRemotingClient.this.channelEventListener != null) {
  31. NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
  32. }
  33. }
  34. @Override
  35. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  36. if (evt instanceof IdleStateEvent) {
  37. IdleStateEvent event = (IdleStateEvent) evt;
  38. if (event.state().equals(IdleState.ALL_IDLE)) {
  39. final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
  40. log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
  41. closeChannel(ctx.channel());
  42. if (NettyRemotingClient.this.channelEventListener != null) {
  43. NettyRemotingClient.this
  44. .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
  45. }
  46. }
  47. }
  48. ctx.fireUserEventTriggered(evt);
  49. }
  50. @Override
  51. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  52. final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
  53. log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
  54. log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
  55. closeChannel(ctx.channel());
  56. if (NettyRemotingClient.this.channelEventListener != null) {
  57. NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
  58. }
  59. }
  60. }
  1. class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
  2. @Override
  3. protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  4. processMessageReceived(ctx, msg);
  5. }
  6. }
  1. this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
  2. private AtomicInteger threadIndex = new AtomicInteger(0);
  3. @Override
  4. public Thread newThread(Runnable r) {
  5. return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
  6. }
  7. });

netty服务端

借鉴rocketmq

  1. if (useEpoll()) {
  2. // 默认boss线程池1个线程
  3. this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
  4. private AtomicInteger threadIndex = new AtomicInteger(0);
  5. @Override
  6. public Thread newThread(Runnable r) {
  7. return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
  8. }
  9. });
  10. this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
  11. private AtomicInteger threadIndex = new AtomicInteger(0);
  12. private int threadTotal = nettyServerConfig.getServerSelectorThreads();
  13. @Override
  14. public Thread newThread(Runnable r) {
  15. return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
  16. }
  17. });
  18. } else {
  19. // 默认boss线程池1个线程
  20. this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
  21. private AtomicInteger threadIndex = new AtomicInteger(0);
  22. @Override
  23. public Thread newThread(Runnable r) {
  24. return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
  25. }
  26. });
  27. // 默认worker线程为3个
  28. this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
  29. private AtomicInteger threadIndex = new AtomicInteger(0);
  30. private int threadTotal = nettyServerConfig.getServerSelectorThreads();
  31. @Override
  32. public Thread newThread(Runnable r) {
  33. return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
  34. }
  35. });
  36. }
  37. private boolean useEpoll() {
  38. return RemotingUtil.isLinuxPlatform()
  39. && nettyServerConfig.isUseEpollNativeSelector()
  40. // 这个函数是netty自带的
  41. && Epoll.isAvailable();
  42. }

linux操作系统判断

  1. public static boolean isLinuxPlatform() {
  2. return isLinuxPlatform;
  3. }
  4. // 操作系统的名字
  5. public static final String OS_NAME = System.getProperty("os.name");
  6. if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {
  7. isLinuxPlatform = true;
  8. }

netty使用epoll本地选择器

  1. /**
  2. * make make install
  3. *
  4. *
  5. * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
  6. * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
  7. */
  8. private boolean useEpollNativeSelector = false;

ServerBootStrap

类:NettyRemotingServer

  1. this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
  2. nettyServerConfig.getServerWorkerThreads(),
  3. new ThreadFactory() {
  4. private AtomicInteger threadIndex = new AtomicInteger(0);
  5. @Override
  6. public Thread newThread(Runnable r) {
  7. return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
  8. }
  9. });
  10. ServerBootstrap childHandler =
  11. this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
  12. .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
  13. // 链接队列大小,在同一瞬间服务端只能处理一个,更多的请求会先放到队列中,这个就是队列的大小
  14. .option(ChannelOption.SO_BACKLOG, 1024)
  15. // 这个参数表示允许重复使用本地地址和端口。比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题
  16. .option(ChannelOption.SO_REUSEADDR, true)
  17. // 下面有心跳了,这里可以设置为false
  18. .option(ChannelOption.SO_KEEPALIVE, false)
  19. // 禁止使用Nagle算法,使用于小数据即时传输
  20. .childOption(ChannelOption.TCP_NODELAY, true)
  21. // 发送端缓冲区大小(单位Byte,默认 65535,即64k)
  22. .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
  23. // 接收端缓冲区大小(单位Byte,默认 65535,即64k)
  24. .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
  25. .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
  26. .childHandler(new ChannelInitializer<SocketChannel>() {
  27. @Override
  28. public void initChannel(SocketChannel ch) throws Exception {
  29. ch.pipeline()
  30. .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
  31. .addLast(defaultEventExecutorGroup,
  32. new NettyEncoder(),
  33. new NettyDecoder(),
  34. // 心跳配置(默认,120秒,2分钟),2分钟没有读写则发起心跳配置
  35. new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
  36. connectionManageHandler,
  37. serverHandler
  38. );
  39. }
  40. });

参考:

IdelStateHandler这个的配置参考这里:https://www.yuque.com/simonalong/jishu/wd74q8
https://blog.csdn.net/u013967175/article/details/78591810
http://www.mamicode.com/info-detail-903786.html