什么是disruptor?

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。
基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。
说白了disruptor是一款高性能有界内存队列,对标的是ArrayBlockingQueueLinkedBlockingQueue,而不是分布式队列。

Java内置的内存队列

在稳定性和性能要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;Java SDK提供的两个有界队列:ArrayBlockingQueue和LinkedBlockingQueue;

队列 有锁 有界 数据结构
ArrayBlockingQueue ReentrantLock arraylist
LinkedBlockingQueue ReentrantLock linkedlist

链表结构缺点

  • 频繁的插入、删除操作,会导致内存的频繁申请和释放,容易造成内存碎片,Java中容易触发系统GC(Garbage Collection,垃圾回收)机制。故优先使用ArrayBlockingQueue。

ArrayBlockingQueue VS Disruptor性能对比

示例:
ArrayBlockingQueue

  1. public class ArrayBlockingQueue4Test {
  2. public static void main(String[] args) {
  3. // 避免扩容影响性能
  4. final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);
  5. final long startTime = System.currentTimeMillis();
  6. //向容器中添加元素
  7. new Thread(() -> {
  8. long i = 0;
  9. while (i < Constants.EVENT_NUM_OHM) {
  10. Data data = new Data(i, "c" + i);
  11. try {
  12. queue.put(data);
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. i++;
  17. }
  18. }).start();
  19. new Thread(() -> {
  20. int k = 0;
  21. while (k < Constants.EVENT_NUM_OHM) {
  22. try {
  23. queue.take();
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. k++;
  28. }
  29. long endTime = System.currentTimeMillis();
  30. System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
  31. }).start();
  32. }
  33. }

Disruptor

  1. public class DisruptorSingle4Test {
  2. public static void main(String[] args) {
  3. int ringBufferSize = 65536;
  4. final Disruptor<Data> disruptor = new Disruptor<Data>(
  5. () -> new Data(),
  6. ringBufferSize,
  7. Executors.newSingleThreadExecutor(),
  8. ProducerType.SINGLE,
  9. //new BlockingWaitStrategy()
  10. new YieldingWaitStrategy()
  11. );
  12. DataConsumer consumer = new DataConsumer();
  13. //消费数据
  14. disruptor.handleEventsWith(consumer);
  15. disruptor.start();
  16. new Thread(() -> {
  17. RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
  18. for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
  19. long seq = ringBuffer.next();
  20. Data data = ringBuffer.get(seq);
  21. data.setId(i);
  22. data.setName("c" + i);
  23. ringBuffer.publish(seq);
  24. }
  25. }).start();
  26. }
  27. // 消费者
  28. public class DataConsumer implements EventHandler<Data> {
  29. private long startTime;
  30. private int i;
  31. public DataConsumer() {
  32. this.startTime = System.currentTimeMillis();
  33. }
  34. @Override
  35. public void onEvent(Data data, long seq, boolean bool) throws Exception {
  36. i++;
  37. if (i == Constants.EVENT_NUM_OHM) {
  38. long endTime = System.currentTimeMillis();
  39. System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
  40. }
  41. }
  42. }

性能对比:

NUM Disruptor ArrayBlockingQueue
1千万 828ms 2194ms
5千万 3180ms 12422ms
1亿 6375ms 25647ms

ArrayBlockingQueue多线程情况下性能更差,有锁;Disruptor天生是个多生产者、多消费者的模型,高并发情况下更好。更能说明ArrayBlockingQueue与Disruptor差距。

ArrayBlockingQueue存在的问题

  1. 使用ReentrantLock保证线程安全;有锁性能太差
  2. 伪共享,缓存失效问题

�伪共享

共享

如图,计算机三级缓存架构,L1,L2缓存由CPU独占,L3缓存由多个CPU共享,内存所有CPU共享;速度从下至上依次递减,存储大小从下至上一直递增。

当CPU执行运算的时候,它先去L1查找所需的数据、再去L2、然后是L3,如果最后这些缓存中都没有,所需的数据就要去主内存拿。走得越远,运算耗费的时间就越长。

线程之间共享一份数据的时候,需要一个线程把数据写回主存,而另一个线程访问主存中相应的数据。

image.png

速度对比
image.png

缓存行

Cache Line可以简单的理解为CPU Cache中的最小缓存单位。目前主流的CPU Cache的Cache Line大小都是64Bytes。假设我们有一个512字节的一级缓存,那么按照64B的缓存单位大小来算,这个一级缓存所能存放的缓存个数就是512/64 = 8个。
image.png

什么是伪共享

CPU从内存加载数据时,会把相邻的数据缓存到同一个缓存行。例如:加载long型的属性X(8字节)到Line1,Line1会把X邻近56个字节加载到缓存行,提升缓存命中。
但是会出现伪共享的问题,由于多核CPU多级缓存一致性协议MESI,线程修改了其中的一个缓存数据,会导致所在的缓存行失效,影响到X数据的缓存命中,这就是伪共享。

Disruptor设计

  1. 内存分配更加合理,环形数组结构(RingBuffer),内存预加载,初始化时提前一次性创建;对象循环利用,避免频繁GC。
  2. 才用大量的CAS,无锁算法,减少性能消耗。
  3. 解决伪共享的问题,提升缓存利用率。
  4. 支持批量消费,消费者可以无锁方式消费多个消息。

如何解决伪共享?

每个变量独占一个缓存行、不共享缓存行,就是缓存行填充技术。想让X独占一个缓存行,就在其前后各填充56个字节即可。Disruptor很多对象都是利用这种填充技术来避免伪共享。在1.8就可以通过注解@Contended来解决这个问题。

  1. public class Sequence extends RhsPadding {
  2. static final long INITIAL_VALUE = -1L;
  3. private static final Unsafe UNSAFE = Util.getUnsafe();
  4. private static final long VALUE_OFFSET;
  5. ...
  6. }
  7. class RhsPadding extends Value {
  8. protected long p9;
  9. protected long p10;
  10. protected long p11;
  11. protected long p12;
  12. protected long p13;
  13. protected long p14;
  14. protected long p15;
  15. ...
  16. }
  17. class Value extends LhsPadding {
  18. protected volatile long value;
  19. Value() {
  20. }
  21. ...
  22. }
  23. class LhsPadding {
  24. protected long p1;
  25. protected long p2;
  26. protected long p3;
  27. protected long p4;
  28. protected long p5;
  29. protected long p6;
  30. protected long p7;
  31. ...
  32. }

RingBuffer设计

image.png

  1. RingBuffer:环状数组,承载数据的容器。
  2. Event:生产者生产对象
  3. EventHandler:消费者业务处理器,实现接口WorkHandler并实现onEvent方法(具体的业务逻辑)
  4. Sequence:序号,Long类型常量,记录RingBuffer投递下标,生产者消费位置,通过对齐填充消除伪共享。
  5. Sequence Barrier:用来跟踪发布者(publisher)的游标(cursor)和事件处理者(EventProcessor)的序列号(sequence)。
  6. WaitStrategy:生产者、消费者等待策略
  7. WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中,确保多个WorkProcessor不会消费同样的sequence;

环形数组结构是整个Disruptor的核心所在。

  1. 首先因为是数组,所以要比链表快.
  2. 根据我们对上面缓存行的解释知道,数组中的一个元素加载,相邻的数组元素也是会被预加载的,因此在这样的结构中,cpu无需时不时去主存加载数组中的下一个元素。
  3. 为数组预先分配内存,使得数组对象一直存在(除非程序终止)。这就意味着不需要花大量的时间用于垃圾回收。
  4. 其次结构作为环形,数组的大小为2的n次方,这样元素定位可以通过位运算效率会更高,这个跟一致性哈希中的环形策略有点像。
  5. 其实质只是一个普通的数组,只是当放置数据填充满队列(即到达2^n-1位置)之后,再填充数据,就会从0开始,覆盖之前的数据,于是就相当于一个环。

image.png

Disruptor无锁算法

ArrayBlockingQueue是利用管程实现的,生产、消费操作都需要加锁,实现起来简单,但性能不是十分理想。Disruptor采用的是无锁算法,大量的CAS操作,但核心无非是生产和消费两个操作。最复杂是入队操作,不能把没有消费的数据给覆盖。

SingleProducerSequencer投递数据(入队)核心代码
image.png

  1. public long next(int n) {
  2. //生产者获取n个写入位置
  3. if (n < 1) {
  4. throw new IllegalArgumentException("n must be > 0");
  5. } else {
  6. //类似于入队索引,上次生产到的地方
  7. long nextValue = this.nextValue;
  8. // 获取生产的第n个的位置下标
  9. long nextSequence = nextValue + (long)n;
  10. // 减掉一个循环,来判断是否会覆盖掉未消费的数据
  11. long wrapPoint = nextSequence - (long)this.bufferSize;
  12. // 获取上一次的最小消费位置
  13. long cachedGatingSequence = this.cachedValue;
  14. // 没有足够的空余位置(wrapPoint如果比最小的消费下标大,说明会覆盖)
  15. if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
  16. long minSequence;
  17. // 没有足够的空余位置,挂起
  18. while(wrapPoint > (minSequence = Util.getMinimumSequence(this.gatingSequences, nextValue))) {
  19. LockSupport.parkNanos(1L);
  20. }
  21. // 重新设置最小消费位置
  22. this.cachedValue = minSequence;
  23. }
  24. this.nextValue = nextSequence;
  25. return nextSequence;
  26. }
  27. }

消费者消费逻辑解析:BatchEventProcessor(生产者不能覆盖消费的最小下标)
image.png

  1. while (true)
  2. {
  3. try
  4. {
  5. // 获取可用消息的最大值
  6. final long availableSequence = sequenceBarrier.waitFor(nextSequence);
  7. // 如果当前的位置小于可用的位置,说明有消息可以处理,进行消息处理
  8. while (nextSequence <= availableSequence)
  9. {
  10. event = dataProvider.get(nextSequence);
  11. // 回调
  12. eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
  13. nextSequence++;
  14. }
  15. sequence.set(availableSequence);
  16. }
  17. catch (final TimeoutException e)
  18. {
  19. notifyTimeout(sequence.get());
  20. }
  21. catch (final AlertException ex)
  22. {
  23. if (!running.get())
  24. {
  25. break;
  26. }
  27. }
  28. catch (final Throwable ex)
  29. {
  30. exceptionHandler.handleEventException(ex, nextSequence, event);
  31. sequence.set(nextSequence);
  32. nextSequence++;
  33. }
  34. }

Disruptor实战

disruptor快速使用

  1. 建立一个工厂Event类,用于创建Event类实例对象
  1. public class OrderEvent {
  2. /**
  3. * 订单的价格
  4. */
  5. private long value;
  6. public long getValue() {
  7. return value;
  8. }
  9. public void setValue(long value) {
  10. this.value = value;
  11. }
  12. }
  13. /**
  14. * @author liushiqiao
  15. * @desc 继承 EventFactory工厂
  16. */
  17. public class OrderEventFactory implements EventFactory<OrderEvent> {
  18. /**
  19. * 这个方法就是为了返回空的数据对象(Event)
  20. *
  21. * @return
  22. */
  23. @Override
  24. public OrderEvent newInstance() {
  25. return new OrderEvent();
  26. }
  27. }
  1. 需要有一个监听事件类,用户处理数据(Event类)
  1. /**
  2. * @author liushiqiao
  3. * 消费者对象
  4. */
  5. public class OrderEventHandler implements EventHandler<OrderEvent> {
  6. @Override
  7. public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
  8. Thread.sleep(Integer.MAX_VALUE);
  9. System.err.println("消费者: " + event.getValue());
  10. }
  11. }
  1. 实例化Disruptor实例,配置参数,编写Disruptor核心组件
  1. /**
  2. * @author liushiqiao
  3. */
  4. public class Main {
  5. public static void main(String[] args) {
  6. // 参数准备工作
  7. OrderEventFactory orderEventFactory = new OrderEventFactory();
  8. int ringBufferSize = 4;
  9. ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  10. /**
  11. * 1 eventFactory: 消息(event)工厂对象
  12. * 2 ringBufferSize: 容器的长度
  13. * 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
  14. * 4 ProducerType: 单生产者 还是 多生产者
  15. * 5 waitStrategy: 等待策略
  16. */
  17. //1. 实例化disruptor对象
  18. Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
  19. ringBufferSize,
  20. executor,
  21. ProducerType.SINGLE,
  22. new BlockingWaitStrategy());
  23. //2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
  24. disruptor.handleEventsWith(new OrderEventHandler());
  25. //3. 启动disruptor
  26. disruptor.start();
  27. //4. 获取实际存储数据的容器: RingBuffer
  28. RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
  29. OrderEventProducer producer = new OrderEventProducer(ringBuffer);
  30. ByteBuffer bb = ByteBuffer.allocate(8);
  31. for (long i = 0; i < 5; i++) {
  32. bb.putLong(0, i);
  33. producer.sendData(bb);
  34. }
  35. disruptor.shutdown();
  36. executor.shutdown();
  37. }
  38. }
  1. 编写生产者组件,向Disruptor容器投递数据
  1. /**
  2. * @author liushiqiao
  3. * 生产者
  4. */
  5. public class OrderEventProducer {
  6. private RingBuffer<OrderEvent> ringBuffer;
  7. public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
  8. this.ringBuffer = ringBuffer;
  9. }
  10. public void sendData(ByteBuffer data) {
  11. //1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
  12. long sequence = ringBuffer.next();
  13. try {
  14. //2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
  15. OrderEvent event = ringBuffer.get(sequence);
  16. //3 进行实际的赋值处理
  17. event.setValue(data.getLong(0));
  18. } finally {
  19. //4 提交发布操作
  20. ringBuffer.publish(sequence);
  21. }
  22. }
  23. }

串行操作

  1. //2 把消费者设置到Disruptor中 handleEventsWith
  2. //2.1 串行操作:
  3. disruptor
  4. .handleEventsWith(new Handler1())
  5. .handleEventsWith(new Handler2())
  6. .handleEventsWith(new Handler3());

并行操作: 可以有两种方式去进行

  1. disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
  2. // 分别调用
  3. disruptor.handleEventsWith(new Handler2());
  4. disruptor.handleEventsWith(new Handler3());

�菱形操作

  1. disruptor.handleEventsWith(new Handler1(), new Handler2())
  2. .handleEventsWith(new Handler3());
  3. EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
  4. ehGroup.then(new Handler3());

�六边形操作

  1. Handler1 h1 = new Handler1();
  2. Handler2 h2 = new Handler2();
  3. Handler3 h3 = new Handler3();
  4. Handler4 h4 = new Handler4();
  5. Handler5 h5 = new Handler5();
  6. disruptor.handleEventsWith(h1, h4);
  7. disruptor.after(h1).handleEventsWith(h2);
  8. disruptor.after(h4).handleEventsWith(h5);
  9. disruptor.after(h2, h5).handleEventsWith(h3);

消费者常见的等待策略

BusySpinWaitStrategy : 自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。
BlockingWaitStrategy : 使用锁和条件变量。CPU资源的占用少,延迟大,默认等待策略。
SleepingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。
YieldingWaitStrategy : 在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。
PhasedBackoffWaitStrategy : 上面多种策略的综合,CPU资源的占用少,延迟大

高并发下使用场景

缓存架构
image.png
缓存不一致场景
image.png

  1. 线程一:读取数据,缓存不存在(淘汰),读取数据库(旧值)
  2. 线程二:写入数据库,更成功更新缓存
  3. 线程一:把读取的旧值更新到缓存,导致数据库、缓存不一致

解决方案

内存队列异步读写串行化
image.png

问题:内存队列数据慢了,阻塞,系统崩溃了都会导致数据异常。

最终解决方案
缓存架构设计.png

1、根据数据库时间戳当写入版本号,避免覆盖问题
2、通过业务id,hash到某个队列上,顺序消费
3、避免吞吐量性能问题,一个队列使用一个线程消费,可以使用内存队列多线程消费
4、避免内存队列数据丢失问题使用手动ACK机制,内存队列消费完后手动ACK
5、如果更新失败,丢到重试队列,防止阻塞
6、重试队列一直失败,丢到死信队列中,人工接入
7、使用时间戳保证重复消费幂等信
[

](https://shiqiao-study.yuque.com/liushiqiao/rs4y4y/lhwszw)