介绍 Disruptor 事件监听器的串行操作与并行操作方式(附源码下载)

代码:

实体类:

  1. package com.dmbjz.height.chain;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import lombok.experimental.Accessors;
  6. import java.util.concurrent.atomic.AtomicInteger;
  7. /* 需要操作的数据实体类 */
  8. @Data
  9. @AllArgsConstructor
  10. @NoArgsConstructor
  11. @Accessors(chain = true)
  12. public class Trade {
  13. private String id;
  14. private String name;
  15. private double price;
  16. private AtomicInteger count = new AtomicInteger(0);
  17. }

事件执行方法:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.EventTranslator;
  3. import java.util.Random;
  4. /** 事件执行方法 */
  5. public class TradeEventTranslator implements EventTranslator<Trade> {
  6. private Random random = new Random();
  7. @Override
  8. public void translateTo(Trade event, long sequence) {
  9. event.setPrice(random.nextDouble() * 9999);
  10. }
  11. }

生产者代码:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.dsl.Disruptor;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. import lombok.experimental.Accessors;
  7. import java.util.concurrent.CountDownLatch;
  8. /* 生产者 */
  9. @Data
  10. @AllArgsConstructor
  11. @NoArgsConstructor
  12. @Accessors(chain = true)
  13. public class TradePushlisher implements Runnable {
  14. private Disruptor<Trade> disruptor;
  15. private CountDownLatch latch;
  16. @Override
  17. public void run() {
  18. TradeEventTranslator eventTranslator = new TradeEventTranslator();
  19. for(int i =0; i < 1; i ++){
  20. //新的提交任务的方式
  21. disruptor.publishEvent(eventTranslator);
  22. }
  23. latch.countDown();
  24. }
  25. }

事件监听器一:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.EventHandler;
  3. import com.lmax.disruptor.WorkHandler;
  4. import java.util.concurrent.TimeUnit;
  5. /* 监听事件,使用 WorkHandler 与 EventHandler 进行事件监听
  6. * WorkHandler 相较于 EventHandler仅需要一个参数
  7. */
  8. public class UserHandler1 implements EventHandler<Trade>, WorkHandler<Trade>{
  9. //EventHandler方法
  10. @Override
  11. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  12. this.onEvent(event);
  13. }
  14. //WorkHandler方法
  15. @Override
  16. public void onEvent(Trade event) throws Exception {
  17. System.out.println("UserHandler1 : SET NAME");
  18. TimeUnit.SECONDS.sleep(1);
  19. event.setName("H1");
  20. }
  21. }

事件监听器二:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.EventHandler;
  3. import java.util.UUID;
  4. import java.util.concurrent.TimeUnit;
  5. /* 监听事件 */
  6. public class UserHandler2 implements EventHandler<Trade> {
  7. @Override
  8. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  9. System.out.println("UserHandler2 : SET ID");
  10. TimeUnit.SECONDS.sleep(1);
  11. event.setId(UUID.randomUUID().toString());
  12. }
  13. }

事件监听器三:

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.EventHandler;
  3. /* 监听事件 */
  4. public class UserHandler3 implements EventHandler<Trade> {
  5. @Override
  6. public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  7. System.out.println("UserHandler3 : NAME: "
  8. + event.getName()
  9. + ", ID: "
  10. + event.getId()
  11. + ", PRICE: "
  12. + event.getPrice()
  13. + " INSTANCE : " + event.toString());
  14. }
  15. }

方法案例:

串行方法:

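使用链式调用设置事件监听器:后一个监听器只有在前一个监听器处理完同一事件之后才会处理该事件,因此三个监听器按 UserHandler1 → UserHandler2 → UserHandler3 的顺序依次执行(UserHandler1、UserHandler2 各休眠 1 秒,总耗时约 2 秒)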

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.BusySpinWaitStrategy;
  3. import com.lmax.disruptor.EventFactory;
  4. import com.lmax.disruptor.RingBuffer;
  5. import com.lmax.disruptor.dsl.Disruptor;
  6. import com.lmax.disruptor.dsl.ProducerType;
  7. import sun.nio.ch.ThreadPool;
  8. import java.util.concurrent.*;
  9. public class Main {
  10. public static void main(String[] args) throws Exception {
  11. //构建一个线程池用于提交任务
  12. ExecutorService taskPool = Executors.newFixedThreadPool(1);
  13. //创建线程池用于构建Disruptor
  14. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,10,TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));
  15. //构建Disruptor
  16. Disruptor<Trade> disruptor = new Disruptor<Trade>(
  17. () -> new Trade(),
  18. 1024*1024,
  19. threadPool,
  20. ProducerType.SINGLE,
  21. new BusySpinWaitStrategy()
  22. );
  23. /* 把消费者设置到Disruptor中 handleEventsWith */
  24. //串行操作:按添加顺序执行监听器
  25. disruptor.handleEventsWith(new UserHandler1())
  26. .handleEventsWith(new UserHandler2())
  27. .handleEventsWith(new UserHandler3());
  28. //启动disruptor
  29. RingBuffer<Trade> ringBuffer = disruptor.start();
  30. //使用 CountDownLatch 确保资源流释放
  31. CountDownLatch countDownLatch = new CountDownLatch(1);
  32. long begin = System.currentTimeMillis();
  33. //提交任务
  34. taskPool.submit(new TradePushlisher(disruptor,countDownLatch));
  35. try {
  36. countDownLatch.await();
  37. } finally {
  38. disruptor.shutdown();
  39. taskPool.shutdown();
  40. threadPool.shutdown();
  41. }
  42. System.out.println("总耗时: " + (System.currentTimeMillis() - begin));
  43. }
  44. }

image.png

并行方法:

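使用并行方式注册事件监听器:分别调用 disruptor.handleEventsWith(或在一次调用中传入多个监听器)时,各监听器互不依赖,会同时处理同一事件。注意此时 UserHandler3 不会等待 UserHandler1、UserHandler2 执行完毕,打印出的 NAME 和 ID 可能仍为 null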

  1. package com.dmbjz.height.chain;
  2. import com.lmax.disruptor.BusySpinWaitStrategy;
  3. import com.lmax.disruptor.EventFactory;
  4. import com.lmax.disruptor.RingBuffer;
  5. import com.lmax.disruptor.dsl.Disruptor;
  6. import com.lmax.disruptor.dsl.ProducerType;
  7. import sun.nio.ch.ThreadPool;
  8. import java.util.concurrent.*;
  9. public class Main {
  10. public static void main(String[] args) throws Exception {
  11. //构建一个线程池用于提交任务
  12. ExecutorService taskPool = Executors.newFixedThreadPool(1);
  13. //创建线程池用于构建Disruptor
  14. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,10,TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));
  15. //构建Disruptor
  16. Disruptor<Trade> disruptor = new Disruptor<Trade>(
  17. () -> new Trade(),
  18. 1024*1024,
  19. threadPool,
  20. ProducerType.SINGLE,
  21. new BusySpinWaitStrategy()
  22. );
  23. /* 把消费者设置到Disruptor中 handleEventsWith */
  24. //并行操作写法一:同时执行监听器
  25. disruptor.handleEventsWith(new UserHandler1());
  26. disruptor.handleEventsWith(new UserHandler2());
  27. disruptor.handleEventsWith(new UserHandler3());
  28. //并行操作写法2:
  29. //disruptor.handleEventsWith(new UserHandler1(),new UserHandler2(),new UserHandler3());
  30. //启动disruptor
  31. RingBuffer<Trade> ringBuffer = disruptor.start();
  32. //使用 CountDownLatch 确保资源流释放
  33. CountDownLatch countDownLatch = new CountDownLatch(1);
  34. long begin = System.currentTimeMillis();
  35. //提交任务
  36. taskPool.submit(new TradePushlisher(disruptor,countDownLatch));
  37. try {
  38. countDownLatch.await();
  39. } finally {
  40. disruptor.shutdown();
  41. taskPool.shutdown();
  42. threadPool.shutdown();
  43. }
  44. System.out.println("总耗时: " + (System.currentTimeMillis() - begin));
  45. }
  46. }


image.png