Simulating 100 million records to compare the performance of Disruptor with a traditional queue, ArrayBlockingQueue
Entity class definition:

package com.dmbjz.contrast;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

/* Data entity class */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class MessageInfo {

    private Long id;
    private String name;

}
ArrayBlockingQueue example:

package com.dmbjz.contrast;

import java.util.concurrent.ArrayBlockingQueue;

/*
 * ArrayBlockingQueue performance test.
 * Measures the time needed to put 100 million items into the queue and take them out again.
 */
public class ArrayBlockingQueueTest {

    public static void main(String[] args) {

        ArrayBlockingQueue<MessageInfo> queue = new ArrayBlockingQueue<MessageInfo>(100000000);
        long startTime = System.currentTimeMillis();

        // Producer thread: add elements
        new Thread(() -> {
            long i = 0;
            while (i < 100000000) {
                MessageInfo data = new MessageInfo(i, "c" + i);
                try {
                    queue.put(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                i++;
            }
        }).start();

        // Consumer thread: take elements and report the elapsed time once all have been taken
        new Thread(() -> {
            int k = 0;
            while (k < 100000000) {
                try {
                    queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                k++;
            }
            long endTime = System.currentTimeMillis();
            System.out.println("ArrayBlockingQueue elapsed time = " + (endTime - startTime) + "ms");
        }).start();

    }

}
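The measurement pattern used above (one producer thread, one consumer thread, a timer around both) can be tried at a smaller scale first. The following is only a sketch under assumptions: the class name QueueTimingSketch, the method transfer, and the item count and capacity are made up for illustration, it uses a small bounded queue instead of one sized for every item, and it waits for both threads with join() before reading the clock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueTimingSketch {

    /** Moves count items through a bounded queue; returns how many the consumer received. */
    public static long transfer(int count, int capacity) {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(capacity);
        long[] received = new long[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (long i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int k = 0; k < count; k++) {
                    queue.take(); // blocks while the queue is empty
                    received[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), received[0] is visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker threads", e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        int count = 1_000_000; // hypothetical, much smaller than the 100 million above
        long start = System.nanoTime();
        long received = transfer(count, 1024);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("transferred " + received + " items in " + elapsedMs + " ms");
    }
}
```

With a small capacity the producer regularly blocks on a full queue, so the timing reflects hand-off cost rather than the cost of allocating one very large backing array.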

Disruptor example:
Consumer:

package com.dmbjz.contrast;

import com.lmax.disruptor.EventHandler;

public class DataConsumer implements EventHandler<MessageInfo> {

    private long startTime;
    private int i;

    public DataConsumer() {
        // Timing starts when the consumer is created
        this.startTime = System.currentTimeMillis();
    }

    @Override
    public void onEvent(MessageInfo data, long seq, boolean endOfBatch) throws Exception {
        i++;
        // Report the elapsed time once all 100 million events have been consumed
        if (i == 100000000) {
            long endTime = System.currentTimeMillis();
            System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
        }
    }

}
Running it:
The demo uses YieldingWaitStrategy, one of Disruptor's lowest-latency wait strategies; it gets that speed by spinning and yielding instead of blocking, at the cost of higher CPU usage.

package com.dmbjz.contrast;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;

public class DisruptorTest {

    public static void main(String[] args) {

        int ringBufferSize = 1024 * 1024;

        // Note: the Executor-based constructor is deprecated in later Disruptor 3.x releases
        // in favor of one that takes a ThreadFactory.
        Disruptor<MessageInfo> disruptor = new Disruptor<MessageInfo>(
                () -> new MessageInfo(),
                ringBufferSize,
                Executors.newSingleThreadExecutor(),
                ProducerType.SINGLE,
                //new BlockingWaitStrategy()
                new YieldingWaitStrategy()
        );

        DataConsumer consumer = new DataConsumer();

        // Register the consumer
        disruptor.handleEventsWith(consumer);

        // Start the framework
        disruptor.start();

        // Producer thread: claim a slot, fill in the preallocated event, then publish it
        new Thread(() -> {
            RingBuffer<MessageInfo> ringBuffer = disruptor.getRingBuffer();
            for (long i = 0; i < 100000000; i++) {
                long seq = ringBuffer.next();
                MessageInfo data = ringBuffer.get(seq);
                data.setId(i);
                data.setName("c" + i);
                ringBuffer.publish(seq);
            }
        }).start();

    }

}
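The next / get / publish cycle in the producer loop above can be illustrated with a much-simplified, JDK-only ring buffer. This is not Disruptor's implementation; it is a sketch for one producer and one consumer, and every name in it (RingBufferSketch, Slot, run) is hypothetical. It shows two ideas the example relies on: slots are allocated once and reused, and a power-of-two size lets a sequence number be mapped to a slot with a bit mask.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RingBufferSketch {

    /** Mutable slot, allocated once and reused for every lap around the ring. */
    static final class Slot {
        long id;
    }

    private final Slot[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(-1); // last sequence visible to the consumer
    private final AtomicLong consumed = new AtomicLong(-1);  // last sequence the consumer finished
    private long nextToClaim = 0; // touched only by the single producer thread

    public RingBufferSketch(int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new Slot[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new Slot();
        }
        mask = size - 1;
    }

    /** Producer side: claim the next sequence, yielding while the ring is full. */
    long next() {
        long seq = nextToClaim++;
        while (seq - consumed.get() > slots.length) {
            Thread.yield();
        }
        return seq;
    }

    /** Maps a sequence number to its slot with a bit mask instead of a modulo. */
    Slot get(long seq) {
        return slots[(int) (seq & mask)];
    }

    /** The volatile write makes the slot contents visible to the consumer. */
    void publish(long seq) {
        published.set(seq);
    }

    /** Pushes ids 0..count-1 through a ring of the given size and returns their sum. */
    public static long run(long count, int size) {
        RingBufferSketch ring = new RingBufferSketch(size);
        long[] sum = new long[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            for (long i = 0; i < count; i++) {
                long seq = ring.next();
                ring.get(seq).id = i;
                ring.publish(seq);
            }
        });

        Thread consumer = new Thread(() -> {
            for (long seq = 0; seq < count; seq++) {
                while (ring.published.get() < seq) {
                    Thread.yield(); // wait until the producer has published this sequence
                }
                sum[0] += ring.get(seq).id;
                ring.consumed.set(seq); // frees the slot for reuse by the producer
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker threads", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("sum = " + run(1_000_000, 1024));
    }
}
```

Because the producer overwrites existing Slot objects rather than creating a new object per message, the steady state allocates nothing, which is one difference from the ArrayBlockingQueue test, where each item is a freshly constructed MessageInfo.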

