1、剖析Disruptor:为什么会这么快
https://www.cnblogs.com/haiq/p/4112689.html
Disruptor单消费者单生产者Demo
public class Test2 {
public static void main(String[] args) {
int ringBufferSize = 1024 * 1024;
//初始化disruptor
Disruptor<OrderEvent1> disruptor = new Disruptor(
//事件工厂 用于初始化事件
new OrdetEventFactory1(),
//ringBufferSize
ringBufferSize,
//线程工厂
r -> {
Thread thread = new Thread(r);
return thread;
},
//生产者类型
ProducerType.SINGLE,
//阻塞策略
new BlockingWaitStrategy());
//设置消费者Handler
disruptor.handleEventsWith(new OrderHandler1());
//启动无锁队列
disruptor.start();
//获取ringBuffer
RingBuffer<OrderEvent1> ringBuffer = disruptor.getRingBuffer();
ByteBuffer allocate = ByteBuffer.allocate(8);
for (int i = 0; i < 100; i++) {
//通过ringBuffer向队列发送order事件
OrderProducer1 orderProducer = new OrderProducer1(ringBuffer);
orderProducer.addData(allocate.putLong(0,i));
}
//关闭队列
disruptor.shutdown();
}
}
class OrdetEventFactory1 implements EventFactory<OrderEvent1> {
@Override
public OrderEvent1 newInstance() {
return new OrderEvent1();
}
}
@Data
@AllArgsConstructor
class OrderProducer1 {
private RingBuffer<OrderEvent1> ringBuffer;
public void addData(ByteBuffer bb){
long next = ringBuffer.next();
try {
OrderEvent1 orderEvent = ringBuffer.get(next);
orderEvent.setValue(bb.getLong(0));
} finally {
ringBuffer.publish(next);
}
}
}
class OrderHandler1 implements EventHandler<OrderEvent1> {
@Override
public void onEvent(OrderEvent1 orderEvent, long l, boolean b) throws Exception {
System.out.println("OrderHandler1:" + orderEvent.getValue());
}
}
@Data
class OrderEvent1 {
private Long value;
}
Disruptor多生产者多消费者Demo
//Disruptor 多生产者多消费者Deom
public class Main {
public static void main(String[] args) throws InterruptedException {
Disruptor<Order2> disruptor = new Disruptor(
new Order2Factory(),
1024 * 1024,
Executors.newFixedThreadPool(5),
ProducerType.MULTI,
new YieldingWaitStrategy()
);
RingBuffer<Order2> ringBuffer = disruptor.getRingBuffer();
Order2Handle[] order2Handles = new Order2Handle[10];
for (int i = 0; i < 10; i++) {
order2Handles[i] = new Order2Handle("C"+i);
}
disruptor.handleEventsWithWorkerPool(order2Handles);
disruptor.start();
CyclicBarrier barrier = new CyclicBarrier(100);
for (int i = 0; i < 100; i++) {
Producer producer = new Producer(ringBuffer);
new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
}catch (Exception e){
}
for (int j = 0; j < 1000; j++) {
producer.putOrder(UUID.randomUUID().toString());
}
}
}).start();
}
Thread.sleep(2000);
System.err.println("----------线程创建完毕,开始生产数据----------");
Thread.sleep(10000);
System.err.println("任务总数1:" + order2Handles[0].getCount());
}
}
@Data
class Order2{
private String id;
private String data;
}
@Slf4j
@AllArgsConstructor
class Order2Handle implements WorkHandler<Order2>{
private String orderId;
private AtomicInteger count = new AtomicInteger(0);
private Random random = new Random();
public Order2Handle(String orderId){
this.orderId = orderId;
}
@Override
public void onEvent(Order2 order2) throws Exception {
log.info("orderId:{} {}",orderId,order2.getId());
count.incrementAndGet();
}
public int getCount(){
return count.get();
}
}
class Order2Factory implements EventFactory<Order2>{
@Override
public Order2 newInstance() {
return new Order2();
}
}
class EventExceptionHandler implements ExceptionHandler{
@Override
public void handleEventException(Throwable throwable, long l, Object o) {
}
@Override
public void handleOnStartException(Throwable throwable) {
}
@Override
public void handleOnShutdownException(Throwable throwable) {
}
}
class Producer{
private RingBuffer<Order2> ringBuffer;
public Producer(RingBuffer<Order2> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void putOrder(String uuid){
long next = ringBuffer.next();
try {
Order2 order2 = ringBuffer.get(next);
order2.setId(uuid);
} finally {
ringBuffer.publish(next);
}
}
}