在串行操作与并行操作与菱形操作中的案例演示皆为单生产者模式,从该模式不难看出如果后续需要新增事件监听器,就需要不断修改 disruptor 线程池的线程数,在后续可能不断变更需求的业务环境下该模式并不合适,因此需要使用多生产者模式

实体类:

  1. package com.dmbjz.height.multi;
  2. import lombok.Data;
  3. import lombok.NoArgsConstructor;
  4. /* Disruptor中的 Event */
  5. @Data
  6. @NoArgsConstructor
  7. public class Order {
  8. private String id;
  9. private String name;
  10. private double price;
  11. }

旧版生产者:

  1. package com.dmbjz.height.multi;
  2. import com.lmax.disruptor.RingBuffer;
  3. /* 旧版的生产者发布数据方式 */
  4. public class OldProducer {
  5. private RingBuffer<Order> ringBuffer;
  6. public OldProducer(RingBuffer<Order> ringBuffer) {
  7. this.ringBuffer = ringBuffer;
  8. }
  9. public void sendData(String uuid) {
  10. long sequence = ringBuffer.next();
  11. try {
  12. Order order = ringBuffer.get(sequence);
  13. order.setId(uuid);
  14. } finally {
  15. ringBuffer.publish(sequence);
  16. }
  17. }
  18. }

事件处理失败操作:

  1. package com.dmbjz.height.multi;
  2. import com.lmax.disruptor.ExceptionHandler;
  3. /* 事件处理失败时的操作 */
  4. public class EventExceptionHandler implements ExceptionHandler<Order> {
  5. @Override
  6. public void handleEventException(Throwable ex, long sequence, Order event) {
  7. System.out.println("消费时出现异常");
  8. }
  9. @Override
  10. public void handleOnStartException(Throwable ex) {
  11. System.out.println("启动时出现异常");
  12. }
  13. @Override
  14. public void handleOnShutdownException(Throwable ex) {
  15. System.out.println("关闭时出现异常");
  16. }
  17. }

消费者:

  1. package com.dmbjz.height.multi;
  2. import com.lmax.disruptor.WorkHandler;
  3. import java.util.Random;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. /* 消费者 */
  7. public class Consumer implements WorkHandler<Order> {
  8. private String comsumerId;
  9. private static AtomicInteger count = new AtomicInteger(0);
  10. private Random random = new Random();
  11. public Consumer(String comsumerId) {
  12. this.comsumerId = comsumerId;
  13. }
  14. @Override
  15. public void onEvent(Order event) throws Exception {
  16. TimeUnit.MILLISECONDS.sleep(1 * random.nextInt(5));
  17. System.out.println("当前消费者: " + comsumerId + ", 消费信息ID: " + event.getId());
  18. count.incrementAndGet();
  19. }
  20. public int getCount(){
  21. return count.get();
  22. }
  23. }

多生产者案例:

  1. package com.dmbjz.height.multi;
  2. import com.lmax.disruptor.*;
  3. import com.lmax.disruptor.dsl.ProducerType;
  4. import java.util.UUID;
  5. import java.util.concurrent.*;
  6. /**多生产者多消费者模式案例
  7. * Disruptor默认情况下为单生产者
  8. * 多生产者模式下需要自定义 RingBuffer、SequenceBarrier
  9. */
  10. public class MultiMain {
  11. public static void main(String[] args) throws InterruptedException {
  12. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,16,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20));
  13. //对RingBuffer设置多生产者支持
  14. RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, () -> new Order(), 1024 * 1024, new YieldingWaitStrategy());
  15. //通过RingBuffer创建一个屏障,用于保持对 RingBuffer 的 Producer 和 Consumer 之间的平衡关系
  16. SequenceBarrier barrier = ringBuffer.newBarrier();
  17. //构建多消费者
  18. Consumer[] consumers = new Consumer[10];
  19. for(int i = 0; i < consumers.length; i++) {
  20. consumers[i] = new Consumer("C" + i);
  21. }
  22. //构建多消费者工作池
  23. WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,barrier, new EventExceptionHandler(),consumers);
  24. //每个消费者的Sequence序号都是单独的,通过WorkPool获取每个消费者的序号然后设置到RingBuffer中
  25. ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
  26. //启动工作池
  27. workerPool.start(threadPool);
  28. //阻塞
  29. CountDownLatch countDownLatch = new CountDownLatch(1);
  30. //创建100个生产者
  31. for (int i = 0; i < 100; i++) {
  32. //使用旧版方法进行生成者数据发布
  33. OldProducer producer = new OldProducer(ringBuffer);
  34. new Thread(()->{
  35. try {
  36. countDownLatch.await();
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. for (int j = 0; j < 100; j++) {
  41. producer.sendData(UUID.randomUUID().toString());
  42. }
  43. }).start();
  44. }
  45. TimeUnit.SECONDS.sleep(2);
  46. System.out.println("----------线程创建完毕,开始生产数据----------");
  47. countDownLatch.countDown();
  48. TimeUnit.SECONDS.sleep(10);
  49. System.out.println("任务总数:" + consumers[3].getCount());
  50. }
  51. }
  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1651297842776-18a684fc-d670-4cd3-8f4b-3f9d1fb1bf18.png#clientId=u47bed5a7-3f96-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=312&id=u420050b6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=542&originWidth=919&originalType=binary&ratio=1&rotation=0&showTitle=true&size=118097&status=done&style=stroke&taskId=u8917886c-d9c1-45cc-9638-2201381da28&title=%E8%BF%90%E8%A1%8C%E7%BB%93%E6%9E%9C&width=528.6666870117188 "运行结果")