概要

Disruptor是一个高性能的异步处理框架。性能远高于传统的Queue。

  • 环型数据结构
  • 无锁
  • 数据结构为数组

Disruptor实现生产和消费

多个消费者合作处理一批事件

1. pom maven依赖

  1. <dependency>
  2. <groupId>com.lmax</groupId>
  3. <artifactId>disruptor</artifactId>
  4. <version>3.4.2</version>
  5. </dependency>

2. event

包含数据实体。

  1. @Data
  2. public class MyEvent implements Serializable {
  3. private static final long serialVersionUID = -3902498674035052931L;
  4. private long id;
  5. }

3. ring buffer

  1. public class MyRingBuffer {
  2. private static final int CONSUMER_NUM = 10;
  3. public static RingBuffer<MyEvent> buffer = getRingBuffer();
  4. private static RingBuffer<MyEvent> getRingBuffer() {
  5. //创建ringbuffer 多生产者模式
  6. RingBuffer<MyEvent> buffer = RingBuffer.createMultiProducer(MyEvent::new, 512, new BlockingWaitStrategy());
  7. //创建消费者组
  8. MyConsumer[] consumers = new MyConsumer[CONSUMER_NUM];
  9. for (int i = 0; i < consumers.length; i++) {
  10. consumers[i] = new MyConsumer();
  11. }
  12. WorkerPool<MyEvent> workerPool = new WorkerPool<>(buffer, buffer.newBarrier(), new MyExceptionHandler(), consumers);
  13. //设置多个消费者的sequence序号 用于单独统计消费进度, 并且设置到ringbuffer中
  14. //把消费者的位置信息引用注入到生产者
  15. buffer.addGatingSequences(workerPool.getWorkerSequences());
  16. workerPool.start(Executors.newFixedThreadPool(CONSUMER_NUM));
  17. return buffer;
  18. }
  19. }

4. producer

  1. public class MyProducer {
  2. private RingBuffer<MyEvent> buffer;
  3. public MyProducer(RingBuffer<MyEvent> buffer) {
  4. this.buffer = buffer;
  5. }
  6. void sent(long data) {
  7. RingBuffer<MyEvent> buffer = this.buffer;
  8. long next = buffer.next();
  9. try {
  10. MyEvent myEvent = buffer.get(next);
  11. myEvent.setId(data);
  12. } finally {
  13. buffer.publish(next);
  14. }
  15. }
  16. }

5. consumer

  1. public class MyConsumer implements WorkHandler<MyEvent> {
  2. @Override
  3. public void onEvent(MyEvent myEvent) throws Exception {
  4. System.out.println(Thread.currentThread().getName()+" "+myEvent);
  5. }
  6. }

6 main方法

  1. public class Main {
  2. public static void main(String[] args) {
  3. RingBuffer<MyEvent> buffer = MyRingBuffer.buffer;
  4. MyProducer myProducer = new MyProducer(buffer);
  5. IntStream.range(0,40000).forEach(myProducer::sent);
  6. }
  7. }

多个消费者各自处理事件(Multicast)

每个消费者都会处理所有的事件,是一种多播模式。

  1. EventHandler<LogEvent>[] consumers = new LogEventConsumer[WORKER_SIZE];
  2. for (int i = 0; i < consumers.length; i++) {
  3. consumers[i] = new LogEventConsumer();
  4. }
  5. disruptor.handleEventsWith(consumers);