1:作用领域:
金融交易领域,撮合交易系统
2:高性能设计方案
前提:
JUC存在的问题
| 队列 | 描述 |
|---|---|
| ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
| LinkedBlockingQueue | 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列 |
| PriorityBlockingQueue | 支持按优先级排序的无界阻塞队列 |
| DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 |
| SynchronousQueue | 不存储元素的阻塞队列 |
| LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
| LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
1:juc大部分采用的是ReentrantLock,稳定性要求高,只能选择有界队列
2:加锁方式影响性能,存在死活的隐患
3:有界队列的数组实现方式存在伪共享问题;
简介
英国外汇公司,LMAX开发的功性能队列,
1:解决内存队列的延迟的问题
支持高并发,
Apache Storm ,Camel Log4j2,采用的就是Disrutor
github :https://github.com/LMAX-Exchange/disruptor
设计方案:
- 环形数组结构
避免垃圾回收,数据对处理器的缓存机制友好,空间局部性原理 - 元素位置定位
数组长度2^n,位运算定位, - 无锁设计
先申请可以操作的元素在数组中的位置,CAS的方式 - 缓存行填充解决伪共享问题
- 基于时间驱动的生产者消费者模型,观察者模式
RingBuffer数据结构
一个数据,一个序列号,指向下一个可用的元素
- BlockingWaitStrategy策略:等待上一个元素被取走,采用ReentrantLock+condition实现阻塞,高并发下性能最差
- SleepingWaitStrategy:在循环中不断等待数据,Thread.yield让出cpu,使用LockSupport.parkNanos(1L)休眠
延时较高,异步日志常用 - YieldingWaitStrategy: 低延时场景,消费者不断循环监视缓冲区变化,使用Thread.yield让出CPU,
- BusySpinWaitStrategy:采用死循环,对延时非常苛刻的场景使用,CPU 要大于消费者线程,
推荐现在绑定CPU的场景下
单线程写数据流程:
1:申请写入m个元素,返回最大的序列号,判断是否覆盖未读的元素,如果是返回正确,就写
多生成者写的场景
防止多线程重复写一个元素的场景,每个线程分配不同的一段数组空间,CAS操作
防止读取的时候,读到为写入的元素,引入与RingBuffer大小相同的 buffer availabelBUffer,标记元素是否已经写入
消费者读数据
1:申请读取的序号
2:判断writer cursor> =n? 就是无法确定可读的最大下标,从read cursor开始去availbuffer 直到不可读的位置,
3:读取元素
读取数据的图示:
多个生成这写:
1:申请写入m个元素
2:返回最大可写序列号,每个生产者分配一段独享的空间
3:写入元素,同事设置availableBUffer的位置,标记那些已经写了
多生成写入的图示:
Disruptor核心概念
- RingBuffer(环形缓冲区):数组,内存级别缓存,创建sequencer,WaitStrateg的入口。
- Disruptor(总体执行入口):封装RingBuffer,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。
- Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
- Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
- SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
- WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
- Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
- EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
- EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

构造器
public Disruptor(//创建事件的工厂final EventFactory<T> eventFactory,//数据长度final int ringBufferSize,//线程工厂final ThreadFactory threadFactory,//生产者类型,单生成者,多生成者final ProducerType producerType,//等待策略final WaitStrategy waitStrategy)
Disruptor的使用
引入依赖
<!-- disruptor --><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency>
单生产者单消费者模式
1.创建Event(消息载体/事件)和EventFactory(事件工厂)
@Datapublic class OrderEvent {//被放入队列中的消息内容private long value;private String name;}public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent();}}
创建消息(事件)生产者
创建 OrderEventProducer 类,它将作为生产者使用public class OrderEventProducer {//事件队列private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(long value,String name) {// 获取事件队列 的下一个槽long sequence = ringBuffer.next();try {//获取消息(事件)OrderEvent orderEvent = ringBuffer.get(sequence);// 写入消息数据orderEvent.setValue(value);orderEvent.setName(name);} catch (Exception e) {// TODO 异常处理e.printStackTrace();} finally {System.out.println("生产者发送数据value:"+value+",name:"+name);//发布事件ringBuffer.publish(sequence);}}}
3.创建消费者
创建 OrderEventHandler 类,并实现 EventHandler ,作为消费者。public class OrderEventHandler implements EventHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println("消费者获取数据value:"+ event.getValue()+",name:"+event.getName());}}
4. 测试
public class DisruptorDemo {public static void main(String[] args) throws Exception {//创建disruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.SINGLE, //单生产者new YieldingWaitStrategy() //等待策略);//设置消费者用于处理RingBuffer的事件disruptor.handleEventsWith(new OrderEventHandler());disruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();//创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);//发送消息for(int i=0;i<100;i++){eventProducer.onData(i,"Fox"+i);}disruptor.shutdown();}}
单生产者多消费者模式
消费者是多个的设施disruptor.handleEventsWith(new OrderEventHandler());disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler())
上面的类似的是广播模式,一个消息会被全部消费者消费
如果要一个消费者消费:handleEventsWithWorkerPool 方 此时消费者要继承WorkHandler接口
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println("消费者"+ Thread.currentThread().getName()+"获取数据value:"+ event.getValue()+",name:"+event.getName());}@Overridepublic void onEvent(OrderEvent event) throws Exception {// TODO 消费逻辑System.out.println("消费者"+ Thread.currentThread().getName()+"获取数据value:"+ event.getValue()+",name:"+event.getName());}}
多生产者多消费者模式 ```java public class DisruptorDemo2 {
public static void main(String[] args) throws Exception {
//创建disruptorDisruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI, //多生产者new YieldingWaitStrategy() //等待策略);//设置消费者用于处理RingBuffer的事件//disruptor.handleEventsWith(new OrderEventHandler());//设置多消费者,消息会被重复消费//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());//启动disruptordisruptor.start();//创建ringbuffer容器RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();new Thread(()->{//创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 发送消息for(int i=0;i<100;i++){eventProducer.onData(i,"Fox"+i);}},"producer1").start();new Thread(()->{//创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 发送消息for(int i=0;i<100;i++){eventProducer.onData(i,"monkey"+i);}},"producer2").start();
//disruptor.shutdown();}
}
**消费者优先级模式**<br />在配置消费者时,可以通过 .then 方法去实现顺序消费。```javadisruptor.handleEventsWith(new OrderEventHandler()).then(new OrderEventHandler()).then(new OrderEventHandler());
handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then
disruptor.handleEventsWith(new OrderEventHandler()).thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler()).then(new OrderEventHandler());
