在 Netty 优化章节中介绍过可以使用线程组优化 WorkGroup 的执行效率,但在高并发环境下,应采用 Disruptor 对业务逻辑进行异步处理
Netty基础服务搭建:
基于Jboss编解码器案例进行修改,创建两个 SpringBoot 项目,分别为Netty客户端与服务端,服务端接收到客户端发送的Java实体类消息后创建新消息返回给客户端
Common端:
Pom依赖:
<!-- Dependencies of the shared (Common) module. -->
<dependencies>
<!-- Lombok annotations (@Data, @Accessors, ...); declared with "provided" scope. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<!-- Netty codec module; supplies the io.netty.handler.codec.marshalling classes used by the codec factory. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.1.76.Final</version>
<scope>compile</scope>
</dependency>
<!-- JBoss Marshalling: Java object serialization framework used with Netty -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.3.0.CR9</version>
</dependency>
<!-- JBoss Marshalling "serial" implementation; kept at the same version as jboss-marshalling above. -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.0.CR9</version>
</dependency>
<!-- LMAX Disruptor: ring buffer used to hand business logic off the Netty threads. -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.7</version>
<scope>compile</scope>
</dependency>
</dependencies>
实体类:
package entity;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
 * Message payload exchanged between the Netty client and server.
 *
 * <p>Instances are serialized on the wire, so the class is {@link Serializable}.
 * Lombok generates the accessors; {@code chain = true} makes every setter return
 * {@code this} so calls can be chained.
 */
@Data
@Accessors(chain = true)
public class TranslatorData implements Serializable {
    /**
     * Explicit serialization version. Without it the JVM derives the id from the
     * class shape, so any recompilation with a changed member list would make
     * client and server reject each other's messages.
     */
    private static final long serialVersionUID = 1L;

    private String id;
    private String name;
    /** Content of the transmitted message. */
    private String message;
}
Java实体类编解码器:
package factory;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
/**
 * Factory for the JBoss Marshalling based Netty codec that encodes and decodes
 * serializable Java objects.
 */
public final class MarshallingCodeCFactory {

    /**
     * Creates a JBoss Marshalling decoder.
     *
     * @return a new {@code MarshallingDecoder} whose second constructor argument
     *         (maximum size of a single serialized message) is 1024 * 1024
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // "serial" selects the Java-serialization flavoured marshaller factory.
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");

        // Marshalling configuration pinned to stream version 5.
        MarshallingConfiguration config = new MarshallingConfiguration();
        config.setVersion(5);

        int maxObjectSize = 1024 * 1024;
        return new MarshallingDecoder(new DefaultUnmarshallerProvider(factory, config), maxObjectSize);
    }

    /**
     * Creates a JBoss Marshalling encoder, which turns serializable POJOs into bytes.
     *
     * @return a new {@code MarshallingEncoder}
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");

        MarshallingConfiguration config = new MarshallingConfiguration();
        config.setVersion(5);

        return new MarshallingEncoder(new DefaultMarshallerProvider(factory, config));
    }
}
服务端:
服务端核心代码:
package dmbjz.server;
import factory.MarshallingCodeCFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/* Netty服务端 */
@Component
@Slf4j
public class NettyServer {
//创建单例模式
private static class SingletionWsServer{
static final NettyServer instance = new NettyServer();
}
//返回单例模式对象
public static NettyServer getInstance(){
return SingletionWsServer.instance;
}
private EventLoopGroup bossGroup;
private EventLoopGroup workGroup;
private ServerBootstrap bootstrap;
private ChannelFuture channelFuture;
/* 构造方法内进行初始化 */
public NettyServer(){
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup();
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
//缓存区自适应,最好可以进行手动计算Bytebuf数据包大小来设置缓存区空间
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
//开启日志记录
.handler(new LoggingHandler(LogLevel.DEBUG))
//回调方法
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//添加Java实体类编解码器
pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
//添加服务端Handler
pipeline.addLast(new ServerHandler());
}}
);
}
public void start(){
//额外线程启动Netty而不是在Main方法中启动,因此无需异步
//使用Spring容器进行托管,也无需进行关闭
this.channelFuture = bootstrap.bind(8765);
log.warn("Netty WebSocket Server启动完毕...");
}
}
服务端Handler:
package dmbjz.server;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Server-side inbound handler: prints every received {@code TranslatorData} and
 * answers with a new message whose fields are prefixed with {@code "resp: "}.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Prints a notice when the channel becomes active, then forwards the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道建立成功");
        super.channelActive(ctx);
    }

    /** Logs the incoming message and writes the response back on the same channel. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final TranslatorData request = (TranslatorData) msg;
        final String id = request.getId();
        final String name = request.getName();
        final String message = request.getMessage();
        System.out.println("ID:" + id + ",名称:" + name + ",消息:" + message);

        // TranslatorData setters are chainable, so the reply is built in one expression.
        ctx.writeAndFlush(new TranslatorData()
                .setId("resp: " + id)
                .setName("resp: " + name)
                .setMessage("resp: " + message));
    }
}
SpringBoot监听执行:
package dmbjz;
import dmbjz.server.NettyServer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/** Starts the Netty server once the Spring application context has been refreshed. */
@Component
public class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * Invoked whenever an ApplicationContext is initialized or refreshed.
     *
     * @param event the refresh event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Only react to the root context (no parent) so the server is not started repeatedly.
        boolean rootContext = event.getApplicationContext().getParent() == null;
        if (!rootContext) {
            return;
        }
        try {
            NettyServer.getInstance().start();
        } catch (Exception startFailure) {
            startFailure.printStackTrace();
        }
    }
}
客户端:
客户端代码:
package com.dmbjz.client;
import entity.TranslatorData;
import factory.MarshallingCodeCFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Netty client that connects to the server on construction and can then send a
 * batch of {@code TranslatorData} requests.
 */
public class NettyClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 8765;

    /** Connected channel; stays {@code null} when the connection attempt was interrupted. */
    private Channel channel;

    /** Creates the client and connects to the server, blocking until connected. */
    public NettyClient() {
        this.connect();
    }

    /**
     * Connects to {@code HOST:PORT} and stores the resulting channel.
     *
     * <p>If the attempt fails or is interrupted, the event loop group is shut down so
     * its threads do not outlive the failed client.
     */
    private void connect() {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup)
                    .channel(NioSocketChannel.class)
                    // Adaptive receive buffer sizing.
                    .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                    // A Bootstrap holds a single handler: a second handler(...) call replaces
                    // the first one. The logging handler therefore lives inside the pipeline.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            ChannelPipeline pipeline = sc.pipeline();
                            pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                            // Java object encoder / decoder.
                            pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            // Business handler.
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            // Connect and wait for the attempt to finish.
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync();
            System.out.println("客户端开始连接....");
            // Keep the channel so sendData() can write to it.
            this.channel = future.channel();
            connected = true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (!connected) {
                workGroup.shutdownGracefully();
            }
        }
    }

    /**
     * Sends ten sample requests to the server.
     *
     * @throws IllegalStateException if the client never obtained a channel
     */
    public void sendData() {
        if (this.channel == null) {
            throw new IllegalStateException("NettyClient is not connected to " + HOST + ":" + PORT);
        }
        for (int i = 0; i < 10; i++) {
            TranslatorData request = new TranslatorData();
            request.setId("" + i);
            request.setName("请求消息名称 " + i);
            request.setMessage("请求消息内容 " + i);
            this.channel.writeAndFlush(request);
        }
    }
}
客户端Handler:
package com.dmbjz.client;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/** Client-side inbound handler that prints every response received from the server. */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /** Prints the response fields to stderr and releases the message afterwards. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            TranslatorData reply = (TranslatorData) msg;
            StringBuilder line = new StringBuilder("Client端: id= ")
                    .append(reply.getId())
                    .append(", name= ")
                    .append(reply.getName())
                    .append(", message= ")
                    .append(reply.getMessage());
            System.err.println(line.toString());
        } finally {
            // Release the inbound message.
            ReferenceCountUtil.release(msg);
        }
    }
}
测试:
整合 Disruptor:
为每个用户创建唯一ID,服务端与客户端都将需要处理的业务逻辑交给 Disruptor 的生产者发布,消费者再取出该用户通过 Netty 传递的数据进行业务处理
Common端:
消费者:
package disruptor;
import com.lmax.disruptor.WorkHandler;
/**
 * Base class for Disruptor consumers of {@code TranslatorDataWrapper} events.
 * Subclasses supply the business logic by implementing the {@code WorkHandler} callback.
 */
public abstract class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {

    /** Identifier of this consumer. */
    protected String consumerId;

    /**
     * @param consumerId identifier assigned to this consumer
     */
    public MessageConsumer(String consumerId) {
        this.consumerId = consumerId;
    }

    /** Replaces the consumer identifier. */
    public void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }

    /** @return the consumer identifier */
    public String getConsumerId() {
        return this.consumerId;
    }
}
生产者:
package disruptor;
import com.lmax.disruptor.RingBuffer;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Disruptor producer: publishes received messages, together with their Netty
 * context, into the ring buffer.
 */
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class MessageProductor {
    private String producerId;
    private RingBuffer<TranslatorDataWrapper> ringBuffer;

    /**
     * Publishes one message to the ring buffer.
     *
     * @param data the message to hand to the consumers
     * @param ctx  the channel context the message arrived on
     */
    public void onData(TranslatorData data, ChannelHandlerContext ctx) {
        // Claim the next slot; it must be published even if filling it fails.
        final long claimed = ringBuffer.next();
        try {
            TranslatorDataWrapper slot = ringBuffer.get(claimed);
            slot.setCtx(ctx);
            slot.setTranslatorData(data);
        } finally {
            ringBuffer.publish(claimed);
        }
    }
}
传输实体类:
package disruptor;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data;
import lombok.NoArgsConstructor;
/* Event object carried through the Disruptor ring buffer: a message plus the Netty context it belongs to. */
@Data
@NoArgsConstructor
public class TranslatorDataWrapper {
private TranslatorData translatorData; // the message payload
private ChannelHandlerContext ctx; // Netty channel handler context associated with the message
}
失败操作:
package disruptor;
import com.lmax.disruptor.ExceptionHandler;
/**
 * Disruptor exception handler that reports failures instead of letting them stop
 * the worker. Every callback prints what failed together with the cause, so the
 * error is not reduced to a fixed, context-free message.
 */
public class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {

    /**
     * Called when a consumer throws while processing an event.
     *
     * @param ex       the exception thrown by the consumer
     * @param sequence ring buffer sequence of the failed event
     * @param event    the event being processed
     */
    @Override
    public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
        // Only the sequence is printed; the event itself may carry message content.
        report("消费时出现异常, sequence=" + sequence, ex);
    }

    /** Called when a consumer fails during start-up. */
    @Override
    public void handleOnStartException(Throwable ex) {
        report("启动时出现异常", ex);
    }

    /** Called when a consumer fails during shutdown. */
    @Override
    public void handleOnShutdownException(Throwable ex) {
        report("关闭时出现异常", ex);
    }

    /** Prints the description and, when present, the stack trace of the cause to stderr. */
    private static void report(String description, Throwable cause) {
        System.err.println(description);
        if (cause != null) {
            cause.printStackTrace();
        }
    }
}
Disruptor实现:
package disruptor;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Singleton that owns the Disruptor ring buffer and its worker pool, and hands out
 * producers keyed by id.
 */
public class RingBufferWorkerPoolFactory {

    /** Initialization-on-demand holder backing {@link #getInstance()}. */
    private static class SingletonHolder {
        static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
    }

    /**
     * @return the shared factory instance
     */
    public static RingBufferWorkerPoolFactory getInstance() {
        return SingletonHolder.instance;
    }

    private RingBufferWorkerPoolFactory() {
    }

    /** Producers by producer id. */
    private static Map<String, MessageProductor> productorMap = new ConcurrentHashMap<String, MessageProductor>();
    /** Consumers by consumer id. */
    private static Map<String, MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
    /** Barrier the workers wait on. */
    private SequenceBarrier sequenceBarrier;
    /** The ring buffer events are published to. */
    private RingBuffer<TranslatorDataWrapper> ringBuffer;
    /** Pool of workers consuming from the ring buffer. */
    private WorkerPool<TranslatorDataWrapper> workerPool;

    /**
     * Creates the ring buffer and worker pool and starts the consumers.
     *
     * @param type             producer type (single or multi)
     * @param bufferSize       number of slots in the ring buffer
     * @param waitStrategy     wait strategy for the consumers; when {@code null} a
     *                         {@code YieldingWaitStrategy} is used
     * @param messageConsumers consumers to run in the worker pool
     */
    public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
        // Each consumer occupies one thread for as long as the pool runs, so the executor
        // must be able to run all of them at once; a consumer left waiting in the queue
        // would never start. For up to 8 consumers the sizes match the previous 8/16.
        int workers = messageConsumers.length;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                Math.max(8, workers), Math.max(16, workers), 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));

        // Honour the caller's wait strategy instead of always yielding.
        WaitStrategy strategy = waitStrategy != null ? waitStrategy : new YieldingWaitStrategy();
        ringBuffer = RingBuffer.create(type, () -> new TranslatorDataWrapper(), bufferSize, strategy);

        // Sequence barrier for the workers.
        sequenceBarrier = ringBuffer.newBarrier();
        // Multi-consumer worker pool.
        workerPool = new WorkerPool<TranslatorDataWrapper>(ringBuffer, sequenceBarrier, new EventExceptionHandler(), messageConsumers);
        // Register the consumers by id.
        for (MessageConsumer consumer : messageConsumers) {
            consumerMap.put(consumer.getConsumerId(), consumer);
        }
        // Every worker tracks its own sequence; gate the ring buffer on all of them.
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        // Start the workers.
        workerPool.start(threadPool);
    }

    /**
     * Returns the producer registered under the given id, creating it on first use.
     *
     * @param producerId id of the producer
     * @return the producer bound to the ring buffer
     * @throws IllegalStateException if {@code initAndStart} has not been called yet
     */
    public MessageProductor getMessageProducer(String producerId) {
        if (ringBuffer == null) {
            // Otherwise a producer with a null ring buffer would be cached for good.
            throw new IllegalStateException("initAndStart must be called before getMessageProducer");
        }
        // Atomic create-if-missing; avoids two threads racing to register the same id.
        return productorMap.computeIfAbsent(producerId, id -> new MessageProductor(id, ringBuffer));
    }
}
服务端:
添加消费者:
package dmbjz.server;
import disruptor.MessageConsumer;
import disruptor.TranslatorDataWrapper;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side Disruptor consumer: prints the request and writes a response back
 * through the channel context carried by the event.
 */
public class MessageConsumerImplServer extends MessageConsumer {

    public MessageConsumerImplServer(String consumerId) {
        super(consumerId);
    }

    /** Handles one event taken from the ring buffer. */
    @Override
    public void onEvent(TranslatorDataWrapper event) throws Exception {
        final TranslatorData incoming = event.getTranslatorData();
        final ChannelHandlerContext channelContext = event.getCtx();

        // Business logic.
        final String id = incoming.getId();
        final String name = incoming.getName();
        final String message = incoming.getMessage();
        System.out.println("Sever端: id= " + id + ", name= " + name + ", message= " + message);

        // Build the reply with the chainable setters and send it back.
        channelContext.writeAndFlush(new TranslatorData()
                .setId("resp: " + id)
                .setName("resp: " + name)
                .setMessage("resp: " + message));
    }
}
修改ServerHandler:
使用 disruptor 管理业务处理逻辑
package dmbjz.server;
import disruptor.MessageProductor;
import disruptor.RingBufferWorkerPoolFactory;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Server-side inbound handler. Instead of handling the request inline, it
 * publishes each message to the Disruptor so a consumer processes it.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Prints a notice when the channel becomes active, then forwards the event. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道建立成功");
        super.channelActive(ctx);
    }

    /** Hands the received message and its context to a Disruptor producer. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The earlier inline implementation (print the request, write the response here)
        // now lives in the Disruptor consumer.
        TranslatorData request = (TranslatorData) msg;

        // One user should always map to the same id, e.g. machineCode:sessionId:userNo;
        // it is hard-coded here.
        final String producerId = "code:seesionId:001";
        RingBufferWorkerPoolFactory.getInstance()
                .getMessageProducer(producerId)
                .onData(request, ctx);
    }
}
修改NettyBoot:
package dmbjz;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import disruptor.MessageConsumer;
import disruptor.RingBufferWorkerPoolFactory;
import dmbjz.server.MessageConsumerImplServer;
import dmbjz.server.NettyServer;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
 * Starts the Disruptor and then the Netty server once the Spring application
 * context has been refreshed.
 */
@Component
public class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * Invoked whenever an ApplicationContext is initialized or refreshed.
     *
     * @param event the refresh event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Only react to the root context (no parent) so start-up is not repeated.
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        try {
            MessageConsumer[] workers = createConsumers(4);
            // Initialize and start the Disruptor.
            RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
            factory.initAndStart(ProducerType.MULTI, 1024 * 1024, new BlockingWaitStrategy(), workers);
            // Start the Netty server.
            NettyServer.getInstance().start();
        } catch (Exception startFailure) {
            startFailure.printStackTrace();
        }
    }

    /** Creates {@code count} server consumers with ids "code:serverId:0" .. "code:serverId:count-1". */
    private static MessageConsumer[] createConsumers(int count) {
        MessageConsumer[] created = new MessageConsumer[count];
        int index = 0;
        while (index < created.length) {
            created[index] = new MessageConsumerImplServer("code:serverId:" + index);
            index++;
        }
        return created;
    }
}
修改客户端:
修改ClientHandler:
package com.dmbjz.client;
import disruptor.MessageProductor;
import disruptor.RingBufferWorkerPoolFactory;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
 * Client-side inbound handler. Each response from the server is published to the
 * Disruptor so a consumer processes it.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /** Hands the received response and its context to a Disruptor producer. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The earlier inline implementation (print the response, then release it)
        // now lives in the Disruptor consumer.
        TranslatorData reply = (TranslatorData) msg;
        final String id = "code:seesionId:002";
        RingBufferWorkerPoolFactory.getInstance()
                .getMessageProducer(id)
                .onData(reply, ctx);
    }
}
添加消费者:
package com.dmbjz.client;
import disruptor.MessageConsumer;
import disruptor.TranslatorDataWrapper;
import entity.TranslatorData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
/** Client-side Disruptor consumer: prints every response taken from the ring buffer. */
public class MessageConsumerImplClient extends MessageConsumer {

    public MessageConsumerImplClient(String consumerId) {
        super(consumerId);
    }

    /** Business logic for one event: print the response, then release it. */
    @Override
    public void onEvent(TranslatorDataWrapper event) throws Exception {
        final TranslatorData reply = event.getTranslatorData();
        try {
            String line = "Client端: id= " + reply.getId()
                    + ", name= " + reply.getName()
                    + ", message= " + reply.getMessage();
            System.out.println(line);
        } finally {
            ReferenceCountUtil.release(reply);
        }
    }
}
修改启动类:
package com.dmbjz;
import com.dmbjz.client.MessageConsumerImplClient;
import com.dmbjz.client.NettyClient;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import disruptor.MessageConsumer;
import disruptor.RingBufferWorkerPoolFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the client: boots Spring, starts the Disruptor with
 * four client consumers, then connects to the server and sends the sample data.
 */
@SpringBootApplication
public class NettyClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(NettyClientApplication.class, args);

        // Create the consumers.
        MessageConsumer[] workers = new MessageConsumer[4];
        int index = 0;
        while (index < workers.length) {
            workers[index] = new MessageConsumerImplClient("code:clientId:" + index);
            index++;
        }

        // Initialize and start the Disruptor.
        RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
        factory.initAndStart(ProducerType.MULTI, 1024 * 1024, new BlockingWaitStrategy(), workers);

        // Run the client once everything is started.
        NettyClient client = new NettyClient();
        client.sendData();
    }
}
测试: