在串行操作与并行操作与菱形操作中的案例演示皆为单生产者模式,从该模式不难看出如果后续需要新增事件监听器,就需要不断修改 disruptor 线程池的线程数,在后续可能不断变更需求的业务环境下该模式并不合适,因此需要使用多生产者模式
实体类:
package com.dmbjz.height.multi;import lombok.Data;import lombok.NoArgsConstructor;/* Disruptor中的 Event */@Data@NoArgsConstructorpublic class Order {private String id;private String name;private double price;}
旧版生产者:
package com.dmbjz.height.multi;import com.lmax.disruptor.RingBuffer;/* 旧版的生产者发布数据方式 */public class OldProducer {private RingBuffer<Order> ringBuffer;public OldProducer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(String uuid) {long sequence = ringBuffer.next();try {Order order = ringBuffer.get(sequence);order.setId(uuid);} finally {ringBuffer.publish(sequence);}}}
事件处理失败操作:
package com.dmbjz.height.multi;import com.lmax.disruptor.ExceptionHandler;/* 事件处理失败时的操作 */public class EventExceptionHandler implements ExceptionHandler<Order> {@Overridepublic void handleEventException(Throwable ex, long sequence, Order event) {System.out.println("消费时出现异常");}@Overridepublic void handleOnStartException(Throwable ex) {System.out.println("启动时出现异常");}@Overridepublic void handleOnShutdownException(Throwable ex) {System.out.println("关闭时出现异常");}}
消费者:
package com.dmbjz.height.multi;import com.lmax.disruptor.WorkHandler;import java.util.Random;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/* 消费者 */public class Consumer implements WorkHandler<Order> {private String comsumerId;private static AtomicInteger count = new AtomicInteger(0);private Random random = new Random();public Consumer(String comsumerId) {this.comsumerId = comsumerId;}@Overridepublic void onEvent(Order event) throws Exception {TimeUnit.MILLISECONDS.sleep(1 * random.nextInt(5));System.out.println("当前消费者: " + comsumerId + ", 消费信息ID: " + event.getId());count.incrementAndGet();}public int getCount(){return count.get();}}
多生产者案例:
package com.dmbjz.height.multi;import com.lmax.disruptor.*;import com.lmax.disruptor.dsl.ProducerType;import java.util.UUID;import java.util.concurrent.*;/**多生产者多消费者模式案例* Disruptor默认情况下为单生产者* 多生产者模式下需要自定义 RingBuffer、SequenceBarrier*/public class MultiMain {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,16,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20));//对RingBuffer设置多生产者支持RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, () -> new Order(), 1024 * 1024, new YieldingWaitStrategy());//通过RingBuffer创建一个屏障,用于保持对 RingBuffer 的 Producer 和 Consumer 之间的平衡关系SequenceBarrier barrier = ringBuffer.newBarrier();//构建多消费者Consumer[] consumers = new Consumer[10];for(int i = 0; i < consumers.length; i++) {consumers[i] = new Consumer("C" + i);}//构建多消费者工作池WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,barrier, new EventExceptionHandler(),consumers);//每个消费者的Sequence序号都是单独的,通过WorkPool获取每个消费者的序号然后设置到RingBuffer中ringBuffer.addGatingSequences(workerPool.getWorkerSequences());//启动工作池workerPool.start(threadPool);//阻塞CountDownLatch countDownLatch = new CountDownLatch(1);//创建100个生产者for (int i = 0; i < 100; i++) {//使用旧版方法进行生成者数据发布OldProducer producer = new OldProducer(ringBuffer);new Thread(()->{try {countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}for (int j = 0; j < 100; j++) {producer.sendData(UUID.randomUUID().toString());}}).start();}TimeUnit.SECONDS.sleep(2);System.out.println("----------线程创建完毕,开始生产数据----------");countDownLatch.countDown();TimeUnit.SECONDS.sleep(10);System.out.println("任务总数:" + consumers[3].getCount());}}

