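Overview
Disruptor is a high-performance asynchronous processing framework. It typically delivers much higher throughput and lower latency than traditional JDK queue implementations such as ArrayBlockingQueue.
- Ring-shaped data structure (ring buffer)
- Lock-free
- Backed by an array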
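The three properties above can be illustrated with a small standalone sketch. It is not Disruptor's actual implementation: the class name RingSketch and its methods are hypothetical, and it assumes exactly one producer thread and one consumer thread. It shows a preallocated array, an ever-increasing sequence mapped onto the array with a bit mask, and progress tracked with atomic counters instead of locks.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative single-producer / single-consumer ring; NOT Disruptor's actual code.
final class RingSketch {
    private final long[] slots;   // preallocated array backing the ring
    private final int mask;       // size - 1, valid because size is a power of two
    private final AtomicLong published = new AtomicLong(-1); // last sequence visible to the consumer
    private final AtomicLong consumed = new AtomicLong(-1);  // last sequence the consumer finished

    RingSketch(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Map an ever-increasing sequence to an array index (wraps around the ring).
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    // Producer side: returns false instead of overwriting a slot that has not been read yet.
    boolean tryPublish(long value) {
        long next = published.get() + 1;
        if (next - consumed.get() > slots.length) {
            return false; // ring is full
        }
        slots[indexOf(next)] = value;
        published.set(next); // volatile write makes the slot visible to the consumer
        return true;
    }

    // Consumer side: returns null when nothing new has been published.
    Long tryConsume() {
        long next = consumed.get() + 1;
        if (next > published.get()) {
            return null;
        }
        long value = slots[indexOf(next)];
        consumed.set(next); // frees the slot for the producer
        return value;
    }
}
```

The "ring is full" check plays the same role as the gating sequences registered on the real RingBuffer in step 3: the producer may not wrap around past the slowest consumer.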
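Producing and consuming with Disruptor
Multiple consumers cooperating to process a batch of events
In this mode each event is handled by exactly one of the consumers.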
1. Maven dependency (pom.xml)
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>
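2. Event
Holds the data entity. @Data is a Lombok annotation (Lombok must be added as a separate dependency); it generates the setter and toString() used in the later steps.
import java.io.Serializable;
import lombok.Data;

@Data
public class MyEvent implements Serializable {
    private static final long serialVersionUID = -3902498674035052931L;
    private long id;
}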
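3. Ring buffer
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkerPool;
import java.util.concurrent.Executors;

public class MyRingBuffer {
    private static final int CONSUMER_NUM = 10;
    public static final RingBuffer<MyEvent> buffer = getRingBuffer();

    private static RingBuffer<MyEvent> getRingBuffer() {
        // Create the ring buffer in multi-producer mode (the size must be a power of two)
        RingBuffer<MyEvent> buffer = RingBuffer.createMultiProducer(MyEvent::new, 512, new BlockingWaitStrategy());
        // Create the consumer group
        MyConsumer[] consumers = new MyConsumer[CONSUMER_NUM];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new MyConsumer();
        }
        // MyExceptionHandler is an ExceptionHandler<MyEvent> implementation that is not shown in this post
        WorkerPool<MyEvent> workerPool = new WorkerPool<>(buffer, buffer.newBarrier(), new MyExceptionHandler(), consumers);
        // Each consumer has its own sequence that tracks its progress.
        // Register those sequences on the ring buffer so that producers can see the
        // consumers' positions and do not overwrite slots that have not been consumed yet.
        buffer.addGatingSequences(workerPool.getWorkerSequences());
        // The pool threads are non-daemon, so the JVM keeps running until they are stopped
        workerPool.start(Executors.newFixedThreadPool(CONSUMER_NUM));
        return buffer;
    }
}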
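4. Producer
import com.lmax.disruptor.RingBuffer;

public class MyProducer {
    private final RingBuffer<MyEvent> buffer;

    public MyProducer(RingBuffer<MyEvent> buffer) {
        this.buffer = buffer;
    }

    void sent(long data) {
        // Claim the next sequence; this blocks while the ring buffer is full
        long next = buffer.next();
        try {
            // Fill the preallocated event stored at that sequence
            MyEvent myEvent = buffer.get(next);
            myEvent.setId(data);
        } finally {
            // Always publish a claimed sequence, otherwise the consumers stall on it
            buffer.publish(next);
        }
    }
}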
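5. Consumer
import com.lmax.disruptor.WorkHandler;

public class MyConsumer implements WorkHandler<MyEvent> {
    @Override
    public void onEvent(MyEvent myEvent) throws Exception {
        // Print the handling thread so the distribution of work across consumers is visible
        System.out.println(Thread.currentThread().getName() + " " + myEvent);
    }
}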
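6. Main method
import com.lmax.disruptor.RingBuffer;
import java.util.stream.IntStream;

public class Main {
    public static void main(String[] args) {
        RingBuffer<MyEvent> buffer = MyRingBuffer.buffer;
        MyProducer myProducer = new MyProducer(buffer);
        // Publish 40000 events with ids 0..39999
        IntStream.range(0, 40000).forEach(myProducer::sent);
    }
}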
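Multiple consumers each processing every event (Multicast)
Every consumer processes all of the events; this is a multicast mode.
// Fragment: disruptor is assumed to be a Disruptor<LogEvent> created elsewhere, and
// LogEventConsumer is assumed to implement EventHandler<LogEvent>; neither is shown in this post.
EventHandler<LogEvent>[] consumers = new LogEventConsumer[WORKER_SIZE];
for (int i = 0; i < consumers.length; i++) {
    consumers[i] = new LogEventConsumer();
}
disruptor.handleEventsWith(consumers);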
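The difference between this multicast mode and the cooperative mode from the first part can be sketched without the library. The code below only models the delivery semantics and is not how Disruptor is implemented internally; the class DeliverySketch and its methods are hypothetical names. In multicast every consumer walks all sequences with its own cursor, while in the cooperative mode the consumers claim sequences from one shared counter.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: models delivery semantics, not Disruptor's internals.
final class DeliverySketch {
    // Multicast: every consumer keeps its own cursor and sees every event.
    static int[] multicast(int events, int consumers) {
        int[] handled = new int[consumers];
        for (int c = 0; c < consumers; c++) {
            for (long cursor = 0; cursor < events; cursor++) {
                handled[c]++; // consumer c handles the event at its own cursor
            }
        }
        return handled;
    }

    // Cooperative mode: consumers claim sequences from one shared counter,
    // so each event is handled by exactly one consumer.
    static int[] workPool(int events, int consumers) {
        int[] handled = new int[consumers];
        AtomicLong shared = new AtomicLong(0);
        Thread[] threads = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            threads[c] = new Thread(() -> {
                // getAndIncrement hands every sequence to exactly one thread
                while (shared.getAndIncrement() < events) {
                    handled[id]++; // only thread 'id' writes this slot
                }
            });
            threads[c].start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // join makes the per-thread counts visible to the caller
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }
}
```

With 1000 events and 4 consumers, multicast reports 1000 handled events per consumer, whereas the cooperative counts sum to 1000 across all consumers; how that total is split between the threads varies from run to run.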
