在串行操作与并行操作与菱形操作中的案例演示皆为单生产者模式,从该模式不难看出如果后续需要新增事件监听器,就需要不断修改 disruptor 线程池的线程数,在后续可能不断变更需求的业务环境下该模式并不合适,因此需要使用多生产者模式
实体类:
package com.dmbjz.height.multi;
import lombok.Data;
import lombok.NoArgsConstructor;
/* Disruptor中的 Event */
@Data
@NoArgsConstructor
public class Order {
private String id;
private String name;
private double price;
}
旧版生产者:
package com.dmbjz.height.multi;
import com.lmax.disruptor.RingBuffer;
/* 旧版的生产者发布数据方式 */
public class OldProducer {
private RingBuffer<Order> ringBuffer;
public OldProducer(RingBuffer<Order> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(String uuid) {
long sequence = ringBuffer.next();
try {
Order order = ringBuffer.get(sequence);
order.setId(uuid);
} finally {
ringBuffer.publish(sequence);
}
}
}
事件处理失败操作:
package com.dmbjz.height.multi;
import com.lmax.disruptor.ExceptionHandler;
/* 事件处理失败时的操作 */
public class EventExceptionHandler implements ExceptionHandler<Order> {
@Override
public void handleEventException(Throwable ex, long sequence, Order event) {
System.out.println("消费时出现异常");
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("启动时出现异常");
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("关闭时出现异常");
}
}
消费者:
package com.dmbjz.height.multi;
import com.lmax.disruptor.WorkHandler;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/* 消费者 */
public class Consumer implements WorkHandler<Order> {
private String comsumerId;
private static AtomicInteger count = new AtomicInteger(0);
private Random random = new Random();
public Consumer(String comsumerId) {
this.comsumerId = comsumerId;
}
@Override
public void onEvent(Order event) throws Exception {
TimeUnit.MILLISECONDS.sleep(1 * random.nextInt(5));
System.out.println("当前消费者: " + comsumerId + ", 消费信息ID: " + event.getId());
count.incrementAndGet();
}
public int getCount(){
return count.get();
}
}
多生产者案例:
package com.dmbjz.height.multi;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.UUID;
import java.util.concurrent.*;
/**多生产者多消费者模式案例
* Disruptor默认情况下为单生产者
* 多生产者模式下需要自定义 RingBuffer、SequenceBarrier
*/
public class MultiMain {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8,16,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20));
//对RingBuffer设置多生产者支持
RingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, () -> new Order(), 1024 * 1024, new YieldingWaitStrategy());
//通过RingBuffer创建一个屏障,用于保持对 RingBuffer 的 Producer 和 Consumer 之间的平衡关系
SequenceBarrier barrier = ringBuffer.newBarrier();
//构建多消费者
Consumer[] consumers = new Consumer[10];
for(int i = 0; i < consumers.length; i++) {
consumers[i] = new Consumer("C" + i);
}
//构建多消费者工作池
WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,barrier, new EventExceptionHandler(),consumers);
//每个消费者的Sequence序号都是单独的,通过WorkPool获取每个消费者的序号然后设置到RingBuffer中
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//启动工作池
workerPool.start(threadPool);
//阻塞
CountDownLatch countDownLatch = new CountDownLatch(1);
//创建100个生产者
for (int i = 0; i < 100; i++) {
//使用旧版方法进行生成者数据发布
OldProducer producer = new OldProducer(ringBuffer);
new Thread(()->{
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
producer.sendData(UUID.randomUUID().toString());
}
}).start();
}
TimeUnit.SECONDS.sleep(2);
System.out.println("----------线程创建完毕,开始生产数据----------");
countDownLatch.countDown();
TimeUnit.SECONDS.sleep(10);
System.out.println("任务总数:" + consumers[3].getCount());
}
}
![image.png](https://cdn.nlark.com/yuque/0/2022/png/21405095/1651297842776-18a684fc-d670-4cd3-8f4b-3f9d1fb1bf18.png#clientId=u47bed5a7-3f96-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=312&id=u420050b6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=542&originWidth=919&originalType=binary&ratio=1&rotation=0&showTitle=true&size=118097&status=done&style=stroke&taskId=u8917886c-d9c1-45cc-9638-2201381da28&title=%E8%BF%90%E8%A1%8C%E7%BB%93%E6%9E%9C&width=528.6666870117188 "运行结果")