串行与并行操作组合在一起执行的情况称为菱形操作。需要注意的是,在单生产单消费者模式下,构建 Disruptor 所用线程池的线程数量不能小于事件监听器 Handler 的数量(即至少为每个 Handler 提供一个线程)

添加事件监听器:

事件监听器四:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.EventHandler;
  3. import java.util.concurrent.TimeUnit;
  4. /* 监听事件 */
  5. public class UserHandler4 implements EventHandler<Trade> {
  6. @Override
  7. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  8. System.out.println("UserHandler4 : SET PRICE");
  9. TimeUnit.SECONDS.sleep(1);
  10. event.setPrice(17.0);
  11. }
  12. }

事件监听器五:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.EventHandler;
  3. import java.util.concurrent.TimeUnit;
  4. /* 监听事件 */
  5. public class UserHandler5 implements EventHandler<Trade> {
  6. @Override
  7. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  8. System.out.println("UserHandler5 : GET PRICE: " + event.getPrice());
  9. TimeUnit.SECONDS.sleep(1);
  10. event.setPrice(event.getPrice() + 3.0);
  11. }
  12. }

菱形操作案例一:

并行执行 UserHandler1 与 UserHandler2,然后执行 UserHandler3
image.png

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.BusySpinWaitStrategy;
  3. import com.lmax.disruptor.EventFactory;
  4. import com.lmax.disruptor.RingBuffer;
  5. import com.lmax.disruptor.dsl.Disruptor;
  6. import com.lmax.disruptor.dsl.EventHandlerGroup;
  7. import com.lmax.disruptor.dsl.ProducerType;
  8. import sun.nio.ch.ThreadPool;
  9. import java.util.concurrent.*;
  10. public class Main {
  11. public static void main(String[] args) throws Exception {
  12. //构建一个线程池用于提交任务
  13. ExecutorService taskPool = Executors.newFixedThreadPool(1);
  14. //创建线程池用于构建Disruptor
  15. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,10,TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));
  16. //构建Disruptor
  17. Disruptor<Trade> disruptor = new Disruptor<Trade>(
  18. () -> new Trade(),
  19. 1024*1024,
  20. threadPool,
  21. ProducerType.SINGLE,
  22. new BusySpinWaitStrategy()
  23. );
  24. //菱形操作写法一:并行执行 UserHandler1 与 UserHandler2,然后执行 UserHandler3
  25. disruptor.handleEventsWith(new UserHandler1(),new UserHandler2())
  26. .handleEventsWith(new UserHandler3());
  27. //菱形操作写法二:并行执行 UserHandler1 与 UserHandler2,然后执行 UserHandler3(then意为"然后")
  28. //EventHandlerGroup<Trade> lxGroup = disruptor.handleEventsWith(new UserHandler1(), new UserHandler2());
  29. //lxGroup.then(new UserHandler3());
  30. //启动disruptor
  31. RingBuffer<Trade> ringBuffer = disruptor.start();
  32. //使用 CountDownLatch 确保资源流释放
  33. CountDownLatch countDownLatch = new CountDownLatch(1);
  34. long begin = System.currentTimeMillis();
  35. //提交任务
  36. taskPool.submit(new TradePushlisher(disruptor,countDownLatch));
  37. try {
  38. countDownLatch.await();
  39. } finally {
  40. disruptor.shutdown();
  41. taskPool.shutdown();
  42. threadPool.shutdown();
  43. }
  44. System.out.println("总耗时: " + (System.currentTimeMillis() - begin));
  45. }
  46. }

image.png

菱形操作案例二:

并行执行 userHandler1 与 userHandler4,userHandler1 与 userHandler2 串行执行, userHandler4 与 userHandler5 串行执行,最终汇总到 userHandler3

image.png

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.BusySpinWaitStrategy;
  3. import com.lmax.disruptor.EventFactory;
  4. import com.lmax.disruptor.RingBuffer;
  5. import com.lmax.disruptor.dsl.Disruptor;
  6. import com.lmax.disruptor.dsl.EventHandlerGroup;
  7. import com.lmax.disruptor.dsl.ProducerType;
  8. import sun.nio.ch.ThreadPool;
  9. import java.util.concurrent.*;
  10. public class Main {
  11. public static void main(String[] args) throws Exception {
  12. //构建一个线程池用于提交任务
  13. ExecutorService taskPool = Executors.newFixedThreadPool(1);
  14. //创建线程池用于构建Disruptor
  15. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,10,10,TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));
  16. //构建Disruptor
  17. Disruptor<Trade> disruptor = new Disruptor<Trade>(
  18. () -> new Trade(),
  19. 1024*1024,
  20. threadPool,
  21. ProducerType.SINGLE,
  22. new BusySpinWaitStrategy()
  23. );
  24. /* 把消费者设置到Disruptor中 handleEventsWith */
  25. //菱形操作案例二:(单生产者单消费者模式下使用了N个事件监听器,因此Disruptor的线程池线程数至少就需要N个)
  26. UserHandler1 userHandler1 = new UserHandler1();
  27. UserHandler2 userHandler2 = new UserHandler2();
  28. UserHandler3 userHandler3 = new UserHandler3();
  29. UserHandler4 userHandler4 = new UserHandler4();
  30. UserHandler5 userHandler5 = new UserHandler5();
  31. //并行执行 userHandler1 与 userHandler4
  32. disruptor.handleEventsWith(userHandler1,userHandler4);
  33. //执行完 userHandler1 后执行 userHandler2
  34. disruptor.after(userHandler1).handleEventsWith(userHandler2);
  35. //执行完 userHandler4 后执行 userHandler5
  36. disruptor.after(userHandler4).handleEventsWith(userHandler5);
  37. //执行完 userHandler2、userHandler5 后执行 userHandler3(汇总到3)
  38. disruptor.after(userHandler2,userHandler5).handleEventsWith(userHandler3);
  39. //启动disruptor
  40. RingBuffer<Trade> ringBuffer = disruptor.start();
  41. //使用 CountDownLatch 确保资源流释放
  42. CountDownLatch countDownLatch = new CountDownLatch(1);
  43. long begin = System.currentTimeMillis();
  44. //提交任务
  45. taskPool.submit(new TradePushlisher(disruptor,countDownLatch));
  46. try {
  47. countDownLatch.await();
  48. } finally {
  49. disruptor.shutdown();
  50. taskPool.shutdown();
  51. threadPool.shutdown();
  52. }
  53. System.out.println("总耗时: " + (System.currentTimeMillis() - begin));
  54. }
  55. }

image.png