代码编写:
事件实体类:
package com.dmbjz.quickstart;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/** 消息数据对象 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private Integer value; //订单价格
}
事件工厂:
用于在 RingBuffer 中返回对象实例
package com.dmbjz.quickstart;
import com.lmax.disruptor.EventFactory;
/** Event工厂,用于创建对象实例,需要实现EventFactory接口 */
public class OrderEventFactory implements EventFactory<OrderEvent> {
//这个方法就是为了返回空的消息&数据对象(Event)
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
消费者:
获取到事件之后需要执行的方法
package com.dmbjz.quickstart;
import com.lmax.disruptor.EventHandler;
/**监听事件类,用于处理数据(Event类),可以理解为消费者 */
public class OrderEventHandler implements EventHandler<OrderEvent> {
/* 事件监听模式 */
@Override
public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception {
System.out.println("消费者: " + orderEvent.getValue());
}
}
生产者:
获取 RingBuffer 的下标 > 通过下标获取创建出的事件对象 > 将对象进行赋值后塞回 RingBuffer
package com.dmbjz.quickstart;
import com.lmax.disruptor.RingBuffer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.nio.ByteBuffer;
/**生产者 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEventProducer {
private RingBuffer<OrderEvent> ringBuffer;
//投递数据的办法
public void send(ByteBuffer byteBuffer){
//在生产者发送消息的时候, 首先需要从RingBuffer里面获取下一个可用的序号
long sequence = ringBuffer.next();
try {
//通过序号找到具体数据对象( 此时获取的对象是一个空对象,相当于获得到一个 new OrderEvent() )
OrderEvent orderEvent = ringBuffer.get(sequence);
//对传递过来的数据赋值到对象中(数据一直在0索引位)
orderEvent.setValue(byteBuffer.getInt(0));
} finally {
//发布数据
ringBuffer.publish(sequence);
}
}
}
Disruptor 使用代码:
Disruptor 使用类似线程池 + 队列,需要进行资源关闭
package com.dmbjz.quickstart;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 1024*1024;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20,30,300, TimeUnit.MICROSECONDS,new LinkedBlockingDeque<>());
/*
* 实例化Disruptor对象,指定实际需要处理的事件对象
* @param orderEventFactory事件工厂
* @param ringBufferSize容器长度,必须为2的次幂
* @param threadPool线程池
* @param ProducerType指定生产者模式为单个或多个
* @param WaitStrategy等待策略
*/
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,threadPool,ProducerType.SINGLE,new BlockingWaitStrategy());
//添加消费者监听,获取生产者投递的数据 (构建disruptor 与 消费者的一个关联关系)
disruptor.handleEventsWith(new OrderEventHandler());
//启动disruptor
disruptor.start();
//获取实际存储数据的容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
OrderEventProducer producer = new OrderEventProducer(ringBuffer);
//模拟投递100次数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
for (int i = 0; i < 100; i++) {
byteBuffer.putInt(0,i);
producer.send(byteBuffer);
}
//关闭框架与线程池
disruptor.shutdown();
threadPool.shutdown();
}
}