1:作用领域:
金融交易领域,撮合交易系统
2:高性能设计方案

前提:

JUC存在的问题

队列 描述
ArrayBlockingQueue 基于数组结构实现的一个有界阻塞队列
LinkedBlockingQueue 基于链表结构实现的一个无界阻塞队列,指定容量为有界阻塞队列
PriorityBlockingQueue 支持按优先级排序的无界阻塞队列
DelayQueue 基于优先级队列(PriorityBlockingQueue)实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列
LinkedTransferQueue 基于链表结构实现的一个无界阻塞队列
LinkedBlockingDeque 基于链表结构实现的一个双端阻塞队列

1:juc大部分采用的是ReentrantLock,稳定性要求高,只能选择有界队列
2:加锁方式影响性能,存在死活的隐患
3:有界队列的数组实现方式存在伪共享问题;

简介

英国外汇公司,LMAX开发的功性能队列,
1:解决内存队列的延迟的问题
支持高并发,
Apache Storm ,Camel Log4j2,采用的就是Disrutor
github :https://github.com/LMAX-Exchange/disruptor

设计方案:

  1. 环形数组结构
    避免垃圾回收,数据对处理器的缓存机制友好,空间局部性原理
  2. 元素位置定位
    数组长度2^n,位运算定位,
  3. 无锁设计
    先申请可以操作的元素在数组中的位置,CAS的方式
  4. 缓存行填充解决伪共享问题
  5. 基于时间驱动的生产者消费者模型,观察者模式

RingBuffer数据结构

一个数据,一个序列号,指向下一个可用的元素
image.png

  1. 数据长度要求2的n次幂
  2. 当所有位置都放满了,再放下一个,会把0号位置覆盖

    当覆盖数据的指定策略:

  • BlockingWaitStrategy策略:等待上一个元素被取走,采用ReentrantLock+condition实现阻塞,高并发下性能最差
  • SleepingWaitStrategy:在循环中不断等待数据,Thread.yield让出cpu,使用LockSupport.parkNanos(1L)休眠
    延时较高,异步日志常用
  • YieldingWaitStrategy: 低延时场景,消费者不断循环监视缓冲区变化,使用Thread.yield让出CPU,
  • BusySpinWaitStrategy:采用死循环,对延时非常苛刻的场景使用,CPU 要大于消费者线程,
    推荐现在绑定CPU的场景下

单线程写数据流程:
1:申请写入m个元素,返回最大的序列号,判断是否覆盖未读的元素,如果是返回正确,就写
image.png
多生成者写的场景
防止多线程重复写一个元素的场景,每个线程分配不同的一段数组空间,CAS操作
防止读取的时候,读到为写入的元素,引入与RingBuffer大小相同的 buffer availabelBUffer,标记元素是否已经写入

消费者读数据
1:申请读取的序号
2:判断writer cursor> =n? 就是无法确定可读的最大下标,从read cursor开始去availbuffer 直到不可读的位置,
3:读取元素
读取数据的图示:
image.png

多个生成这写:
1:申请写入m个元素
2:返回最大可写序列号,每个生产者分配一段独享的空间
3:写入元素,同事设置availableBUffer的位置,标记那些已经写了
多生成写入的图示:
image.png

Disruptor核心概念

  • RingBuffer(环形缓冲区):数组,内存级别缓存,创建sequencer,WaitStrateg的入口。
  • Disruptor(总体执行入口):封装RingBuffer,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。
  • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
  • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
  • SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
  • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
  • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence

image.png
构造器

  1. public Disruptor(
  2. //创建事件的工厂
  3. final EventFactory<T> eventFactory,
  4. //数据长度
  5. final int ringBufferSize,
  6. //线程工厂
  7. final ThreadFactory threadFactory,
  8. //生产者类型,单生成者,多生成者
  9. final ProducerType producerType,
  10. //等待策略
  11. final WaitStrategy waitStrategy
  12. )

Disruptor的使用
引入依赖

  1. <!-- disruptor -->
  2. <dependency>
  3. <groupId>com.lmax</groupId>
  4. <artifactId>disruptor</artifactId>
  5. <version>3.3.4</version>
  6. </dependency>

单生产者单消费者模式
1.创建Event(消息载体/事件)和EventFactory(事件工厂)

  1. @Data
  2. public class OrderEvent {//被放入队列中的消息内容
  3. private long value;
  4. private String name;
  5. }
  6. public class OrderEventFactory implements EventFactory<OrderEvent> {
  7. @Override
  8. public OrderEvent newInstance() {
  9. return new OrderEvent();
  10. }
  11. }
  1. 创建消息(事件)生产者
    创建 OrderEventProducer 类,它将作为生产者使用

    1. public class OrderEventProducer {
    2. //事件队列
    3. private RingBuffer<OrderEvent> ringBuffer;
    4. public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
    5. this.ringBuffer = ringBuffer;
    6. }
    7. public void onData(long value,String name) {
    8. // 获取事件队列 的下一个槽
    9. long sequence = ringBuffer.next();
    10. try {
    11. //获取消息(事件)
    12. OrderEvent orderEvent = ringBuffer.get(sequence);
    13. // 写入消息数据
    14. orderEvent.setValue(value);
    15. orderEvent.setName(name);
    16. } catch (Exception e) {
    17. // TODO 异常处理
    18. e.printStackTrace();
    19. } finally {
    20. System.out.println("生产者发送数据value:"+value+",name:"+name);
    21. //发布事件
    22. ringBuffer.publish(sequence);
    23. }
    24. }
    25. }

    3.创建消费者
    创建 OrderEventHandler 类,并实现 EventHandler ,作为消费者。

    1. public class OrderEventHandler implements EventHandler<OrderEvent> {
    2. @Override
    3. public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
    4. // TODO 消费逻辑
    5. System.out.println("消费者获取数据value:"+ event.getValue()+",name:"+event.getName());
    6. }
    7. }

    4. 测试

    1. public class DisruptorDemo {
    2. public static void main(String[] args) throws Exception {
    3. //创建disruptor
    4. Disruptor<OrderEvent> disruptor = new Disruptor<>(
    5. new OrderEventFactory(),
    6. 1024 * 1024,
    7. Executors.defaultThreadFactory(),
    8. ProducerType.SINGLE, //单生产者
    9. new YieldingWaitStrategy() //等待策略
    10. );
    11. //设置消费者用于处理RingBuffer的事件
    12. disruptor.handleEventsWith(new OrderEventHandler());
    13. disruptor.start();
    14. //创建ringbuffer容器
    15. RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
    16. //创建生产者
    17. OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
    18. //发送消息
    19. for(int i=0;i<100;i++){
    20. eventProducer.onData(i,"Fox"+i);
    21. }
    22. disruptor.shutdown();
    23. }
    24. }

    单生产者多消费者模式
    消费者是多个的设施
    disruptor.handleEventsWith(new OrderEventHandler());
    disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler())
    上面的类似的是广播模式,一个消息会被全部消费者消费
    如果要一个消费者消费:handleEventsWithWorkerPool 方 此时消费者要继承WorkHandler接口
    disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

    1. public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
    2. @Override
    3. public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
    4. // TODO 消费逻辑
    5. System.out.println("消费者"+ Thread.currentThread().getName()
    6. +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    7. }
    8. @Override
    9. public void onEvent(OrderEvent event) throws Exception {
    10. // TODO 消费逻辑
    11. System.out.println("消费者"+ Thread.currentThread().getName()
    12. +"获取数据value:"+ event.getValue()+",name:"+event.getName());
    13. }
    14. }

    多生产者多消费者模式 ```java public class DisruptorDemo2 {

    public static void main(String[] args) throws Exception {

    1. //创建disruptor
    2. Disruptor<OrderEvent> disruptor = new Disruptor<>(
    3. new OrderEventFactory(),
    4. 1024 * 1024,
    5. Executors.defaultThreadFactory(),
    6. ProducerType.MULTI, //多生产者
    7. new YieldingWaitStrategy() //等待策略
    8. );
    9. //设置消费者用于处理RingBuffer的事件
    10. //disruptor.handleEventsWith(new OrderEventHandler());
    11. //设置多消费者,消息会被重复消费
    12. //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
    13. //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
    14. disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
    15. //启动disruptor
    16. disruptor.start();
    17. //创建ringbuffer容器
    18. RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
    19. new Thread(()->{
    20. //创建生产者
    21. OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
    22. // 发送消息
    23. for(int i=0;i<100;i++){
    24. eventProducer.onData(i,"Fox"+i);
    25. }
    26. },"producer1").start();
    27. new Thread(()->{
    28. //创建生产者
    29. OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
    30. // 发送消息
    31. for(int i=0;i<100;i++){
    32. eventProducer.onData(i,"monkey"+i);
    33. }
    34. },"producer2").start();
  1. //disruptor.shutdown();
  2. }

}

  1. **消费者优先级模式**<br />在配置消费者时,可以通过 .then 方法去实现顺序消费。
  2. ```java
  3. disruptor.handleEventsWith(new OrderEventHandler())
  4. .then(new OrderEventHandler())
  5. .then(new OrderEventHandler());

handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then

  1. disruptor.handleEventsWith(new OrderEventHandler())
  2. .thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
  3. .then(new OrderEventHandler());