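Combining serial and parallel handler stages in one pipeline is called a diamond operation. Note that in the single-producer, single-consumer setup used here, the thread pool passed to the Disruptor must have at least as many threads as there are event handlers, because each handler occupies one thread for as long as the Disruptor is running.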
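The thread-count rule can be illustrated without Disruptor at all. The sketch below is a rough model under stated assumptions: each "handler" is a task that grabs a pool thread and holds it until shutdown, which mimics a long-lived event-processing loop. `PoolSizingSketch` and `allLoopsStart` are hypothetical names, and a plain fixed-size pool stands in for the `ThreadPoolExecutor` used in the examples.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    /**
     * Submits `handlers` long-lived loops to a pool with `poolThreads` threads
     * and reports whether every loop managed to start within 500 ms.
     */
    static boolean allLoopsStart(int poolThreads, int handlers) {
        ExecutorService pool = Executors.newFixedThreadPool(poolThreads);
        CountDownLatch started = new CountDownLatch(handlers);
        CountDownLatch stop = new CountDownLatch(1); // never counted down: loops run "forever"
        try {
            for (int i = 0; i < handlers; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        stop.await(); // stands in for the endless event loop of a handler
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return started.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupt running loops and discard queued ones
        }
    }

    public static void main(String[] args) {
        System.out.println(allLoopsStart(3, 3)); // true: one thread per handler
        System.out.println(allLoopsStart(2, 3)); // false: the third loop stays queued
    }
}
```

With fewer threads than handlers, the extra loop sits in the queue and never runs, so any stage that depends on it would wait indefinitely.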
Add two more event handlers:
Event handler four:
package com.dmbjz.height.chain;

import com.lmax.disruptor.EventHandler;

import java.util.concurrent.TimeUnit;

/* Event handler: overwrites the price */
public class UserHandler4 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("UserHandler4 : SET PRICE");
        TimeUnit.SECONDS.sleep(1);   // simulate one second of work
        event.setPrice(17.0);
    }

}
Event handler five:
package com.dmbjz.height.chain;

import com.lmax.disruptor.EventHandler;

import java.util.concurrent.TimeUnit;

/* Event handler: reads the current price and adds 3.0 to it */
public class UserHandler5 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("UserHandler5 : GET PRICE: " + event.getPrice());
        TimeUnit.SECONDS.sleep(1);   // simulate one second of work
        event.setPrice(event.getPrice() + 3.0);
    }

}
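Because UserHandler5 reads the price that UserHandler4 writes, the result depends on which one runs first. The sketch below replays the two price updates in both orders, without Disruptor and without the sleeps. The nested `Trade` class is a hypothetical stand-in for the article's event class, and it assumes the price starts at 0.0.

```java
class PriceOrderSketch {

    /** Hypothetical stand-in for the article's Trade event; only the price is modelled. */
    static final class Trade {
        private double price; // assumption: a fresh event starts at 0.0

        double getPrice() { return price; }

        void setPrice(double price) { this.price = price; }
    }

    /** Same price update as UserHandler4. */
    static void handler4(Trade trade) {
        trade.setPrice(17.0);
    }

    /** Same price update as UserHandler5. */
    static void handler5(Trade trade) {
        trade.setPrice(trade.getPrice() + 3.0);
    }

    static double fourThenFive() {
        Trade trade = new Trade();
        handler4(trade);
        handler5(trade);
        return trade.getPrice();
    }

    static double fiveThenFour() {
        Trade trade = new Trade();
        handler5(trade);
        handler4(trade);
        return trade.getPrice();
    }

    public static void main(String[] args) {
        System.out.println(fourThenFive()); // 20.0
        System.out.println(fiveThenFour()); // 17.0
    }
}
```

Only the 4-then-5 order yields 20.0, which is why these two handlers need to be wired as a serial chain rather than left to run in parallel.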
Diamond operation, case one:
Run UserHandler1 and UserHandler2 in parallel, then run UserHandler3 once both have finished with the event.
package com.dmbjz.height.chain;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;   // only needed for style 2 below
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws Exception {

        // Thread pool used to submit the publishing task
        ExecutorService taskPool = Executors.newFixedThreadPool(1);

        // Thread pool used to build the Disruptor: 3 core threads for 3 handlers
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                3,
                5,
                10,
                TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<>(10));

        // Build the Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                () -> new Trade(),
                1024 * 1024,
                threadPool,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        // Diamond, style 1: UserHandler1 and UserHandler2 in parallel, then UserHandler3
        disruptor.handleEventsWith(new UserHandler1(), new UserHandler2())
                 .handleEventsWith(new UserHandler3());

        // Diamond, style 2: same wiring, written with then()
        //EventHandlerGroup<Trade> lxGroup = disruptor.handleEventsWith(new UserHandler1(), new UserHandler2());
        //lxGroup.then(new UserHandler3());

        // Start the Disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();

        // The CountDownLatch makes main wait for the publisher before releasing resources
        CountDownLatch countDownLatch = new CountDownLatch(1);

        long begin = System.currentTimeMillis();

        // Submit the publishing task
        taskPool.submit(new TradePushlisher(disruptor, countDownLatch));

        try {
            countDownLatch.await();
        } finally {
            disruptor.shutdown();
            taskPool.shutdown();
            threadPool.shutdown();
        }
        System.out.println("Total time (ms): " + (System.currentTimeMillis() - begin));

    }

}
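The ordering guarantee of this wiring (two handlers in either order, the third always last) can be tried out in isolation. The sketch below swaps in `CompletableFuture` from the JDK instead of Disruptor, so it only models the dependency shape for a single event, not Disruptor's ring buffer or batching. `DiamondOrderSketch` and the handler labels are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class DiamondOrderSketch {

    /** Runs "handler1" and "handler2" concurrently, then "handler3", and returns the observed order. */
    static List<String> runDiamond() {
        List<String> order = new CopyOnWriteArrayList<>();

        // The two sides of the diamond may run in either order
        CompletableFuture<Void> h1 = CompletableFuture.runAsync(() -> order.add("handler1"));
        CompletableFuture<Void> h2 = CompletableFuture.runAsync(() -> order.add("handler2"));

        // The join point runs only after both sides have completed
        CompletableFuture<Void> h3 =
                CompletableFuture.allOf(h1, h2).thenRun(() -> order.add("handler3"));

        h3.join();
        return order;
    }

    public static void main(String[] args) {
        // handler1 and handler2 may appear in either order; handler3 is always last
        System.out.println(runDiamond());
    }
}
```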

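Diamond operation, case two:
userHandler1 and userHandler4 start in parallel. userHandler2 runs after userHandler1, and userHandler5 runs after userHandler4, so the two chains proceed side by side. Both chains finally converge on userHandler3.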

package com.dmbjz.height.chain;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws Exception {

        // Thread pool used to submit the publishing task
        ExecutorService taskPool = Executors.newFixedThreadPool(1);

        // Thread pool used to build the Disruptor: 5 core threads for 5 handlers
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                5,
                10,
                10,
                TimeUnit.MICROSECONDS,
                new ArrayBlockingQueue<>(10));

        // Build the Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                () -> new Trade(),
                1024 * 1024,
                threadPool,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        /* Register the consumers with the Disruptor via handleEventsWith */

        // Case two: with N event handlers in single-producer, single-consumer mode,
        // the Disruptor's thread pool needs at least N threads
        UserHandler1 userHandler1 = new UserHandler1();
        UserHandler2 userHandler2 = new UserHandler2();
        UserHandler3 userHandler3 = new UserHandler3();
        UserHandler4 userHandler4 = new UserHandler4();
        UserHandler5 userHandler5 = new UserHandler5();

        // Run userHandler1 and userHandler4 in parallel
        disruptor.handleEventsWith(userHandler1, userHandler4);
        // Run userHandler2 after userHandler1
        disruptor.after(userHandler1).handleEventsWith(userHandler2);
        // Run userHandler5 after userHandler4
        disruptor.after(userHandler4).handleEventsWith(userHandler5);
        // Run userHandler3 after both userHandler2 and userHandler5 (the join point)
        disruptor.after(userHandler2, userHandler5).handleEventsWith(userHandler3);

        // Start the Disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();

        // The CountDownLatch makes main wait for the publisher before releasing resources
        CountDownLatch countDownLatch = new CountDownLatch(1);

        long begin = System.currentTimeMillis();

        // Submit the publishing task
        taskPool.submit(new TradePushlisher(disruptor, countDownLatch));

        try {
            countDownLatch.await();
        } finally {
            disruptor.shutdown();
            taskPool.shutdown();
            threadPool.shutdown();
        }
        System.out.println("Total time (ms): " + (System.currentTimeMillis() - begin));

    }

}
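The `after(...)` wiring above amounts to sequence gating: a handler may process event n only once every handler it depends on has already processed event n. The sketch below is a simplified JDK-only model of that idea for the five-handler graph, not Disruptor's actual implementation. It assumes all events are already published (there is no producer or ring buffer), and `SequenceGatingSketch`, `Stage`, and `runCaseTwo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

class SequenceGatingSketch {

    /** One handler: walks events 0..count-1 in order, gated by its upstream stages. */
    static final class Stage implements Runnable {
        final String name;
        final long count;
        final List<String> log;
        final Stage[] upstream;
        final AtomicLong sequence = new AtomicLong(-1); // last event this stage finished

        Stage(String name, long count, List<String> log, Stage... upstream) {
            this.name = name;
            this.count = count;
            this.log = log;
            this.upstream = upstream;
        }

        @Override
        public void run() {
            for (long next = 0; next < count; next++) {
                // Wait until every upstream stage has finished event `next`
                for (Stage s : upstream) {
                    while (s.sequence.get() < next) {
                        Thread.yield();
                    }
                }
                log.add(name + ":" + next); // "process" the event
                sequence.set(next);         // publish progress to downstream stages
            }
        }
    }

    /** Wires h1 -> h2 and h4 -> h5 side by side, with h3 after both chains. */
    static List<String> runCaseTwo(long events) {
        List<String> log = new CopyOnWriteArrayList<>();
        Stage h1 = new Stage("h1", events, log);
        Stage h4 = new Stage("h4", events, log);
        Stage h2 = new Stage("h2", events, log, h1);
        Stage h5 = new Stage("h5", events, log, h4);
        Stage h3 = new Stage("h3", events, log, h2, h5);

        // One thread per handler, mirroring the pool-size rule
        List<Thread> threads = new ArrayList<>();
        for (Stage s : new Stage[] {h1, h2, h3, h4, h5}) {
            Thread t = new Thread(s, s.name);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // The interleaving varies between runs, but for every event n,
        // h1:n precedes h2:n, h4:n precedes h5:n, and h3:n comes after both
        System.out.println(runCaseTwo(3));
    }
}
```

Each stage only tracks a single counter, so the two chains never block each other; only the join stage waits on both.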

