模拟一亿数据,使用 Disruptor 与传统队列 ArrayBlockQueue 进行性能比较

实体类定义:

  1. package com.dmbjz.contrast;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import lombok.experimental.Accessors;
  6. /* 数据实体类 */
  7. @Data
  8. @AllArgsConstructor
  9. @NoArgsConstructor
  10. @Accessors(chain = true)
  11. public class MessageInfo{
  12. private Long id ;
  13. private String name;
  14. }

ArrayBlockingQueue案例:

  1. package com.dmbjz.contrast;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. /* ArrayBlockingQueue性能测试
  4. * 模拟往其中放入一亿数据并取出需要花费的时间
  5. */
  6. public class ArrayBlockingQueueTest {
  7. public static void main(String[] args) {
  8. ArrayBlockingQueue<MessageInfo> queue = new ArrayBlockingQueue<MessageInfo>(100000000);
  9. long startTime = System.currentTimeMillis();
  10. //添加元素
  11. new Thread(()->{
  12. long i = 0;
  13. while (i < 100000000) {
  14. MessageInfo data = new MessageInfo(i, "c" + i);
  15. try {
  16. queue.put(data);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. i++;
  21. }
  22. }).start();
  23. //取出元素
  24. new Thread(()->{
  25. int k = 0;
  26. while (k < 100000000) {
  27. try {
  28. queue.take();
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. k++;
  33. }
  34. long endTime = System.currentTimeMillis();
  35. System.out.println("ArrayBlockingQueue 花费时间 = " + (endTime - startTime) + "ms");
  36. }).start();
  37. }
  38. }

image.png

Disruptor案例:

消费者:

  1. package com.dmbjz.contrast;
  2. import com.lmax.disruptor.EventHandler;
  3. public class DataConsumer implements EventHandler<MessageInfo> {
  4. private long startTime;
  5. private int i;
  6. public DataConsumer() {
  7. this.startTime = System.currentTimeMillis();
  8. }
  9. @Override
  10. public void onEvent(MessageInfo data, long seq, boolean bool)
  11. throws Exception {
  12. i++;
  13. if (i == 100000000) {
  14. long endTime = System.currentTimeMillis();
  15. System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
  16. }
  17. }
  18. }

执行:

使用性能最高的 YieldingWaitStrategy 策略进行演示

  1. package com.dmbjz.contrast;
  2. import com.lmax.disruptor.BlockingWaitStrategy;
  3. import com.lmax.disruptor.RingBuffer;
  4. import com.lmax.disruptor.YieldingWaitStrategy;
  5. import com.lmax.disruptor.dsl.Disruptor;
  6. import com.lmax.disruptor.dsl.ProducerType;
  7. import java.util.concurrent.Executors;
  8. public class DisruptorTest {
  9. public static void main(String[] args) {
  10. int ringBufferSize = 1024*1024;
  11. Disruptor<MessageInfo> disruptor = new Disruptor<MessageInfo>(
  12. () -> new MessageInfo(),
  13. ringBufferSize,
  14. Executors.newSingleThreadExecutor(),
  15. ProducerType.SINGLE,
  16. //new BlockingWaitStrategy()
  17. new YieldingWaitStrategy()
  18. );
  19. DataConsumer consumer = new DataConsumer();
  20. //消费数据
  21. disruptor.handleEventsWith(consumer);
  22. //启用框架
  23. disruptor.start();
  24. new Thread(()->{
  25. RingBuffer<MessageInfo> ringBuffer = disruptor.getRingBuffer();
  26. for (long i = 0; i < 100000000; i++) {
  27. long seq = ringBuffer.next();
  28. MessageInfo data = ringBuffer.get(seq);
  29. data.setId(i);
  30. data.setName("c" + i);
  31. ringBuffer.publish(seq);
  32. }
  33. }).start();
  34. }
  35. }

image.png