1:作用领域:
金融交易领域,撮合交易系统
2:高性能设计方案
前提:
JUC存在的问题
队列 | 描述 |
---|---|
ArrayBlockingQueue | 基于数组结构实现的一个有界阻塞队列 |
LinkedBlockingQueue | 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列 |
PriorityBlockingQueue | 支持按优先级排序的无界阻塞队列 |
DelayQueue | 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列 |
LinkedTransferQueue | 基于链表结构实现的一个无界阻塞队列 |
LinkedBlockingDeque | 基于链表结构实现的一个双端阻塞队列 |
1:juc大部分采用的是ReentrantLock,稳定性要求高,只能选择有界队列
2:加锁方式影响性能,存在死活的隐患
3:有界队列的数组实现方式存在伪共享问题;
简介
英国外汇公司,LMAX开发的功性能队列,
1:解决内存队列的延迟的问题
支持高并发,
Apache Storm ,Camel Log4j2,采用的就是Disrutor
github :https://github.com/LMAX-Exchange/disruptor
设计方案:
- 环形数组结构
避免垃圾回收,数据对处理器的缓存机制友好,空间局部性原理 - 元素位置定位
数组长度2^n,位运算定位, - 无锁设计
先申请可以操作的元素在数组中的位置,CAS的方式 - 缓存行填充解决伪共享问题
- 基于时间驱动的生产者消费者模型,观察者模式
RingBuffer数据结构
一个数据,一个序列号,指向下一个可用的元素
- BlockingWaitStrategy策略:等待上一个元素被取走,采用ReentrantLock+condition实现阻塞,高并发下性能最差
- SleepingWaitStrategy:在循环中不断等待数据,Thread.yield让出cpu,使用LockSupport.parkNanos(1L)休眠
延时较高,异步日志常用 - YieldingWaitStrategy: 低延时场景,消费者不断循环监视缓冲区变化,使用Thread.yield让出CPU,
- BusySpinWaitStrategy:采用死循环,对延时非常苛刻的场景使用,CPU 要大于消费者线程,
推荐现在绑定CPU的场景下
单线程写数据流程:
1:申请写入m个元素,返回最大的序列号,判断是否覆盖未读的元素,如果是返回正确,就写
多生成者写的场景
防止多线程重复写一个元素的场景,每个线程分配不同的一段数组空间,CAS操作
防止读取的时候,读到为写入的元素,引入与RingBuffer大小相同的 buffer availabelBUffer,标记元素是否已经写入
消费者读数据
1:申请读取的序号
2:判断writer cursor> =n? 就是无法确定可读的最大下标,从read cursor开始去availbuffer 直到不可读的位置,
3:读取元素
读取数据的图示:
多个生成这写:
1:申请写入m个元素
2:返回最大可写序列号,每个生产者分配一段独享的空间
3:写入元素,同事设置availableBUffer的位置,标记那些已经写了
多生成写入的图示:
Disruptor核心概念
- RingBuffer(环形缓冲区):数组,内存级别缓存,创建sequencer,WaitStrateg的入口。
- Disruptor(总体执行入口):封装RingBuffer,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。
- Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
- Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
- SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
- WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
- Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
- EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
- EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence
构造器
public Disruptor(
//创建事件的工厂
final EventFactory<T> eventFactory,
//数据长度
final int ringBufferSize,
//线程工厂
final ThreadFactory threadFactory,
//生产者类型,单生成者,多生成者
final ProducerType producerType,
//等待策略
final WaitStrategy waitStrategy
)
Disruptor的使用
引入依赖
<!-- disruptor -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
单生产者单消费者模式
1.创建Event(消息载体/事件)和EventFactory(事件工厂)
@Data
public class OrderEvent {//被放入队列中的消息内容
private long value;
private String name;
}
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
创建消息(事件)生产者
创建 OrderEventProducer 类,它将作为生产者使用public class OrderEventProducer {
//事件队列
private RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long value,String name) {
// 获取事件队列 的下一个槽
long sequence = ringBuffer.next();
try {
//获取消息(事件)
OrderEvent orderEvent = ringBuffer.get(sequence);
// 写入消息数据
orderEvent.setValue(value);
orderEvent.setName(name);
} catch (Exception e) {
// TODO 异常处理
e.printStackTrace();
} finally {
System.out.println("生产者发送数据value:"+value+",name:"+name);
//发布事件
ringBuffer.publish(sequence);
}
}
}
3.创建消费者
创建 OrderEventHandler 类,并实现 EventHandler ,作为消费者。public class OrderEventHandler implements EventHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO 消费逻辑
System.out.println("消费者获取数据value:"+ event.getValue()+",name:"+event.getName());
}
}
4. 测试
public class DisruptorDemo {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.SINGLE, //单生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
disruptor.handleEventsWith(new OrderEventHandler());
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
//发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"Fox"+i);
}
disruptor.shutdown();
}
}
单生产者多消费者模式
消费者是多个的设施disruptor.handleEventsWith(new OrderEventHandler());
disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler())
上面的类似的是广播模式,一个消息会被全部消费者消费
如果要一个消费者消费:handleEventsWithWorkerPool 方 此时消费者要继承WorkHandler接口
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
@Override
public void onEvent(OrderEvent event) throws Exception {
// TODO 消费逻辑
System.out.println("消费者"+ Thread.currentThread().getName()
+"获取数据value:"+ event.getValue()+",name:"+event.getName());
}
}
多生产者多消费者模式 ```java public class DisruptorDemo2 {
public static void main(String[] args) throws Exception {
//创建disruptor
Disruptor<OrderEvent> disruptor = new Disruptor<>(
new OrderEventFactory(),
1024 * 1024,
Executors.defaultThreadFactory(),
ProducerType.MULTI, //多生产者
new YieldingWaitStrategy() //等待策略
);
//设置消费者用于处理RingBuffer的事件
//disruptor.handleEventsWith(new OrderEventHandler());
//设置多消费者,消息会被重复消费
//disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
//设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
//启动disruptor
disruptor.start();
//创建ringbuffer容器
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
new Thread(()->{
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"Fox"+i);
}
},"producer1").start();
new Thread(()->{
//创建生产者
OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
// 发送消息
for(int i=0;i<100;i++){
eventProducer.onData(i,"monkey"+i);
}
},"producer2").start();
//disruptor.shutdown();
}
}
**消费者优先级模式**<br />在配置消费者时,可以通过 .then 方法去实现顺序消费。
```java
disruptor.handleEventsWith(new OrderEventHandler())
.then(new OrderEventHandler())
.then(new OrderEventHandler());
handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then
disruptor.handleEventsWith(new OrderEventHandler())
.thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
.then(new OrderEventHandler());