在Netty优化章节中介绍过可以使用线程组优化 WorkGroup 的执行效率,但在高并发环境下,应采用 disruptor 对业务逻辑进行处理

Netty基础服务搭建:

基于Jboss编解码器案例进行修改,创建两个 SpringBoot 项目,分别为Netty客户端与服务端,服务端接收到客户端发送的Java实体类消息后创建新消息返回给客户端
image.png

Common端:

Pom依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.projectlombok</groupId>
  4. <artifactId>lombok</artifactId>
  5. <version>1.18.24</version>
  6. <scope>provided</scope>
  7. </dependency>
  8. <dependency>
  9. <groupId>io.netty</groupId>
  10. <artifactId>netty-codec</artifactId>
  11. <version>4.1.76.Final</version>
  12. <scope>compile</scope>
  13. </dependency>
  14. <!-- Netty Java序列化框架marshalling -->
  15. <dependency>
  16. <groupId>org.jboss.marshalling</groupId>
  17. <artifactId>jboss-marshalling</artifactId>
  18. <version>1.3.0.CR9</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.jboss.marshalling</groupId>
  22. <artifactId>jboss-marshalling-serial</artifactId>
  23. <version>1.3.0.CR9</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>com.lmax</groupId>
  27. <artifactId>disruptor</artifactId>
  28. <version>3.3.7</version>
  29. <scope>compile</scope>
  30. </dependency>
  31. </dependencies>

实体类:

  1. package entity;
  2. import lombok.Data;
  3. import lombok.experimental.Accessors;
  4. import java.io.Serializable;
  5. /* Netty 传输数据实体类 */
  6. @Data
  7. @Accessors(chain = true)
  8. public class TranslatorData implements Serializable {
  9. private String id;
  10. private String name;
  11. private String message; //传输消息具体内容
  12. }

Java实体类编解码器:

  1. package factory;
  2. import io.netty.handler.codec.marshalling.*;
  3. import org.jboss.marshalling.MarshallerFactory;
  4. import org.jboss.marshalling.Marshalling;
  5. import org.jboss.marshalling.MarshallingConfiguration;
  6. /**
  7. * Marshalling工厂,Java实体类编解码器
  8. */
  9. public final class MarshallingCodeCFactory {
  10. /**
  11. * 创建Jboss Marshalling解码器MarshallingDecoder
  12. * @return MarshallingDecoder
  13. */
  14. public static MarshallingDecoder buildMarshallingDecoder() {
  15. //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
  16. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  17. //创建了MarshallingConfiguration对象,配置了版本号为5
  18. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  19. configuration.setVersion(5);
  20. //根据marshallerFactory和configuration创建provider
  21. UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
  22. //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
  23. MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
  24. return decoder;
  25. }
  26. /**
  27. * 创建Jboss Marshalling编码器MarshallingEncoder
  28. * @return MarshallingEncoder
  29. */
  30. public static MarshallingEncoder buildMarshallingEncoder() {
  31. final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
  32. final MarshallingConfiguration configuration = new MarshallingConfiguration();
  33. configuration.setVersion(5);
  34. MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
  35. //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
  36. MarshallingEncoder encoder = new MarshallingEncoder(provider);
  37. return encoder;
  38. }
  39. }

服务端:

服务端核心代码:

  1. package dmbjz.server;
  2. import factory.MarshallingCodeCFactory;
  3. import io.netty.bootstrap.ServerBootstrap;
  4. import io.netty.buffer.PooledByteBufAllocator;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.marshalling.MarshallingEncoder;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.handler.logging.LogLevel;
  13. import io.netty.handler.logging.LoggingHandler;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.stereotype.Component;
  16. /* Netty服务端 */
  17. @Component
  18. @Slf4j
  19. public class NettyServer {
  20. //创建单例模式
  21. private static class SingletionWsServer{
  22. static final NettyServer instance = new NettyServer();
  23. }
  24. //返回单例模式对象
  25. public static NettyServer getInstance(){
  26. return SingletionWsServer.instance;
  27. }
  28. private EventLoopGroup bossGroup;
  29. private EventLoopGroup workGroup;
  30. private ServerBootstrap bootstrap;
  31. private ChannelFuture channelFuture;
  32. /* 构造方法内进行初始化 */
  33. public NettyServer(){
  34. bossGroup = new NioEventLoopGroup();
  35. workGroup = new NioEventLoopGroup();
  36. bootstrap = new ServerBootstrap();
  37. bootstrap.group(bossGroup,workGroup)
  38. .channel(NioServerSocketChannel.class)
  39. .option(ChannelOption.SO_BACKLOG,1024)
  40. //缓存区自适应,最好可以进行手动计算Bytebuf数据包大小来设置缓存区空间
  41. .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
  42. //开启日志记录
  43. .handler(new LoggingHandler(LogLevel.DEBUG))
  44. //回调方法
  45. .childHandler(new ChannelInitializer<SocketChannel>() {
  46. @Override
  47. protected void initChannel(SocketChannel socketChannel) throws Exception {
  48. ChannelPipeline pipeline = socketChannel.pipeline();
  49. //添加Java实体类编解码器
  50. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  51. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  52. //添加服务端Handler
  53. pipeline.addLast(new ServerHandler());
  54. }}
  55. );
  56. }
  57. public void start(){
  58. //额外线程启动Netty而不是在Main方法中启动,因此无需异步
  59. //使用Spring容器进行托管,也无需进行关闭
  60. this.channelFuture = bootstrap.bind(8765);
  61. log.warn("Netty WebSocket Server启动完毕...");
  62. }
  63. }

服务端Handler:

  1. package dmbjz.server;
  2. import entity.TranslatorData;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. /** 服务端Handler */
  6. public class ServerHandler extends ChannelInboundHandlerAdapter {
  7. @Override
  8. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  9. System.out.println("通道建立成功");
  10. super.channelActive(ctx);
  11. }
  12. @Override
  13. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  14. TranslatorData info = (TranslatorData) msg;
  15. System.out.println("ID:"+info.getId()+",名称:"+info.getName()+",消息:"+info.getMessage());
  16. //创建新数据
  17. TranslatorData response = new TranslatorData();
  18. response.setId("resp: " + info.getId());
  19. response.setName("resp: " + info.getName());
  20. response.setMessage("resp: " + info.getMessage());
  21. //返回数据
  22. ctx.writeAndFlush(response);
  23. }
  24. }

SpringBoot监听执行:

  1. package dmbjz;
  2. import dmbjz.server.NettyServer;
  3. import org.springframework.context.ApplicationListener;
  4. import org.springframework.context.event.ContextRefreshedEvent;
  5. import org.springframework.stereotype.Component;
  6. /* Spring加载完成后进行监听执行 */
  7. @Component
  8. public class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {
  9. /* 当一个ApplicationContext被初始化或刷新触发 */
  10. @Override
  11. public void onApplicationEvent(ContextRefreshedEvent event) {
  12. //判断当前是否为spring容器初始化完成,防止重复执行
  13. if(event.getApplicationContext().getParent() == null){
  14. try {
  15. NettyServer.getInstance().start();
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. }

客户端:

客户端代码:

  1. package com.dmbjz.client;
  2. import entity.TranslatorData;
  3. import factory.MarshallingCodeCFactory;
  4. import io.netty.bootstrap.Bootstrap;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.logging.LogLevel;
  10. import io.netty.handler.logging.LoggingHandler;
  11. public class NettyClient {
  12. private Channel channel;
  13. public NettyClient() {
  14. this.connect();
  15. }
  16. private void connect() {
  17. EventLoopGroup workGroup = new NioEventLoopGroup();
  18. Bootstrap bootstrap = new Bootstrap();
  19. try {
  20. bootstrap.group(workGroup)
  21. .channel(NioSocketChannel.class)
  22. //表示缓存区动态调配(自适应)
  23. .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
  24. .handler(new LoggingHandler(LogLevel.INFO))
  25. .handler(new ChannelInitializer<SocketChannel>() {
  26. @Override
  27. protected void initChannel(SocketChannel sc) throws Exception {
  28. ChannelPipeline pipeline = sc.pipeline();
  29. //添加Java实体类编解码器
  30. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
  31. pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
  32. //添加自定义处理器
  33. pipeline.addLast(new ClientHandler());
  34. }
  35. });
  36. //绑定端口,同步等等请求连接
  37. ChannelFuture future = bootstrap.connect("127.0.0.1", 8765).sync();
  38. System.out.println("客户端开始连接....");
  39. //进行数据的发送, 但是首先我们要获取channel:
  40. this.channel = future.channel();
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. /* 数据发送方法 */
  46. public void sendData(){
  47. for(int i =0; i <10; i++){
  48. TranslatorData request = new TranslatorData();
  49. request.setId("" + i);
  50. request.setName("请求消息名称 " + i);
  51. request.setMessage("请求消息内容 " + i);
  52. this.channel.writeAndFlush(request);
  53. }
  54. }
  55. }

客户端Handler:

  1. package com.dmbjz.client;
  2. import entity.TranslatorData;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.util.ReferenceCountUtil;
  6. public class ClientHandler extends ChannelInboundHandlerAdapter {
  7. @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  9. try {
  10. TranslatorData response = (TranslatorData)msg;
  11. System.err.println("Client端: id= " + response.getId()
  12. + ", name= " + response.getName()
  13. + ", message= " + response.getMessage());
  14. } finally {
  15. //释放缓存
  16. ReferenceCountUtil.release(msg);
  17. }
  18. }
  19. }

测试:

image.png
image.png


整合 Disruptor:

为每个用户创建唯一ID,将服务端与客户端需要处理的业务逻辑交给 disruptor 的生产者,消费者通过唯一ID获取到用户在客户端传递的数据进行业务处理
Netty整合Disruptor - 图4
image.png

Common端:

消费者:

  1. package disruptor;
  2. import com.lmax.disruptor.WorkHandler;
  3. /* 消费者 */
  4. public abstract class MessageConsumer implements WorkHandler<TranslatorDataWrapper> {
  5. protected String consumerId;
  6. public MessageConsumer(String consumerId) {
  7. this.consumerId = consumerId;
  8. }
  9. public String getConsumerId() {
  10. return consumerId;
  11. }
  12. public void setConsumerId(String consumerId) {
  13. this.consumerId = consumerId;
  14. }
  15. }

生产者:

  1. package disruptor;
  2. import com.lmax.disruptor.RingBuffer;
  3. import entity.TranslatorData;
  4. import io.netty.channel.ChannelHandlerContext;
  5. import lombok.AllArgsConstructor;
  6. import lombok.Getter;
  7. import lombok.NoArgsConstructor;
  8. import lombok.Setter;
  9. /* 生产者 */
  10. @NoArgsConstructor
  11. @AllArgsConstructor
  12. @Getter
  13. @Setter
  14. public class MessageProductor {
  15. private String producerId;
  16. private RingBuffer<TranslatorDataWrapper> ringBuffer;
  17. /* 发布数据到 RingBuffer */
  18. public void onData(TranslatorData data, ChannelHandlerContext ctx) {
  19. long sequence = ringBuffer.next();
  20. try {
  21. TranslatorDataWrapper wapper = ringBuffer.get(sequence);
  22. wapper.setTranslatorData(data);
  23. wapper.setCtx(ctx);
  24. } finally {
  25. ringBuffer.publish(sequence);
  26. }
  27. }
  28. }

传输实体类:

  1. package disruptor;
  2. import entity.TranslatorData;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. /* Disruptor 传递的数据实体类 */
  7. @Data
  8. @NoArgsConstructor
  9. public class TranslatorDataWrapper {
  10. private TranslatorData translatorData; //数据实体类
  11. private ChannelHandlerContext ctx; //Netty使用的Ctx对象
  12. }

失败操作:

  1. package disruptor;
  2. import com.lmax.disruptor.ExceptionHandler;
  3. /* 事件处理失败时的操作 */
  4. public class EventExceptionHandler implements ExceptionHandler<TranslatorDataWrapper> {
  5. @Override
  6. public void handleEventException(Throwable ex, long sequence, TranslatorDataWrapper event) {
  7. System.out.println("消费时出现异常");
  8. }
  9. @Override
  10. public void handleOnStartException(Throwable ex) {
  11. System.out.println("启动时出现异常");
  12. }
  13. @Override
  14. public void handleOnShutdownException(Throwable ex) {
  15. System.out.println("关闭时出现异常");
  16. }
  17. }

Disruptor实现:

  1. package disruptor;
  2. import com.lmax.disruptor.*;
  3. import com.lmax.disruptor.dsl.ProducerType;
  4. import java.util.Map;
  5. import java.util.concurrent.ArrayBlockingQueue;
  6. import java.util.concurrent.ConcurrentHashMap;
  7. import java.util.concurrent.ThreadPoolExecutor;
  8. import java.util.concurrent.TimeUnit;
  9. /* Disruptor实现 */
  10. public class RingBufferWorkerPoolFactory {
  11. private static class SingletonHolder {
  12. static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
  13. }
  14. //返回单例模式对象
  15. public static RingBufferWorkerPoolFactory getInstance(){
  16. return SingletonHolder.instance;
  17. }
  18. private RingBufferWorkerPoolFactory(){
  19. }
  20. //用Map管理生产者
  21. private static Map<String,MessageProductor> productorMap = new ConcurrentHashMap<String, MessageProductor>();
  22. //用Map管理消费者
  23. private static Map<String,MessageConsumer> consumerMap = new ConcurrentHashMap<String, MessageConsumer>();
  24. //序号栅栏
  25. private SequenceBarrier sequenceBarrier;
  26. //环形数组
  27. private RingBuffer<TranslatorDataWrapper> ringBuffer;
  28. //工作池
  29. private WorkerPool<TranslatorDataWrapper> workerPool;
  30. /*
  31. * 初始化Disruptor并启动
  32. * @param type生产者类型
  33. * @param bufferSize 环形缓冲区中创建的元素数
  34. * @param waitStrategy等待策略
  35. * @param messageConsumers消费者数组
  36. */
  37. public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
  38. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,16,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20));
  39. ringBuffer = RingBuffer.create(type,()->new TranslatorDataWrapper(),bufferSize,new YieldingWaitStrategy());
  40. //设置序号栅栏(创建序号屏障)
  41. sequenceBarrier = ringBuffer.newBarrier();
  42. //构建多消费者工作池
  43. workerPool = new WorkerPool<TranslatorDataWrapper>(ringBuffer,sequenceBarrier,new EventExceptionHandler(),messageConsumers);
  44. //把构建的消费者放入池中
  45. for (MessageConsumer consumer : messageConsumers) {
  46. consumerMap.put(consumer.getConsumerId(), consumer);
  47. }
  48. //每个消费者的Sequence序号都是单独的,通过WorkPool获取每个消费者的序号然后设置到RingBuffer中
  49. ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
  50. //启动工作池
  51. workerPool.start(threadPool);
  52. }
  53. /* 通过ID获取消费者 */
  54. public MessageProductor getMessageProducer(String producerId){
  55. MessageProductor messageProducer = productorMap.get(producerId);
  56. if(null == messageProducer) {
  57. messageProducer = new MessageProductor(producerId,ringBuffer);
  58. productorMap.put(producerId, messageProducer);
  59. }
  60. return messageProducer;
  61. }
  62. }

服务端:

添加消费者:

  1. package dmbjz.server;
  2. import disruptor.MessageConsumer;
  3. import disruptor.TranslatorDataWrapper;
  4. import entity.TranslatorData;
  5. import io.netty.channel.ChannelHandlerContext;
  6. public class MessageConsumerImplServer extends MessageConsumer {
  7. public MessageConsumerImplServer(String consumerId) {
  8. super(consumerId);
  9. }
  10. @Override
  11. public void onEvent(TranslatorDataWrapper event) throws Exception {
  12. TranslatorData request = event.getTranslatorData();
  13. ChannelHandlerContext ctx = event.getCtx();
  14. //业务处理逻辑:
  15. System.out.println("Sever端: id= " + request.getId() + ", name= " + request.getName() + ", message= " + request.getMessage());
  16. //回送响应信息:
  17. TranslatorData response = new TranslatorData();
  18. response.setId("resp: " + request.getId());
  19. response.setName("resp: " + request.getName());
  20. response.setMessage("resp: " + request.getMessage());
  21. //写出response响应信息:
  22. ctx.writeAndFlush(response);
  23. }
  24. }

修改ServerHandler:

使用 disruptor 管理业务处理逻辑

  1. package dmbjz.server;
  2. import disruptor.MessageProductor;
  3. import disruptor.RingBufferWorkerPoolFactory;
  4. import entity.TranslatorData;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. /** 服务端Handler */
  8. public class ServerHandler extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11. System.out.println("通道建立成功");
  12. super.channelActive(ctx);
  13. }
  14. @Override
  15. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  16. /*
  17. TranslatorData info = (TranslatorData) msg;
  18. System.out.println("ID:"+info.getId()+",名称:"+info.getName()+",消息:"+info.getMessage());
  19. //创建新数据
  20. TranslatorData response = new TranslatorData();
  21. response.setId("resp: " + info.getId());
  22. response.setName("resp: " + info.getName());
  23. response.setMessage("resp: " + info.getMessage());
  24. //返回数据
  25. ctx.writeAndFlush(response);
  26. */
  27. /* 使用disruptor异步执行业务逻辑,将数据发送给消费者即可 */
  28. TranslatorData info = (TranslatorData) msg;
  29. String id = "code:seesionId:001"; //同一个用户应使用同一个Id,例如可以使用机器码:sessionId:用户编号作为规则进行生成,这里固定写死
  30. MessageProductor messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(id);
  31. messageProducer.onData(info,ctx);
  32. }
  33. }

修改NettyBoot:

  1. package dmbjz;
  2. import com.lmax.disruptor.BlockingWaitStrategy;
  3. import com.lmax.disruptor.YieldingWaitStrategy;
  4. import com.lmax.disruptor.dsl.ProducerType;
  5. import disruptor.MessageConsumer;
  6. import disruptor.RingBufferWorkerPoolFactory;
  7. import dmbjz.server.MessageConsumerImplServer;
  8. import dmbjz.server.NettyServer;
  9. import org.springframework.context.ApplicationListener;
  10. import org.springframework.context.event.ContextRefreshedEvent;
  11. import org.springframework.stereotype.Component;
  12. /* Spring加载完成后进行监听执行 */
  13. @Component
  14. public class NettyBoot implements ApplicationListener<ContextRefreshedEvent> {
  15. /* 当一个ApplicationContext被初始化或刷新触发 */
  16. @Override
  17. public void onApplicationEvent(ContextRefreshedEvent event) {
  18. //判断当前是否为spring容器初始化完成,防止重复执行
  19. if(event.getApplicationContext().getParent() == null){
  20. try {
  21. //初始化消费者
  22. MessageConsumer[] conusmers = new MessageConsumer[4];
  23. for(int i =0; i < conusmers.length; i++) {
  24. MessageConsumer messageConsumer = new MessageConsumerImplServer("code:serverId:" + i);
  25. conusmers[i] = messageConsumer;
  26. }
  27. //初始化Disruptor并启动
  28. RingBufferWorkerPoolFactory.getInstance().initAndStart(
  29. ProducerType.MULTI,
  30. 1024*1024,
  31. //new YieldingWaitStrategy(),
  32. new BlockingWaitStrategy(),
  33. conusmers);
  34. //启动Netty服务端
  35. NettyServer.getInstance().start();
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }
  41. }

修改客户端:

修改ClientHandler:

  1. package com.dmbjz.client;
  2. import disruptor.MessageProductor;
  3. import disruptor.RingBufferWorkerPoolFactory;
  4. import entity.TranslatorData;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.util.ReferenceCountUtil;
  8. public class ClientHandler extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  11. /*
  12. try {
  13. TranslatorData response = (TranslatorData)msg;
  14. System.err.println("Client端: id= " + response.getId()
  15. + ", name= " + response.getName()
  16. + ", message= " + response.getMessage());
  17. } finally {
  18. //释放缓存
  19. ReferenceCountUtil.release(msg);
  20. }
  21. */
  22. TranslatorData response = (TranslatorData)msg;
  23. String producerId = "code:seesionId:002";
  24. MessageProductor messageProducer = RingBufferWorkerPoolFactory.getInstance().getMessageProducer(producerId);
  25. messageProducer.onData(response, ctx);
  26. }
  27. }

添加消费者:

  1. package com.dmbjz.client;
  2. import disruptor.MessageConsumer;
  3. import disruptor.TranslatorDataWrapper;
  4. import entity.TranslatorData;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.util.ReferenceCountUtil;
  7. public class MessageConsumerImplClient extends MessageConsumer {
  8. public MessageConsumerImplClient(String consumerId) {
  9. super(consumerId);
  10. }
  11. /* 业务逻辑处理方式 */
  12. @Override
  13. public void onEvent(TranslatorDataWrapper event) throws Exception {
  14. TranslatorData response = event.getTranslatorData();
  15. //业务逻辑处理:
  16. try {
  17. System.out.println("Client端: id= " + response.getId() + ", name= " + response.getName() + ", message= " + response.getMessage());
  18. } finally {
  19. ReferenceCountUtil.release(response);
  20. }
  21. }
  22. }

修改启动类:

  1. package com.dmbjz;
  2. import com.dmbjz.client.MessageConsumerImplClient;
  3. import com.dmbjz.client.NettyClient;
  4. import com.lmax.disruptor.BlockingWaitStrategy;
  5. import com.lmax.disruptor.dsl.ProducerType;
  6. import disruptor.MessageConsumer;
  7. import disruptor.RingBufferWorkerPoolFactory;
  8. import org.springframework.boot.SpringApplication;
  9. import org.springframework.boot.autoconfigure.SpringBootApplication;
  10. @SpringBootApplication
  11. public class NettyClientApplication {
  12. public static void main(String[] args) {
  13. SpringApplication.run(NettyClientApplication.class, args);
  14. //创建消费者
  15. MessageConsumer[] conusmers = new MessageConsumer[4];
  16. for(int i =0; i < conusmers.length; i++) {
  17. MessageConsumer messageConsumer = new MessageConsumerImplClient("code:clientId:" + i);
  18. conusmers[i] = messageConsumer;
  19. }
  20. //初始化并启动disruptor
  21. RingBufferWorkerPoolFactory.getInstance().initAndStart(ProducerType.MULTI,
  22. 1024*1024,
  23. //new YieldingWaitStrategy(),
  24. new BlockingWaitStrategy(),
  25. conusmers);
  26. //在启动的时候执行Client代码
  27. new NettyClient().sendData();
  28. }
  29. }

测试:

image.png image.png