在Netty优化章节中介绍过可以使用线程组优化 WorkGroup 的执行效率,但在高并发环境下,应采用 disruptor 对业务逻辑进行处理
Netty基础服务搭建:
基于Jboss编解码器案例进行修改,创建两个 SpringBoot 项目,分别为Netty客户端与服务端,服务端接收到客户端发送的Java实体类消息后创建新消息返回给客户端
Common端:
Pom依赖:
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-codec</artifactId>
        <version>4.1.76.Final</version>
        <scope>compile</scope>
    </dependency>
    <!-- JBoss Marshalling: Java serialization framework used by the Netty codec -->
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling</artifactId>
        <version>1.3.0.CR9</version>
    </dependency>
    <dependency>
        <groupId>org.jboss.marshalling</groupId>
        <artifactId>jboss-marshalling-serial</artifactId>
        <version>1.3.0.CR9</version>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.7</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
实体类:
package entity;

import lombok.Data;
import lombok.experimental.Accessors;

import java.io.Serializable;

/**
 * Payload object exchanged between the Netty client and server.
 *
 * <p>Accessors are generated by Lombok; with {@code chain = true} the generated
 * setters return {@code this} so calls can be chained.
 */
@Accessors(chain = true)
@Data
public class TranslatorData implements Serializable {

    private String id;

    private String name;

    /** Actual content of the transmitted message. */
    private String message;
}
Java实体类编解码器:
package factory;import io.netty.handler.codec.marshalling.*;import org.jboss.marshalling.MarshallerFactory;import org.jboss.marshalling.Marshalling;import org.jboss.marshalling.MarshallingConfiguration;/*** Marshalling工厂,Java实体类编解码器*/public final class MarshallingCodeCFactory {/*** 创建Jboss Marshalling解码器MarshallingDecoder* @return MarshallingDecoder*/public static MarshallingDecoder buildMarshallingDecoder() {//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");//创建了MarshallingConfiguration对象,配置了版本号为5final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);//根据marshallerFactory和configuration创建providerUnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);return decoder;}/*** 创建Jboss Marshalling编码器MarshallingEncoder* @return MarshallingEncoder*/public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;}}
服务端:
服务端核心代码:
package dmbjz.server;import factory.MarshallingCodeCFactory;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.marshalling.MarshallingEncoder;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;/* Netty服务端 */@Component@Slf4jpublic class NettyServer {//创建单例模式private static class SingletionWsServer{static final NettyServer instance = new NettyServer();}//返回单例模式对象public static NettyServer getInstance(){return SingletionWsServer.instance;}private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ServerBootstrap bootstrap;private ChannelFuture channelFuture;/* 构造方法内进行初始化 */public NettyServer(){bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,1024)//缓存区自适应,最好可以进行手动计算Bytebuf数据包大小来设置缓存区空间.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//开启日志记录.handler(new LoggingHandler(LogLevel.DEBUG))//回调方法.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//添加Java实体类编解码器pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());//添加服务端Handlerpipeline.addLast(new ServerHandler());}});}public void start(){//额外线程启动Netty而不是在Main方法中启动,因此无需异步//使用Spring容器进行托管,也无需进行关闭this.channelFuture = bootstrap.bind(8765);log.warn("Netty 
WebSocket Server启动完毕...");}}
服务端Handler:
package dmbjz.server;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** 服务端Handler */public class ServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("通道建立成功");super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {TranslatorData info = (TranslatorData) msg;System.out.println("ID:"+info.getId()+",名称:"+info.getName()+",消息:"+info.getMessage());//创建新数据TranslatorData response = new TranslatorData();response.setId("resp: " + info.getId());response.setName("resp: " + info.getName());response.setMessage("resp: " + info.getMessage());//返回数据ctx.writeAndFlush(response);}}
SpringBoot监听执行:
package dmbjz;import dmbjz.server.NettyServer;import org.springframework.context.ApplicationListener;import org.springframework.context.event.ContextRefreshedEvent;import org.springframework.stereotype.Component;/* Spring加载完成后进行监听执行 */@Componentpublic class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {/* 当一个ApplicationContext被初始化或刷新触发 */@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//判断当前是否为spring容器初始化完成,防止重复执行if(event.getApplicationContext().getParent() == null){try {NettyServer.getInstance().start();} catch (Exception e) {e.printStackTrace();}}}}
客户端:
客户端代码:
package com.dmbjz.client;import entity.TranslatorData;import factory.MarshallingCodeCFactory;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class NettyClient {private Channel channel;public NettyClient() {this.connect();}private void connect() {EventLoopGroup workGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(workGroup).channel(NioSocketChannel.class)//表示缓存区动态调配(自适应).option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT).handler(new LoggingHandler(LogLevel.INFO)).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {ChannelPipeline pipeline = sc.pipeline();//添加Java实体类编解码器pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());//添加自定义处理器pipeline.addLast(new ClientHandler());}});//绑定端口,同步等等请求连接ChannelFuture future = bootstrap.connect("127.0.0.1", 8765).sync();System.out.println("客户端开始连接....");//进行数据的发送, 但是首先我们要获取channel:this.channel = future.channel();} catch (InterruptedException e) {e.printStackTrace();}}/* 数据发送方法 */public void sendData(){for(int i =0; i <10; i++){TranslatorData request = new TranslatorData();request.setId("" + i);request.setName("请求消息名称 " + i);request.setMessage("请求消息内容 " + i);this.channel.writeAndFlush(request);}}}
客户端Handler:
package com.dmbjz.client;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {try {TranslatorData response = (TranslatorData)msg;System.err.println("Client端: id= " + response.getId()+ ", name= " + response.getName()+ ", message= " + response.getMessage());} finally {//释放缓存ReferenceCountUtil.release(msg);}}}
测试:


整合 Disruptor:
为每个用户创建唯一ID,将服务端与客户端需要处理的业务逻辑交给 disruptor 的生产者,消费者通过唯一ID获取到用户在客户端传递的数据进行业务处理

Common端:
消费者:
package disruptor;

import com.lmax.disruptor.WorkHandler;

/**
 * Base class for Disruptor consumers; subclasses supply {@code onEvent} with the
 * business logic. Each consumer carries an identifier.
 */
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {

    /** Identifier of this consumer. */
    protected String consumerId;

    /**
     * @param consumerId identifier assigned to this consumer
     */
    public MessageConsumer(String consumerId) {
        setConsumerId(consumerId);
    }

    /**
     * @param consumerId new identifier for this consumer
     */
    public final void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * @return identifier of this consumer
     */
    public String getConsumerId() {
        return this.consumerId;
    }
}
生产者:
package disruptor;

import com.lmax.disruptor.RingBuffer;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Disruptor producer: publishes received data together with its Netty context
 * into the ring buffer.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class MessageProductor {

    private String producerId;

    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    /**
     * Publishes one event to the ring buffer.
     *
     * @param data payload to hand to a consumer
     * @param ctx  channel context the consumer can use to reply
     */
    public void onData(TranslatorData data, ChannelHandlerContext ctx) {
        final long slot = ringBuffer.next();
        try {
            TranslatorDataWrapper entry = ringBuffer.get(slot);
            entry.setTranslatorData(data);
            entry.setCtx(ctx);
        } finally {
            // Always publish the claimed sequence, even if filling the entry failed.
            ringBuffer.publish(slot);
        }
    }
}
传输实体类:
package disruptor;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;import lombok.Data;import lombok.NoArgsConstructor;/* Disruptor 传递的数据实体类 */@Data@NoArgsConstructorpublic class TranslatorDataWrapper {private TranslatorData translatorData; //数据实体类private ChannelHandlerContext ctx; //Netty使用的Ctx对象}
失败操作:
package disruptor;import com.lmax.disruptor.ExceptionHandler;/* 事件处理失败时的操作 */public class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {@Overridepublic void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {System.out.println("消费时出现异常");}@Overridepublic void handleOnStartException(Throwable ex) {System.out.println("启动时出现异常");}@Overridepublic void handleOnShutdownException(Throwable ex) {System.out.println("关闭时出现异常");}}
Disruptor实现:
package disruptor;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Singleton that owns the Disruptor ring buffer and worker pool, and hands out
 * producers keyed by id.
 */
public class RingBufferWorkerPoolFactory {

    /** Lazy initialization holder for the singleton instance. */
    private static class SingletonHolder {
        static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
    }

    /**
     * @return the singleton factory instance
     */
    public static RingBufferWorkerPoolFactory getInstance() {
        return SingletonHolder.instance;
    }

    private RingBufferWorkerPoolFactory() {
    }

    /** Producers by producer id. */
    private static Map<String, MessageProductor> productorMap = new ConcurrentHashMap<String, MessageProductor>();

    /** Consumers by consumer id. */
    private static Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();

    /** Sequence barrier the workers wait on. */
    private SequenceBarrier sequenceBarrier;

    /** Ring buffer holding the events. */
    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    /** Pool of workers that consume the events. */
    private WorkerPool<TranslatorDataWrapper> workerPool;

    /**
     * Initializes the Disruptor and starts the worker pool.
     *
     * @param type             producer type
     * @param bufferSize       number of entries created in the ring buffer
     * @param waitStrategy     wait strategy for the consumers; {@code null} falls back to
     *                         {@link YieldingWaitStrategy}
     * @param messageConsumers consumers to run in the worker pool
     */
    public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
        // Every worker is a task that never finishes, so each consumer needs its own thread.
        // A fixed core size smaller than the consumer count would park the surplus workers
        // in the executor queue, where they would never run.
        int workerThreads = Math.max(1, messageConsumers.length);
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                workerThreads, workerThreads, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));

        // Honor the caller's wait strategy instead of a hard-coded one.
        WaitStrategy effectiveStrategy = (waitStrategy != null) ? waitStrategy : new YieldingWaitStrategy();
        ringBuffer = RingBuffer.create(type, TranslatorDataWrapper::new, bufferSize, effectiveStrategy);

        // Create the sequence barrier.
        sequenceBarrier = ringBuffer.newBarrier();

        // Build the multi-consumer worker pool.
        workerPool = new WorkerPool<TranslatorDataWrapper>(
                ringBuffer, sequenceBarrier, new EventExceptionHandler(), messageConsumers);

        // Register the consumers by id.
        for (MessageConsumer consumer : messageConsumers) {
            consumerMap.put(consumer.getConsumerId(), consumer);
        }

        // Each worker has its own sequence; register them as gating sequences on the ring buffer.
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        // Start the worker pool.
        workerPool.start(threadPool);
    }

    /**
     * Returns the producer registered under the given id, creating it on first use.
     *
     * @param producerId id of the producer
     * @return the producer bound to the ring buffer
     * @throws IllegalStateException if {@link #initAndStart} has not been called yet
     */
    public MessageProductor getMessageProducer(String producerId) {
        RingBuffer<TranslatorDataWrapper> buffer = ringBuffer;
        if (buffer == null) {
            // Otherwise a producer with a null ring buffer would be cached permanently.
            throw new IllegalStateException("initAndStart must be called before requesting a producer");
        }
        // Atomic get-or-create: this method is called concurrently from Netty I/O threads.
        return productorMap.computeIfAbsent(producerId, id -> new MessageProductor(id, buffer));
    }
}
服务端:
添加消费者:
package dmbjz.server;import disruptor.MessageConsumer;import disruptor.TranslatorDataWrapper;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;public class MessageConsumerImplServer extends MessageConsumer {public MessageConsumerImplServer(String consumerId) {super(consumerId);}@Overridepublic void onEvent(TranslatorDataWrapper event) throws Exception {TranslatorData request = event.getTranslatorData();ChannelHandlerContext ctx = event.getCtx();//业务处理逻辑:System.out.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage());//回送响应信息:TranslatorData response = new TranslatorData();response.setId("resp: " + request.getId());response.setName("resp: " + request.getName());response.setMessage("resp: " + request.getMessage());//写出response响应信息:ctx.writeAndFlush(response);}}
修改ServerHandler:
使用 disruptor 管理业务处理逻辑
package dmbjz.server;import disruptor.MessageProductor;import disruptor.RingBufferWorkerPoolFactory;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** 服务端Handler */public class ServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("通道建立成功");super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/*TranslatorData info = (TranslatorData) msg;System.out.println("ID:"+info.getId()+",名称:"+info.getName()+",消息:"+info.getMessage());//创建新数据TranslatorData response = new TranslatorData();response.setId("resp: " + info.getId());response.setName("resp: " + info.getName());response.setMessage("resp: " + info.getMessage());//返回数据ctx.writeAndFlush(response);*//* 使用disruptor异步执行业务逻辑,将数据发送给消费者即可 */TranslatorData info = (TranslatorData) msg;String id = "code:seesionId:001"; //同一个用户应使用同一个Id,例如可以使用机器码:sessionId:用户编号作为规则进行生成,这里固定写死MessageProductor messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(id);messageProducer.onData(info,ctx);}}
修改NettyBoot:
package dmbjz;import com.lmax.disruptor.BlockingWaitStrategy;import com.lmax.disruptor.YieldingWaitStrategy;import com.lmax.disruptor.dsl.ProducerType;import disruptor.MessageConsumer;import disruptor.RingBufferWorkerPoolFactory;import dmbjz.server.MessageConsumerImplServer;import dmbjz.server.NettyServer;import org.springframework.context.ApplicationListener;import org.springframework.context.event.ContextRefreshedEvent;import org.springframework.stereotype.Component;/* Spring加载完成后进行监听执行 */@Componentpublic class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {/* 当一个ApplicationContext被初始化或刷新触发 */@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//判断当前是否为spring容器初始化完成,防止重复执行if(event.getApplicationContext().getParent() == null){try {//初始化消费者MessageConsumer[] conusmers = new MessageConsumer[4];for(int i =0; i < conusmers.length; i++) {MessageConsumer messageConsumer = new MessageConsumerImplServer("code:serverId:" + i);conusmers[i] = messageConsumer;}//初始化Disruptor并启动RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,1024*1024,//new YieldingWaitStrategy(),new BlockingWaitStrategy(),conusmers);//启动Netty服务端NettyServer.getInstance().start();} catch (Exception e) {e.printStackTrace();}}}}
修改客户端:
修改ClientHandler:
package com.dmbjz.client;import disruptor.MessageProductor;import disruptor.RingBufferWorkerPoolFactory;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.ReferenceCountUtil;public class ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {/*try {TranslatorData response = (TranslatorData)msg;System.err.println("Client端: id= " + response.getId()+ ", name= " + response.getName()+ ", message= " + response.getMessage());} finally {//释放缓存ReferenceCountUtil.release(msg);}*/TranslatorData response = (TranslatorData)msg;String producerId = "code:seesionId:002";MessageProductor messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);messageProducer.onData(response, ctx);}}
添加消费者:
package com.dmbjz.client;import disruptor.MessageConsumer;import disruptor.TranslatorDataWrapper;import entity.TranslatorData;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;public class MessageConsumerImplClient extends MessageConsumer {public MessageConsumerImplClient(String consumerId) {super(consumerId);}/* 业务逻辑处理方式 */@Overridepublic void onEvent(TranslatorDataWrapper event) throws Exception {TranslatorData response = event.getTranslatorData();//业务逻辑处理:try {System.out.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage());} finally {ReferenceCountUtil.release(response);}}}
修改启动类:
package com.dmbjz;import com.dmbjz.client.MessageConsumerImplClient;import com.dmbjz.client.NettyClient;import com.lmax.disruptor.BlockingWaitStrategy;import com.lmax.disruptor.dsl.ProducerType;import disruptor.MessageConsumer;import disruptor.RingBufferWorkerPoolFactory;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class NettyClientApplication {public static void main(String[] args) {SpringApplication.run(NettyClientApplication.class, args);//创建消费者MessageConsumer[] conusmers = new MessageConsumer[4];for(int i =0; i < conusmers.length; i++) {MessageConsumer messageConsumer = new MessageConsumerImplClient("code:clientId:" + i);conusmers[i] = messageConsumer;}//初始化并启动disruptorRingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,1024*1024,//new YieldingWaitStrategy(),new BlockingWaitStrategy(),conusmers);//在启动的时候执行Client代码new NettyClient().sendData();}}
测试:

