一、Disruptor并发框架

1 介绍

Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。 在使用之前,首先说明disruptor主要功能加以说明,你可以理解为他是一种高效的”生产者-消费者”模型。也就性能远远高于传统的BlockingQueue容器。 官方学习网站:http://ifeve.com/disruptor-getting-started/

2 关键知识点

RingBuffer**(储存结构,环形数组): **

被看作Disruptor最主要的组件,然而从3.0开始RingBuffer仅仅负责存储和更新在Disruptor中流通的数据。对一些特殊的使用场景能够被用户(使用其他数据结构)完全替代。

Sequence**(序号,下标,或者说槽)**:

Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这些Sequence值的运转,因此Sequence支持多种当前为AtomicLong类的特性。

Sequencer(序号生成器)**:**

这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递。

SequenceBarrier**(序号栅栏): **

由Sequencer生成,并且包含了已经发布的Sequence的引用,这些的Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者来消费的Event的逻辑,必须在消费者和生产者互相关联后才能起作用。

WaitStrategy(等待策略):

决定一个消费者将如何等待生产者将Event置入Disruptor。

Event**(事件):**

从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的。

EventProcessor**(事件处理器):**

主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象。

EventHandler**(消费者,或者说处理方法):**

由用户实现并且代表了Disruptor中的一个消费者的接口。

Producer**(事件发布器):**

由用户实现,它调用RingBuffer来插入事件(Event),在Disruptor中没有相应的实现代码,由用户实现。

WorkProcessor**(自带消费者):**

确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence。

WorkerPool**(自带事件处理器):**

一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker吃间移交

LifecycleAware(时间处理时的通知):

当BatchEventProcessor启动和停止时,于实现这个接口用于接收通知。

3 实现方法

1)RingBuffer+Disruptor(单线程)

JAVA的并发编程(八):Disruptor并发框架 - 图1image.gif
在Disruptor中,我们想实现hello world 需要如下几步骤:
(1) 建立一个Event

  1. public class LongEvent {
  2. private long value;
  3. public long getValue() {
  4. return value;
  5. }
  6. public void setValue(long value) {
  7. this.value = value;
  8. }
  9. }

image.gif
(2) 建立一个工厂Event类,用于创建Event类实例对象,实现EventFactory接口

  1. public class LongEventFactory implements EventFactory {
  2. @Override
  3. public Object newInstance() {
  4. return new LongEvent();
  5. }
  6. }

image.gif
(3) 需要有一个监听事件类,或者说消费者,实现EventHandler用于处理数据(Event类)

  1. public class LongEventHandler implements EventHandler<LongEvent> {
  2. @Override
  3. public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
  4. System.out.println(longEvent.getValue());
  5. }
  6. }

image.gif
(4) Disruptor需要一个事件发布类用来发布事件(自定义),发布事件分为两步
首先从RingBuffer获取sequence槽
然后调用RingBuffer的get方法new一个event并设值,完成事件的发布或者说生产

  1. public class LongEventProducer {
  2. private final RingBuffer<LongEvent> ringBuffer;
  3. public LongEventProducer(RingBuffer<LongEvent> ringBuffer){
  4. this.ringBuffer = ringBuffer;
  5. }
  6. /**
  7. * onData用来发布事件,每调用一次就发布一次事件
  8. * 它的参数会用过事件传递给消费者
  9. */
  10. public void onData(ByteBuffer bb){//ByteBuffer可以理解为LongEvent,只是一种性能的优化
  11. //1.可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
  12. long sequence = ringBuffer.next();
  13. try {
  14. //2.用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
  15. LongEvent event = ringBuffer.get(sequence);
  16. //3.获取要通过事件传递的业务数据
  17. event.setValue(bb.getLong(0));
  18. } finally {
  19. //4.发布事件
  20. //注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
  21. ringBuffer.publish(sequence);//发布的是下标,或者说槽
  22. }
  23. }
  24. }

image.gif
Disruptor 3.0提供了lambda式的API。这样可以把一些复杂的操作放在Ring Buffer,所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator来发布事件

  1. public class LongEventProducerWithTranslator {
  2. private final RingBuffer<LongEvent> ringBuffer;
  3. //一个translator可以看做一个事件初始化器,publicEvent方法会调用它
  4. //填充Event
  5. private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
  6. new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
  7. @Override
  8. public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {
  9. event.setValue(buffer.getLong(0));
  10. }
  11. };
  12. public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
  13. this.ringBuffer = ringBuffer;
  14. }
  15. public void onData(ByteBuffer buffer){
  16. //直接调用发布方法就可以了,action写在translator里
  17. ringBuffer.publishEvent(TRANSLATOR, buffer);
  18. }
  19. }

image.gif
(5) 我们需要进行测试代码编写。实例化Disruptor实例,配置一系列参数。然后我们对Disruptor实例绑定监听事件类,接受并处理数据。

  1. Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

image.gif

  • 第一个参数工厂类对象,用于创建event
  • 第二个参数缓冲区大小
  • 第三个参数线程池, 进行Disruptor内部的数据处理调度
  • 第四个参数ProducerType.SINGLE和ProducerType.MULTI 生产者个数
  • 第五个参数生产策略

Disruptor有三种生产策略
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();

BlockingWaitStrategy 是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现

WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();

SleepingWaitStrategy 的性能表现跟BlockingWaitStrategy差不多,对CPU的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景

WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

YieldingWaitStrategy 的性能是最好的,适合用于低延迟的系统。在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性

  1. public class LongEventMain {
  2. public static void main(String[] args) throws Exception {
  3. //创建缓冲池
  4. ExecutorService executor = Executors.newCachedThreadPool();
  5. //创建工厂
  6. LongEventFactory factory = new LongEventFactory();
  7. //创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
  8. int ringBufferSize = 1024 * 1024; //
  9. //创建disrupor消息处理对象
  10. Disruptor<LongEvent> disruptor =
  11. new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
  12. // 连接消费事件方法
  13. disruptor.handleEventsWith(new LongEventHandler());
  14. // 启动
  15. disruptor.start();
  16. //Disruptor 的事件发布过程是一个两阶段提交的过程:
  17. //发布事件
  18. //使用该方法获取具体存放数据的容器RingBuffer(环形结构)
  19. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
  20. LongEventProducer producer = new LongEventProducer(ringBuffer);
  21. //生成事件发布器
  22. //LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
  23. ByteBuffer byteBuffer = ByteBuffer.allocate(8);
  24. //将数据插入LongEventProducer生产者内的RingBuffer容器内
  25. for(long l = 0; l<100; l++){
  26. byteBuffer.putLong(0, l);
  27. producer.onData(byteBuffer);//发布事件++
  28. //Thread.sleep(1000);
  29. }
  30. disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理;
  31. executor.shutdown();//关闭 disruptor 使用的线程池;如果需要的话,必须手动关闭, disruptor 在 shutdown 时不会自动关闭;
  32. }
  33. }

image.gif

2)RingBuffer+Squencebarrier+BatchEventprocessor(多线程 - 单生产者 - 多消费者)

JAVA的并发编程(八):Disruptor并发框架 - 图10image.gif
A. 创建带事件发布器的RingBuffer,指定event工厂,handler,缓冲区,拒绝策略
B. 生成序列屏障SequenceBarrier 平衡性能
C. 创建消息处理器processor与生产者关联(实现了Runnable接口)
D. 把消息处理器提交到线程池执行
E. 最终利用future模式来快速发布任务,处理数据
F. 释放资源

不了解Future模式的可以看我之前的笔记JAVA的并发编程(六): 多线程的设计模式

  1. public class Main1 {
  2. public static void main(String[] args) throws Exception {
  3. int BUFFER_SIZE=1024; //缓冲区大小
  4. int THREAD_NUMBERS=4; //线程数
  5. /**生成带单个事件发布器的RingBuffer
  6. * 内嵌eventfactory事件工厂
  7. * 设置RingBuffer缓冲区大小
  8. * 配置event生成策略
  9. */
  10. final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {
  11. @Override
  12. public Trade newInstance() {
  13. return new Trade();
  14. }
  15. }, BUFFER_SIZE, new YieldingWaitStrategy());
  16. //创建生成事件的线程池
  17. ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);
  18. //创建SequenceBarrier(序列屏障,平衡生产者和消费者的行为)
  19. SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  20. //创建消息处理器(传入RingBuffer,SequenceBarrier序列屏障,Handler消费方法)
  21. BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(
  22. ringBuffer, sequenceBarrier, new TradeHandler());
  23. //这一步的目的就是把消费者的位置信息引用注入到生产者
  24. //如果只有一个消费者的情况可以使用单数方法addGatingSequence
  25. ringBuffer.addGatingSequences(transProcessor.getSequence());
  26. //把消息处理器提交到线程池执行
  27. executors.submit(transProcessor);
  28. //早期的2.0API 如果存在多个消费者 那重复执行上面3行代码 把TradeHandler换成其它消费者类
  29. //利用 future模式来给RingBuffer中提交任务,也可以直接使用for循环
  30. Future<?> future= executors.submit(new Callable<Void>() {
  31. @Override
  32. public Void call() throws Exception {
  33. long seq;
  34. for(int i=0;i<10;i++){
  35. seq = ringBuffer.next();//占个坑 --ringBuffer一个可用区块
  36. ringBuffer.get(seq).setPrice(Math.random()*9999);//给这个区块放入 数据
  37. ringBuffer.publish(seq);//发布这个区块的数据使handler(consumer)可见
  38. }
  39. return null;
  40. }
  41. });
  42. future.get();//future等待生产者结束
  43. Thread.sleep(1000);//等上5秒,等消费都处理完成(其实在生产的时候就直接被消费的,会马上打印出结果)
  44. transProcessor.halt();//通知事件(或者说消息)处理器 可以结束了(并不是马上结束!!!)
  45. executors.shutdown();//终止线程池
  46. }
  47. }

image.gif

3)RingBuffer+Squencebarrier+WorkPool(多线程 - 单生产者 - 多消费者)

JAVA的并发编程(八):Disruptor并发框架 - 图13image.gif
A. 创建RingBuffer,指定event生产模式,event工厂,缓冲区 ,拒绝策略 (没有handler)
B. 生成序列屏障平衡性能
C. 创建3个WorkHanler消费类
D. 创建WorkPool,指定RingBuffer,屏障SequenceBarrier ,IgnoreExceptionHandler异常处理,WorkHanler
E. RingBuffer调用addGatingSequences传入WorkPool,关联生产者和消费者以便性能调控
E. 把ServiceExcutor线程池传入WorkPool后start
F. 利用CountDownLatch来同时发布任务
H. 释放资源

  1. public class Main2 {
  2. public static void main(String[] args) throws InterruptedException {
  3. int BUFFER_SIZE=1024;
  4. int THREAD_NUMBERS=4;
  5. //创建事件工厂
  6. EventFactory<Trade> eventFactory = new EventFactory<Trade>() {
  7. public Trade newInstance() {
  8. return new Trade();
  9. }
  10. };
  11. //创建带事件发布器的ringbuffer,指定事件工厂,缓冲区大小,不带拒绝策略
  12. RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);
  13. //创建序列或者说下标屏障,利用算法调控消费者和生产者的行为
  14. SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  15. //创建线程池
  16. ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);
  17. //创建消费方法
  18. WorkHandler<Trade> handler = new TradeHandler();
  19. //原本应该创建消息处理器,这里直接创建了工作连接池子,
  20. WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);
  21. workerPool.start(executor);
  22. //下面这个生产8个数据
  23. for(int i=0;i<8;i++){
  24. long seq=ringBuffer.next();
  25. ringBuffer.get(seq).setPrice(Math.random()*9999);
  26. ringBuffer.publish(seq);
  27. }
  28. Thread.sleep(1000);
  29. workerPool.halt();
  30. executor.shutdown();
  31. }
  32. }

image.gif

4)RingBuffer+EventHandlerGroup(多线程 - 多生产者 - 多消费者)

JAVA的并发编程(八):Disruptor并发框架 - 图16image.gif
A. 创建带事件发布器的RingBuffer,指定event工厂,缓冲区 (没有handler和拒绝策略)
B. 生成序列屏障平衡性能
C. 创建WorkHanler消费类
D. 创建WorkPool,指定RingBuffer,屏障SequenceBarrier ,IgnoreExceptionHandler异常处理,WorkHanler
E. 把ServiceExcutor线程池传入WorkPool后start
F. 最终利用future模式来快速发布任务,处理数据
G. 最终发布任务(用future模式的话会更快,但是用主线程的for循环也可以)
H. 释放资源

  1. public class Main {
  2. public static void main(String[] args) throws Exception {
  3. //创建ringBuffer,生产模式为multi
  4. RingBuffer<Order> ringBuffer =
  5. RingBuffer.create(ProducerType.MULTI,
  6. new EventFactory<Order>() {
  7. @Override
  8. public Order newInstance() {
  9. return new Order();
  10. }
  11. },
  12. 1024 * 1024,
  13. new YieldingWaitStrategy());
  14. //下标栅栏
  15. SequenceBarrier barriers = ringBuffer.newBarrier();
  16. //创建3个消费方法,或者说消费者
  17. Consumer[] consumers = new Consumer[3];
  18. for(int i = 0; i < consumers.length; i++){
  19. consumers[i] = new Consumer("c" + i);
  20. }
  21. //创建工作线程池
  22. WorkerPool<Order> workerPool =
  23. new WorkerPool<Order>(ringBuffer,
  24. barriers,
  25. new IntEventExceptionHandler(),
  26. consumers);
  27. //链接消费者和生成者方便控制性能
  28. ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
  29. //传入线程池启动workerpool
  30. workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
  31. //利用contdownlatch,实例化100个Producer完毕后统一发布事件,合计100*100个事件
  32. final CountDownLatch latch = new CountDownLatch(1);
  33. for (int i = 0; i < 100; i++) {
  34. final Producer p = new Producer(ringBuffer);
  35. new Thread(new Runnable() {
  36. @Override
  37. public void run() {
  38. try {
  39. latch.await();
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. for(int j = 0; j < 100; j ++){
  44. p.onData(UUID.randomUUID().toString());
  45. }
  46. }
  47. }).start();
  48. }
  49. Thread.sleep(2000);
  50. System.out.println("---------------开始生产-----------------");
  51. latch.countDown();
  52. Thread.sleep(5000);
  53. System.out.println("总数:" + consumers[0].getCount() );
  54. }
  55. static class IntEventExceptionHandler implements ExceptionHandler {
  56. public void handleEventException(Throwable ex, long sequence, Object event) {}
  57. public void handleOnStartException(Throwable ex) {}
  58. public void handleOnShutdownException(Throwable ex) {}
  59. }
  60. }

image.gif

5)Disruptor+EventHandlerGroup(多线程 - 单生产者 - 菱形消费)

JAVA的并发编程(八):Disruptor并发框架 - 图19image.gif
A. 创建一个Disruptor,指定event工厂,缓冲区,线程池,生产模式,消费策略
B. 使用di’sruptor创建消费者组EventHandlerGroup,调用hanlderEventsWith(不定参数)传入消费者1和消费者2
C.调用组方法then(不定参数)传入消费者3
D. 启动disruptor
E. 通过线程池新建EventTranslator事件发布器,利用CountDownLanch同步线程
H. 释放资源
不了解CountDownLanch的可以看我之前的笔记JAVA的并发编程(四): 线程的通信

消费者1:给name赋值

  1. public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {
  2. @Override
  3. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  4. this.onEvent(event);
  5. }
  6. @Override
  7. public void onEvent(Trade event) throws Exception {
  8. System.out.println("handler1: set name");
  9. event.setName("h1");
  10. Thread.sleep(1000);
  11. }
  12. }

image.gif
消费者2:给price赋值

  1. public class Handler2 implements EventHandler<Trade> {
  2. @Override
  3. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  4. System.out.println("handler2: set price");
  5. event.setPrice(17.0);
  6. Thread.sleep(1000);
  7. }
  8. }

image.gif
消费者3:打印event实体

  1. public class Handler3 implements EventHandler<Trade> {
  2. @Override
  3. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  4. System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.toString());
  5. }
  6. }

image.gif
事件发布器TradePublisher:导入CountDownLanch和EventTranslator

  1. public class TradePublisher implements Runnable {
  2. Disruptor<Trade> disruptor;
  3. private CountDownLatch latch;
  4. private static int LOOP=3;//模拟百万次交易的发生
  5. public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {
  6. this.disruptor=disruptor;
  7. this.latch=latch;
  8. }
  9. @Override
  10. public void run() {
  11. TradeEventTranslator tradeTransloator = new TradeEventTranslator();
  12. for(int i=0;i<LOOP;i++){
  13. disruptor.publishEvent(tradeTransloator);
  14. }
  15. latch.countDown();
  16. }
  17. }
  18. class TradeEventTranslator implements EventTranslator<Trade>{
  19. private Random random=new Random();
  20. @Override
  21. public void translateTo(Trade event, long sequence) {
  22. this.generateTrade(event);
  23. }
  24. private Trade generateTrade(Trade trade){
  25. trade.setPrice(random.nextDouble()*9999);
  26. return trade;
  27. }
  28. }

image.gif
主程序Main

  1. public class Main {
  2. public static void main(String[] args) throws InterruptedException {
  3. long beginTime=System.currentTimeMillis();
  4. int bufferSize=1024;
  5. ExecutorService executor=Executors.newFixedThreadPool(8);
  6. Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {
  7. @Override
  8. public Trade newInstance() {
  9. return new Trade();
  10. }
  11. }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
  12. //菱形操作
  13. //使用disruptor创建消费者组C1,C2
  14. EventHandlerGroup<Trade> handlerGroup =
  15. disruptor.handleEventsWith(new Handler1(), new Handler2());
  16. //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3
  17. handlerGroup.then(new Handler3());
  18. disruptor.start();//启动
  19. CountDownLatch latch=new CountDownLatch(1);
  20. //生产者准备
  21. executor.submit(new TradePublisher(latch, disruptor));
  22. latch.await();//等待生产者完事.
  23. disruptor.shutdown();
  24. executor.shutdown();
  25. System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));
  26. }
  27. }

image.gif

6)Disruptor多消费者消费流程(多线程 - 单生产者 - 多消费者)

(1) 顺序执行
JAVA的并发编程(八):Disruptor并发框架 - 图26image.gif

  1. disruptor.handleEventsWith(new Handler1()).
  2. handleEventsWith(new Handler2()).
  3. handleEventsWith(new Handler3());

image.gif
(2) 六边形
JAVA的并发编程(八):Disruptor并发框架 - 图29image.gif

  1. Handler1 h1 = new Handler1();
  2. Handler2 h2 = new Handler2();
  3. Handler3 h3 = new Handler3();
  4. Handler4 h4 = new Handler4();
  5. Handler5 h5 = new Handler5();
  6. disruptor.handleEventsWith(h1, h2);
  7. disruptor.after(h1).handleEventsWith(h4);
  8. disruptor.after(h2).handleEventsWith(h5);
  9. disruptor.after(h4, h5).handleEventsWith(h3);

image.gif
(3) 其他
由于使用了函数式编程和不定参数,用户可以自由定义消费流程,这里不再一 一列举。
使用RingBuffer可以实现简单的1对多,多对多的消费流程
使用Disruptor可以实现几乎所有种类的消费流程