netty客户端
类:NettyRemotingClient
下面是 RocketMQ 通讯层发送端(客户端)的 Bootstrap 配置:
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// 禁用小数据发送.option(ChannelOption.TCP_NODELAY, true)// 长连接标示,客户端和服务端要保持一致,否则有问题.option(ChannelOption.SO_KEEPALIVE, false)// 链接超时(毫秒,默认3秒).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())// 发送端缓冲区大小(单位Byte,默认 65535,即64k).option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())// 接收端缓冲区大小(单位Byte,默认 65535,即64k).option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 安全套接字if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));log.info("Prepend SSL handler");} else {log.warn("Connections are insecure as SSLContext is null!");}}pipeline.addLast(defaultEventExecutorGroup,// 自定义的编码器new NettyEncoder(),// 自定义的解码器new NettyDecoder(),// 读写心跳检测,(单位秒,默认120秒,2分钟),2分钟没有读写则发起心跳配置new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 实现心跳超时后的处理new NettyConnectManageHandler(),new NettyClientHandler());}});
class NettyConnectManageHandler extends ChannelDuplexHandler {@Overridepublic void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);super.connect(ctx, remoteAddress, localAddress, promise);if (NettyRemotingClient.this.channelEventListener != null) {NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));}}@Overridepublic void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);closeChannel(ctx.channel());super.disconnect(ctx, promise);if (NettyRemotingClient.this.channelEventListener != null) {NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));}}@Overridepublic void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);closeChannel(ctx.channel());super.close(ctx, promise);NettyRemotingClient.this.failFast(ctx.channel());if (NettyRemotingClient.this.channelEventListener != null) {NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state().equals(IdleState.ALL_IDLE)) {final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);closeChannel(ctx.channel());if (NettyRemotingClient.this.channelEventListener != null) {NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));}}}ctx.fireUserEventTriggered(evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);closeChannel(ctx.channel());if (NettyRemotingClient.this.channelEventListener != null) {NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));}}}
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {processMessageReceived(ctx, msg);}}
// Client worker event loop group: a single NIO thread named "NettyClientSelector_<n>".
this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
    // Counter used only to number the created threads.
    private final AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
    }
});
netty服务端
借鉴rocketmq
if (useEpoll()) {// 默认boss线程池1个线程this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {// 默认boss线程池1个线程this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});// 默认worker线程为3个this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}private boolean useEpoll() {return RemotingUtil.isLinuxPlatform()&& nettyServerConfig.isUseEpollNativeSelector()// 这个函数是netty自带的&& Epoll.isAvailable();}
linux操作系统判断
public static boolean isLinuxPlatform() {return isLinuxPlatform;}// 操作系统的名字public static final String OS_NAME = System.getProperty("os.name");if (OS_NAME != null && OS_NAME.toLowerCase().contains("linux")) {isLinuxPlatform = true;}
netty使用epoll本地选择器
/**
 * Switch for using the native epoll selector; {@code false} by default.
 * NOTE(review): presumably exposed through isUseEpollNativeSelector() on the
 * server config — confirm against the enclosing class.
 *
 * Build note kept from the original comment:
 *
 * make make install
 *
 *
 * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
 * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
 */
private boolean useEpollNativeSelector = false;
ServerBootStrap
类:NettyRemotingServer
// Executor group that runs the pipeline handlers registered below; its size comes from
// the server config and its threads are named "NettyServerCodecThread_<n>".
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
    nettyServerConfig.getServerWorkerThreads(),
    new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
        }
    });

// Server-side bootstrap configuration. Each option sits on its own line so that the
// end-of-line comments no longer swallow the rest of the call chain.
ServerBootstrap childHandler =
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
        // Size of the pending-connection queue: connection requests that cannot be
        // handled right away wait in this queue.
        .option(ChannelOption.SO_BACKLOG, 1024)
        // Allow the local address and port to be reused, e.g. so that binding a port
        // that is already being listened on does not simply fail.
        .option(ChannelOption.SO_REUSEADDR, true)
        // Idle detection is configured in the pipeline below, so keep-alive can stay off.
        .option(ChannelOption.SO_KEEPALIVE, false)
        // Disable Nagle's algorithm; suited to sending small payloads immediately.
        .childOption(ChannelOption.TCP_NODELAY, true)
        // Send buffer size in bytes (65535 by default, per the original note).
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
        // Receive buffer size in bytes (65535 by default, per the original note).
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                    .addLast(defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        // Read/write idle detection, in seconds (120 by default, per the original note).
                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                        connectionManageHandler,
                        serverHandler);
            }
        });
参考:
IdleStateHandler 的配置参考这里:https://www.yuque.com/simonalong/jishu/wd74q8
https://blog.csdn.net/u013967175/article/details/78591810
http://www.mamicode.com/info-detail-903786.html
