串并行操作一起执行的情况称为菱形操作,需要注意的是在单生产单消费者模式下,构建 Disruptor 的线程池数量必须大于事件监听器Handler的数量
添加事件监听器:
事件监听器四:
package com.dmbjz.height.chain;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.TimeUnit;
/* 监听事件 */
public class UserHandler4 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("UserHandler4 : SET PRICE");
TimeUnit.SECONDS.sleep(1);
event.setPrice(17.0);
}
}
事件监听器五:
package com.dmbjz.height.chain;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.TimeUnit;
/* 监听事件 */
public class UserHandler5 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("UserHandler5 : GET PRICE: " + event.getPrice());
TimeUnit.SECONDS.sleep(1);
event.setPrice(event.getPrice() + 3.0);
}
}
菱形操作案例一:
并行执行 UserHandler1 与 UserHandler2,然后执行 UserHandler3
package com.dmbjz.height.chain;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import sun.nio.ch.ThreadPool;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws Exception {
//构建一个线程池用于提交任务
ExecutorService taskPool = Executors.newFixedThreadPool(1);
//创建线程池用于构建Disruptor
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3,5,10,TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));
//构建Disruptor
Disruptor<Trade> disruptor = new Disruptor<Trade>(
() -> new Trade(),
1024*1024,
threadPool,
ProducerType.SINGLE,
new BusySpinWaitStrategy()
);
//菱形操作写法一:并行执行 UserHandler1 与 UserHandler2,然后执行 UserHandler3
disruptor.handleEventsWith(new UserHandler1(),new UserHandler2())
.handleEventsWith(new UserHandler3());
//菱形操作写法二:并行执行 UserHandler1 与 UserHandler2,然后执行 UserHandler3(then意为"然后")
//EventHandlerGroup<Trade> lxGroup = disruptor.handleEventsWith(new UserHandler1(), new UserHandler2());
//lxGroup.then(new UserHandler3());
//启动disruptor
RingBuffer<Trade> ringBuffer = disruptor.start();
//使用 CountDownLatch 确保资源流释放
CountDownLatch countDownLatch = new CountDownLatch(1);
long begin = System.currentTimeMillis();
//提交任务
taskPool.submit(new TradePushlisher(disruptor,countDownLatch));
try {
countDownLatch.await();
} finally {
disruptor.shutdown();
taskPool.shutdown();
threadPool.shutdown();
}
System.out.println("总耗时: " + (System.currentTimeMillis() - begin));
}
}
菱形操作案例二:
并行执行 userHandler1 与 userHandler4,userHandler1 与 userHandler2 串行执行, userHandler4 与 userHandler5 串行执行,最终汇总到 userHandler3
package com.dmbjz.height.chain;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import sun.nio.ch.ThreadPool;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws Exception {
//构建一个线程池用于提交任务
ExecutorService taskPool = Executors.newFixedThreadPool(1);
//创建线程池用于构建Disruptor
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5,10,10,TimeUnit.MICROSECONDS,new ArrayBlockingQueue<>(10));
//构建Disruptor
Disruptor<Trade> disruptor = new Disruptor<Trade>(
() -> new Trade(),
1024*1024,
threadPool,
ProducerType.SINGLE,
new BusySpinWaitStrategy()
);
/* 把消费者设置到Disruptor中 handleEventsWith */
//菱形操作案例二:(单生产者单消费者模式下使用了N个事件监听器,因此Disruptor的线程池线程数至少就需要N个)
UserHandler1 userHandler1 = new UserHandler1();
UserHandler2 userHandler2 = new UserHandler2();
UserHandler3 userHandler3 = new UserHandler3();
UserHandler4 userHandler4 = new UserHandler4();
UserHandler5 userHandler5 = new UserHandler5();
//并行执行 userHandler1 与 userHandler4
disruptor.handleEventsWith(userHandler1,userHandler4);
//执行完 userHandler1 后执行 userHandler2
disruptor.after(userHandler1).handleEventsWith(userHandler2);
//执行完 userHandler4 后执行 userHandler5
disruptor.after(userHandler4).handleEventsWith(userHandler5);
//执行完 userHandler2、userHandler5 后执行 userHandler3(汇总到3)
disruptor.after(userHandler2,userHandler5).handleEventsWith(userHandler3);
//启动disruptor
RingBuffer<Trade> ringBuffer = disruptor.start();
//使用 CountDownLatch 确保资源流释放
CountDownLatch countDownLatch = new CountDownLatch(1);
long begin = System.currentTimeMillis();
//提交任务
taskPool.submit(new TradePushlisher(disruptor,countDownLatch));
try {
countDownLatch.await();
} finally {
disruptor.shutdown();
taskPool.shutdown();
threadPool.shutdown();
}
System.out.println("总耗时: " + (System.currentTimeMillis() - begin));
}
}