模拟一亿数据,使用 Disruptor 与传统队列 ArrayBlockQueue 进行性能比较
实体类定义:
package com.dmbjz.contrast;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/* 数据实体类 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class MessageInfo{
private Long id ;
private String name;
}
ArrayBlockingQueue案例:
package com.dmbjz.contrast;
import java.util.concurrent.ArrayBlockingQueue;
/* ArrayBlockingQueue性能测试
* 模拟往其中放入一亿数据并取出需要花费的时间
*/
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
ArrayBlockingQueue<MessageInfo> queue = new ArrayBlockingQueue<MessageInfo>(100000000);
long startTime = System.currentTimeMillis();
//添加元素
new Thread(()->{
long i = 0;
while (i < 100000000) {
MessageInfo data = new MessageInfo(i, "c" + i);
try {
queue.put(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}).start();
//取出元素
new Thread(()->{
int k = 0;
while (k < 100000000) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
k++;
}
long endTime = System.currentTimeMillis();
System.out.println("ArrayBlockingQueue 花费时间 = " + (endTime - startTime) + "ms");
}).start();
}
}
Disruptor案例:
消费者:
package com.dmbjz.contrast;
import com.lmax.disruptor.EventHandler;
public class DataConsumer implements EventHandler<MessageInfo> {
private long startTime;
private int i;
public DataConsumer() {
this.startTime = System.currentTimeMillis();
}
@Override
public void onEvent(MessageInfo data, long seq, boolean bool)
throws Exception {
i++;
if (i == 100000000) {
long endTime = System.currentTimeMillis();
System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
}
}
}
执行:
使用性能最高的 YieldingWaitStrategy 策略进行演示
package com.dmbjz.contrast;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;
public class DisruptorTest {
public static void main(String[] args) {
int ringBufferSize = 1024*1024;
Disruptor<MessageInfo> disruptor = new Disruptor<MessageInfo>(
() -> new MessageInfo(),
ringBufferSize,
Executors.newSingleThreadExecutor(),
ProducerType.SINGLE,
//new BlockingWaitStrategy()
new YieldingWaitStrategy()
);
DataConsumer consumer = new DataConsumer();
//消费数据
disruptor.handleEventsWith(consumer);
//启用框架
disruptor.start();
new Thread(()->{
RingBuffer<MessageInfo> ringBuffer = disruptor.getRingBuffer();
for (long i = 0; i < 100000000; i++) {
long seq = ringBuffer.next();
MessageInfo data = ringBuffer.get(seq);
data.setId(i);
data.setName("c" + i);
ringBuffer.publish(seq);
}
}).start();
}
}