代码编写:

事件实体类:

  1. package com.dmbjz.quickstart;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. /** 消息数据对象 */
  6. @Data
  7. @AllArgsConstructor
  8. @NoArgsConstructor
  9. public class OrderEvent {
  10. private Integer value; //订单价格
  11. }

事件工厂:

用于在 RingBuffer 中返回对象实例

  1. package com.dmbjz.quickstart;
  2. import com.lmax.disruptor.EventFactory;
  3. /** Event工厂,用于创建对象实例,需要实现EventFactory接口 */
  4. public class OrderEventFactory implements EventFactory<OrderEvent> {
  5. //这个方法就是为了返回空的消息&数据对象(Event)
  6. @Override
  7. public OrderEvent newInstance() {
  8. return new OrderEvent();
  9. }
  10. }

消费者:

获取到事件之后需要执行的方法

  1. package com.dmbjz.quickstart;
  2. import com.lmax.disruptor.EventHandler;
  3. /**监听事件类,用于处理数据(Event类),可以理解为消费者 */
  4. public class OrderEventHandler implements EventHandler<OrderEvent> {
  5. /* 事件监听模式 */
  6. @Override
  7. public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
  8. System.out.println("消费者: " + orderEvent.getValue());
  9. }
  10. }

生产者:

获取 RingBuffer 的下标 > 通过下标获取创建出的事件对象 > 将对象进行赋值后塞回 RingBuffer

  1. package com.dmbjz.quickstart;
  2. import com.lmax.disruptor.RingBuffer;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. import java.nio.ByteBuffer;
  7. /**生产者 */
  8. @Data
  9. @AllArgsConstructor
  10. @NoArgsConstructor
  11. public class OrderEventProducer {
  12. private RingBuffer<OrderEvent> ringBuffer;
  13. //投递数据的办法
  14. public void send(ByteBuffer byteBuffer){
  15. //在生产者发送消息的时候, 首先需要从RingBuffer里面获取下一个可用的序号
  16. long sequence = ringBuffer.next();
  17. try {
  18. //通过序号找到具体数据对象( 此时获取的对象是一个空对象,相当于获得到一个 new OrderEvent() )
  19. OrderEvent orderEvent = ringBuffer.get(sequence);
  20. //对传递过来的数据赋值到对象中(数据一直在0索引位)
  21. orderEvent.setValue(byteBuffer.getInt(0));
  22. } finally {
  23. //发布数据
  24. ringBuffer.publish(sequence);
  25. }
  26. }
  27. }

Disruptor 使用代码:

Disruptor 使用类似线程池 + 队列,需要进行资源关闭

  1. package com.dmbjz.quickstart;
  2. import com.lmax.disruptor.BlockingWaitStrategy;
  3. import com.lmax.disruptor.RingBuffer;
  4. import com.lmax.disruptor.dsl.Disruptor;
  5. import com.lmax.disruptor.dsl.ProducerType;
  6. import java.nio.ByteBuffer;
  7. import java.util.concurrent.LinkedBlockingDeque;
  8. import java.util.concurrent.ThreadPoolExecutor;
  9. import java.util.concurrent.TimeUnit;
  10. public class Main {
  11. public static void main(String[] args) {
  12. OrderEventFactory orderEventFactory = new OrderEventFactory();
  13. int ringBufferSize = 1024*1024;
  14. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20,30,300, TimeUnit.MICROSECONDS,new LinkedBlockingDeque<>());
  15. /*
  16. * 实例化Disruptor对象,指定实际需要处理的事件对象
  17. * @param orderEventFactory事件工厂
  18. * @param ringBufferSize容器长度,必须为2的次幂
  19. * @param threadPool线程池
  20. * @param ProducerType指定生产者模式为单个或多个
  21. * @param WaitStrategy等待策略
  22. */
  23. Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,threadPool,ProducerType.SINGLE,new BlockingWaitStrategy());
  24. //添加消费者监听,获取生产者投递的数据 (构建disruptor 与 消费者的一个关联关系)
  25. disruptor.handleEventsWith(new OrderEventHandler());
  26. //启动disruptor
  27. disruptor.start();
  28. //获取实际存储数据的容器
  29. RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
  30. //创建生产者
  31. OrderEventProducer producer = new OrderEventProducer(ringBuffer);
  32. //模拟投递100次数据
  33. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  34. for (int i = 0; i < 100; i++) {
  35. byteBuffer.putInt(0,i);
  36. producer.send(byteBuffer);
  37. }
  38. //关闭框架与线程池
  39. disruptor.shutdown();
  40. threadPool.shutdown();
  41. }
  42. }

运行结果:

image.png