一、Callable&Future&FutureTask

直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此java1.5就提供了Callable接口来实现这一场景,而Future和FutureTask就可以和Callable接口配合起来使用。

1、Callable和Runnable

  1. // Callable 的接口
  2. @FunctionalInterface
  3. public interface Callable<V> {
  4. V call() throws Exception;
  5. }
  6. // Runnable 的接口
  7. @FunctionalInterface
  8. public interface Runnable {
  9. public abstract void run();
  10. }

Runnable 的run()方法是没有返回值且不抛异常的。
Callable 配合的有一个 Future 类, 通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。

2、Future 的主要功能

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。 必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
boolean cancel (boolean mayInterruptIfRunning) 取消任务的执行。参数指定是否立即中断任务执行,或者等等任务结束。
boolean isCancelled () 任务是否已经取消,任务正常完成前将其取消,则返回 true。
boolean isDone () 任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true。
V get () throws InterruptedException, ExecutionException 等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出 CancellationException。
V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一样,多了设置超时时间。 参数timeout指定超时时间,uint指定时间的单位,在枚举类TimeUnit中有相关的定义。如果计算超时,将抛出TimeoutException。
FutureTask实现了Future,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。

3、FutureTask的使用

把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。
get()方法是Future接口提供的方法,这个方法在获取到结果之前是阻塞的。
FutureTask 实现了Runnable接口,所以可以传入线程作为入参。

  1. FutureTask task = new FutureTask(new Callable() {
  2. @Override
  3. public Object call() throws Exception {
  4. log.debug("通过Callable方式执行任务");
  5. Thread.sleep(3000);
  6. return "返回任务结果";
  7. }
  8. });
  9. new Thread(task).start();
  10. log.debug("结果:{}", task.get());

1.png

4、Future的局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成, 并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制。
并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法。如果使用了多个线程获取多个结果,当第一个线程没有获取到结果(get()方法没有返回)时主线程会一直阻塞在这里,后面的线程即使已经返回了结果,也不会执行到。
无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力。
无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的。
没有异常处理:Future接口中没有关于异常处理的方法。
使用Callable 和Future 时并没有创建线程,仅是执行任务,java中创建线程只有一种方法就是new Thread,包括线程池创建线程也是new Thread,这种设计把线程和任务完全解耦。

二、CompletionService

Callable+Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

1、CompletionService原理

内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的 take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

2、使用案例

把执行完成的任务保存到一个阻塞队列中,先执行完的任务先入队,不会存在某个任务执行时间过长导致整个流程阻塞的情况。获取任务结果直接从阻塞队列中take()任务再get()结果。

  1. public class CompletionServiceDemo {
  2. public static void main(String[] args) throws InterruptedException, ExecutionException {
  3. //创建线程池
  4. ExecutorService executor = Executors.newFixedThreadPool(10);
  5. //创建CompletionService
  6. CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
  7. //异步向电商S1询价
  8. cs.submit(() -> getPriceByS1());
  9. //异步向电商S2询价
  10. cs.submit(() -> getPriceByS2());
  11. //异步向电商S3询价
  12. cs.submit(() -> getPriceByS3());
  13. //将询价结果异步保存到数据库
  14. for (int i = 0; i < 3; i++) {
  15. //从阻塞队列获取futureTask
  16. Integer r = cs.take().get();
  17. executor.execute(() -> save(r));
  18. }
  19. executor.shutdown();
  20. }
  21. private static void save(Integer r) {
  22. log.debug("保存询价结果:{}",r);
  23. }
  24. private static Integer getPriceByS1() throws InterruptedException {
  25. TimeUnit.MILLISECONDS.sleep(5000);
  26. log.debug("电商S1询价信息1200");
  27. return 1200;
  28. }
  29. private static Integer getPriceByS2() throws InterruptedException {
  30. TimeUnit.MILLISECONDS.sleep(8000);
  31. log.debug("电商S2询价信息1000");
  32. return 1000;
  33. }
  34. private static Integer getPriceByS3() throws InterruptedException {
  35. TimeUnit.MILLISECONDS.sleep(3000);
  36. log.debug("电商S3询价信息800");
  37. return 800;
  38. }
  39. }

三、CompletableFuture

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。
CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接 口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是, CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
2.png
CompletionStage接口:执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool(),这个线程池是全局的,如果多处使用,会导致线程不够用的情况,所以再使用这个API时,需要自己传入线程池。

1、创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作

  1. public static CompletableFuture<Void> runAsync(Runnable runnable)
  2. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  3. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync 方法以Runnable函数式接口类型为参数,没有返回结果。
supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞)。
没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

2、runAsync&supplyAsync

  1. // runAsync
  2. Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");
  3. CompletableFuture.runAsync(runnable);
  4. // supplyAsync
  5. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  6. System.out.println("执行有返回值的异步任务");
  7. try {
  8. Thread.sleep(5000);
  9. } catch (InterruptedException e) {
  10. e.printStackTrace();
  11. }
  12. return "Hello World";
  13. });
  14. // String result = future.get();
  15. String result = future.join();

在CompletableFuture中,runAsync和supplyAsync 的内部类中实现了run()方法,以supplyAsync为例

  1. public void run() {
  2. CompletableFuture<T> d; Supplier<T> f;
  3. if ((d = dep) != null && (f = fn) != null) {
  4. dep = null; fn = null;
  5. if (d.result == null) {
  6. try {
  7. d.completeValue(f.get());
  8. } catch (Throwable ex) {
  9. d.completeThrowable(ex);
  10. }
  11. }
  12. d.postComplete();
  13. }
  14. }

在run()方法中,调用了作为参数的函数 Supplier f 的get()方法,开始调用函数的逻辑。当函数返回结果之后,会通过CAS修改result属性的值。

3、获取结果 join&get

join()和get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是 uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)。

4、结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。

  1. public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  2. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  3. public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,
  4. Executor executor)
  5. public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

Action的类型是BiConsumer,它可以处理正常的计算结果,或者异常情况。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

5、whenComplete&exceptionally

  1. CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  2. try {
  3. TimeUnit.SECONDS.sleep(1);
  4. } catch (InterruptedException e) {
  5. }
  6. if (new Random().nextInt(10) % 2 == 0) {
  7. int i = 12 / 0;
  8. }
  9. System.out.println("执行结束!");
  10. return "test";
  11. });
  12. // 此方法会消费掉结果,这个方法在异常处理后也会调用,因为异常也算处理完毕
  13. // 异常处理完毕再调用这个方法就拿不到结果了
  14. future.whenComplete(new BiConsumer<String, Throwable>() {
  15. @Override
  16. public void accept(String t, Throwable action) {
  17. System.out.println(t+" 执行完成!");
  18. }
  19. });
  20. // 异常处理
  21. future.exceptionally(new Function<Throwable, String>() {
  22. @Override
  23. public String apply(Throwable t) {
  24. System.out.println("执行失败:" + t.getMessage());
  25. return "异常xxxx";
  26. }
  27. }).join();

四、Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。
Disruptor实现了队列的功能并且是一个有界队列,可以用于生产者-消费者模型。
juc下的队列大部分采用加ReentrantLock锁方式保证线程安全。在稳定性要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列。
加锁的方式通常会严重影响性能。线程会因为竞争不到锁而被挂起,等待其他线程释放锁而唤醒,这个过程存在很大的开销,而且存在死锁的隐患。
有界队列通常采用数组实现。但是采用数组实现又会引发另外一个问题false sharing(伪共享)。

1、Disruptor的设计方案

环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)。
元素位置定位: 数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。
无锁设计: 每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
利用缓存行填充解决了伪共享的问题,填充7个long类型。
实现了基于事件驱动的生产者消费者模型(观察者模式)。
消费者时刻关注着队列里有没有消息,一旦有新消息产生,消费者线程就会立刻把它消费。

2、RingBuffer数据结构

使用RingBuffer来作为队列的数据结构,RingBuffer就是一个可自定义大小的环形数组。除数组外还有一个序列号(sequence),用以指向下一个可用的元素,供生产者与消费者使用。原理图如下所示:
3.png Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)。
当所有位置都放满了,再放下一个时,就会把0号位置覆盖掉。
当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略,比较常用的策略:
BlockingWaitStrategy策略:常见且默认的等待策略,当这个队列里满了,不执行覆盖,而是阻塞等待。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景。
YieldingWaitStrategy策略:这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略。
BusySpinWaitStrategy策略:采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用。

3、一个生产者单线程写数据的流程

1)申请写入m个元素
2)若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素。
3)若是返回的正确,则生产者开始写入元素。
4.png

4、多个生产者写数据的流程

多个生产者的情况下,会遇到“如何防止多个线程重复写同一个元素”的问题。Disruptor的解决方法是每个线程获取不同的一段数组空间进行操作。这个通过CAS很容易达到。只需要在分配元素的时候,通过CAS判断一下这段空间是否已经分配出去即可。
但是会遇到一个新问题:如何防止读取的时候,读到还未写的元素。Disruptor在多个生产者的情况下,引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪。
1)申请写入m个元素
2)若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间
3)生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的。
如下图所示,Writer1和Writer2两个线程写入数组,都申请可写的数组空间。Writer1被分配了下标3到下表5的空间,Writer2被分配了下标6到下标9的空间。Writer1写入下标3位置的元素, 同时把available Buffer相应位置置位,标记已经写入成功,往后移一位,开始写下标4位置的元素。Writer2同样的方式。最终都写入完成。
6.png

5、消费者读数据

生产者多线程写入的情况下读数据会复杂很多。
1)申请读取到序号n
2)若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置。
3)消费者读取元素。
如下图所示,读线程读到下标为2的元素,三个线程Writer1/Writer2/Writer3正在向RingBuffer 相应位置写数据,写线程被分配到的最大元素下标是11。读线程申请读取到下标从3到11的元素,判断writer cursor>=11。然后开始读取availableBuffer,从3开始,往后读取,发现下标为7的元素没有生产成功,于是WaitFor(11)返回6。然后,消费者读取下标从3到6共计4个元素。

5.png6、Disruptor核心概念

RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。
Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池 Executor、消费之集合ConsumerRepository等引用。
Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、 MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之 间快速、正确传递数据的并发算法。
SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进 Disruptor,WaitStrategy有多种实现策略。
EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了 Disruptor中的一个消费者的接口。
EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环, 处理Event,拥有消费者的Sequence。

7、Disruptor构造器

  1. public class Disruptor<T> {
  2. private final RingBuffer<T> ringBuffer;
  3. private final Executor executor;
  4. private final ConsumerRepository<T> consumerRepository;
  5. private final AtomicBoolean started;
  6. private ExceptionHandler<? super T> exceptionHandler;

EventFactory:创建事件(任务)的工厂类。
ringBufferSize:容器的长度。
ThreadFactory:用于创建执行任务的线程。
ProductType:生产者类型:单生产者、多生产者。
WaitStrategy:等待策略。

8、Disruptor的使用

引入依赖

  1. <dependency>
  2. <groupId>com.lmax</groupId>
  3. <artifactId>disruptor</artifactId>
  4. <version>3.3.4</version>
  5. </dependency>

1)单生产者单消费者模式

创建Event(消息载体/事件)和EventFactory(事件工厂),创建 OrderEvent 类,这个类将会被放入环形队列中作为消息内容。
创建 OrderEventFactory类,用于创建OrderEvent事件。

  1. @Data
  2. public class OrderEvent {
  3. private long value;
  4. private String name;
  5. }
  6. public class OrderEventFactory implements EventFactory<OrderEvent> {
  7. @Override
  8. public OrderEvent newInstance() {
  9. return new OrderEvent();
  10. }
  11. }

创建消息(事件)生产者,创建 OrderEventProducer 类,它将作为生产者使用。

  1. public class OrderEventProducer {
  2. //事件队列
  3. private RingBuffer<OrderEvent> ringBuffer;
  4. public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
  5. this.ringBuffer = ringBuffer;
  6. }
  7. public void onData(long value,String name) {
  8. // 获取事件队列 的下一个槽
  9. long sequence = ringBuffer.next();
  10. try {
  11. //获取消息(事件)
  12. OrderEvent orderEvent = ringBuffer.get(sequence);
  13. // 写入消息数据
  14. orderEvent.setValue(value);
  15. orderEvent.setName(name);
  16. } catch (Exception e) {
  17. // TODO 异常处理
  18. e.printStackTrace();
  19. } finally {
  20. System.out.println("生产者"+ Thread.currentThread().getName()
  21. +"发送数据value:"+value+",name:"+name);
  22. //发布事件
  23. ringBuffer.publish(sequence);
  24. }
  25. }
  26. }


创建消费者,创建 OrderEventHandler 类,并实现 EventHandler ,作为消费者。

  1. public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {
  2. @Override
  3. public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
  4. // TODO 消费逻辑
  5. System.out.println("消费者"+ Thread.currentThread().getName()
  6. +"获取数据value:"+ event.getValue()+",name:"+event.getName());
  7. }
  8. @Override
  9. public void onEvent(OrderEvent event) throws Exception {
  10. // TODO 消费逻辑
  11. System.out.println("消费者"+ Thread.currentThread().getName()
  12. +"获取数据value:"+ event.getValue()+",name:"+event.getName());
  13. }
  14. }

测试类

  1. public class DisruptorDemo {
  2. public static void main(String[] args) throws Exception {
  3. //创建disruptor
  4. Disruptor<OrderEvent> disruptor = new Disruptor<>(
  5. OrderEvent::new,
  6. 1024 * 1024,
  7. Executors.defaultThreadFactory(),
  8. ProducerType.SINGLE, //单生产者
  9. new YieldingWaitStrategy() //等待策略
  10. );
  11. //设置消费者用于处理RingBuffer的事件
  12. //disruptor.handleEventsWith(new OrderEventHandler());
  13. //设置多消费者,消息会被重复消费
  14. //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
  15. //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
  16. //disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
  17. //启动disruptor
  18. disruptor.start();
  19. //创建ringbuffer容器
  20. RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
  21. //创建生产者
  22. OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
  23. // 发送消息
  24. for(int i=0;i<100;i++){
  25. eventProducer.onData(i,"Fox"+i);
  26. }
  27. disruptor.shutdown();
  28. }
  29. }

2)单生产者多消费者模式

如果消费者是多个,只需要在调用 handleEventsWith 方法时将多个消费者传递进去。

  1. disruptor.handleEventsWith(new OrderEventHandler());
  2. + disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());

上面传入的两个消费者会重复消费每一条消息,如果想实现一条消息在有多个消费者的情况下, 只会被一个消费者消费,那么需要调用 handleEventsWithWorkerPool 方法。

  1. disruptor.handleEventsWith(new OrderEventHandler());
  2. + disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());

消费者要实现WorkHandler接口

3)多生产者多消费者模式

在实际开发中,多个生产者发送消息,多个消费者处理消息才是常态。

  1. public class DisruptorDemo2 {
  2. public static void main(String[] args) throws Exception {
  3. //创建disruptor
  4. Disruptor<OrderEvent> disruptor = new Disruptor<>(
  5. new OrderEventFactory(),
  6. 1024 * 1024,
  7. Executors.defaultThreadFactory(),
  8. ProducerType.MULTI, //多生产者
  9. new YieldingWaitStrategy() //等待策略
  10. );
  11. //设置消费者用于处理RingBuffer的事件
  12. //disruptor.handleEventsWith(new OrderEventHandler());
  13. //设置多消费者,消息会被重复消费
  14. //disruptor.handleEventsWith(new OrderEventHandler(),new OrderEventHandler());
  15. //设置多消费者,消费者要实现WorkHandler接口,一条消息只会被一个消费者消费
  16. disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
  17. //启动disruptor
  18. disruptor.start();
  19. //创建ringbuffer容器
  20. RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
  21. new Thread(()->{
  22. //创建生产者
  23. OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
  24. // 发送消息
  25. for(int i=0;i<100;i++){
  26. eventProducer.onData(i,"Fox"+i);
  27. }
  28. },"producer1").start();
  29. new Thread(()->{
  30. //创建生产者
  31. OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);
  32. // 发送消息
  33. for(int i=0;i<100;i++){
  34. eventProducer.onData(i,"monkey"+i);
  35. }
  36. },"producer2").start();
  37. //disruptor.shutdown();
  38. }
  39. }

4)消费者优先级模式

在实际场景中,我们通常会因为业务逻辑而形成一条消费链。比如一个消息必须由消费者A -> 消费者B -> 消费者C 的顺序依次进行消费。在配置消费者时,可以通过 .then 方法去实现顺序消费。

  1. disruptor.handleEventsWith(new OrderEventHandler()).then(new OrderEventHandler())
  2. .then(new OrderEventHandler());

handleEventsWith 与 handleEventsWithWorkerPool 都是支持 .then 的,它们可以结合使用。比如可以按照 消费者A -> (消费者B 消费者C) -> 消费者D 的消费顺序。

  1. disruptor.handleEventsWith(new OrderEventHandler())
  2. .thenHandleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler())
  3. .then(new OrderEventHandler());