1、剖析Disruptor:为什么会这么快
https://www.cnblogs.com/haiq/p/4112689.html
Disruptor单消费者单生产者Demo
public class Test2 {public static void main(String[] args) {int ringBufferSize = 1024 * 1024;//初始化disruptorDisruptor<OrderEvent1> disruptor = new Disruptor(//事件工厂 用于初始化事件new OrdetEventFactory1(),//ringBufferSizeringBufferSize,//线程工厂r -> {Thread thread = new Thread(r);return thread;},//生产者类型ProducerType.SINGLE,//阻塞策略new BlockingWaitStrategy());//设置消费者Handlerdisruptor.handleEventsWith(new OrderHandler1());//启动无锁队列disruptor.start();//获取ringBufferRingBuffer<OrderEvent1> ringBuffer = disruptor.getRingBuffer();ByteBuffer allocate = ByteBuffer.allocate(8);for (int i = 0; i < 100; i++) {//通过ringBuffer向队列发送order事件OrderProducer1 orderProducer = new OrderProducer1(ringBuffer);orderProducer.addData(allocate.putLong(0,i));}//关闭队列disruptor.shutdown();}}class OrdetEventFactory1 implements EventFactory<OrderEvent1> {@Overridepublic OrderEvent1 newInstance() {return new OrderEvent1();}}@Data@AllArgsConstructorclass OrderProducer1 {private RingBuffer<OrderEvent1> ringBuffer;public void addData(ByteBuffer bb){long next = ringBuffer.next();try {OrderEvent1 orderEvent = ringBuffer.get(next);orderEvent.setValue(bb.getLong(0));} finally {ringBuffer.publish(next);}}}class OrderHandler1 implements EventHandler<OrderEvent1> {@Overridepublic void onEvent(OrderEvent1 orderEvent, long l, boolean b) throws Exception {System.out.println("OrderHandler1:" + orderEvent.getValue());}}@Dataclass OrderEvent1 {private Long value;}
Disruptor多生产者多消费者Demo
//Disruptor 多生产者多消费者Deompublic class Main {public static void main(String[] args) throws InterruptedException {Disruptor<Order2> disruptor = new Disruptor(new Order2Factory(),1024 * 1024,Executors.newFixedThreadPool(5),ProducerType.MULTI,new YieldingWaitStrategy());RingBuffer<Order2> ringBuffer = disruptor.getRingBuffer();Order2Handle[] order2Handles = new Order2Handle[10];for (int i = 0; i < 10; i++) {order2Handles[i] = new Order2Handle("C"+i);}disruptor.handleEventsWithWorkerPool(order2Handles);disruptor.start();CyclicBarrier barrier = new CyclicBarrier(100);for (int i = 0; i < 100; i++) {Producer producer = new Producer(ringBuffer);new Thread(new Runnable() {@Overridepublic void run() {try {barrier.await();}catch (Exception e){}for (int j = 0; j < 1000; j++) {producer.putOrder(UUID.randomUUID().toString());}}}).start();}Thread.sleep(2000);System.err.println("----------线程创建完毕,开始生产数据----------");Thread.sleep(10000);System.err.println("任务总数1:" + order2Handles[0].getCount());}}@Dataclass Order2{private String id;private String data;}@Slf4j@AllArgsConstructorclass Order2Handle implements WorkHandler<Order2>{private String orderId;private AtomicInteger count = new AtomicInteger(0);private Random random = new Random();public Order2Handle(String orderId){this.orderId = orderId;}@Overridepublic void onEvent(Order2 order2) throws Exception {log.info("orderId:{} {}",orderId,order2.getId());count.incrementAndGet();}public int getCount(){return count.get();}}class Order2Factory implements EventFactory<Order2>{@Overridepublic Order2 newInstance() {return new Order2();}}class EventExceptionHandler implements ExceptionHandler{@Overridepublic void handleEventException(Throwable throwable, long l, Object o) {}@Overridepublic void handleOnStartException(Throwable throwable) {}@Overridepublic void handleOnShutdownException(Throwable throwable) {}}class Producer{private RingBuffer<Order2> ringBuffer;public Producer(RingBuffer<Order2> ringBuffer){this.ringBuffer = ringBuffer;}public void putOrder(String uuid){long next = ringBuffer.next();try {Order2 order2 = ringBuffer.get(next);order2.setId(uuid);} finally {ringBuffer.publish(next);}}}
