代码编写:
事件实体类:
package com.dmbjz.quickstart;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;/** 消息数据对象 */@Data@AllArgsConstructor@NoArgsConstructorpublic class OrderEvent {private Integer value; //订单价格}
事件工厂:
用于在 RingBuffer 中返回对象实例
package com.dmbjz.quickstart;import com.lmax.disruptor.EventFactory;/** Event工厂,用于创建对象实例,需要实现EventFactory接口 */public class OrderEventFactory implements EventFactory<OrderEvent> {//这个方法就是为了返回空的消息&数据对象(Event)@Overridepublic OrderEvent newInstance() {return new OrderEvent();}}
消费者:
获取到事件之后需要执行的方法
package com.dmbjz.quickstart;import com.lmax.disruptor.EventHandler;/**监听事件类,用于处理数据(Event类),可以理解为消费者 */public class OrderEventHandler implements EventHandler<OrderEvent> {/* 事件监听模式 */@Overridepublic void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {System.out.println("消费者: " + orderEvent.getValue());}}
生产者:
获取 RingBuffer 的下标 > 通过下标获取创建出的事件对象 > 将对象进行赋值后塞回 RingBuffer
package com.dmbjz.quickstart;import com.lmax.disruptor.RingBuffer;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import java.nio.ByteBuffer;/**生产者 */@Data@AllArgsConstructor@NoArgsConstructorpublic class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;//投递数据的办法public void send(ByteBuffer byteBuffer){//在生产者发送消息的时候, 首先需要从RingBuffer里面获取下一个可用的序号long sequence = ringBuffer.next();try {//通过序号找到具体数据对象( 此时获取的对象是一个空对象,相当于获得到一个 new OrderEvent() )OrderEvent orderEvent = ringBuffer.get(sequence);//对传递过来的数据赋值到对象中(数据一直在0索引位)orderEvent.setValue(byteBuffer.getInt(0));} finally {//发布数据ringBuffer.publish(sequence);}}}
Disruptor 使用代码:
Disruptor 使用类似线程池 + 队列,需要进行资源关闭
package com.dmbjz.quickstart;import com.lmax.disruptor.BlockingWaitStrategy;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import java.nio.ByteBuffer;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 1024*1024;ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20,30,300, TimeUnit.MICROSECONDS,new LinkedBlockingDeque<>());/** 实例化Disruptor对象,指定实际需要处理的事件对象* @param orderEventFactory事件工厂* @param ringBufferSize容器长度,必须为2的次幂* @param threadPool线程池* @param ProducerType指定生产者模式为单个或多个* @param WaitStrategy等待策略*/Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,threadPool,ProducerType.SINGLE,new BlockingWaitStrategy());//添加消费者监听,获取生产者投递的数据 (构建disruptor 与 消费者的一个关联关系)disruptor.handleEventsWith(new OrderEventHandler());//启动disruptordisruptor.start();//获取实际存储数据的容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer producer = new OrderEventProducer(ringBuffer);//模拟投递100次数据ByteBuffer byteBuffer = ByteBuffer.allocate(1024);for (int i = 0; i < 100; i++) {byteBuffer.putInt(0,i);producer.send(byteBuffer);}//关闭框架与线程池disruptor.shutdown();threadPool.shutdown();}}
运行结果:

