This post shows how to run Disruptor event handlers in series and in parallel.
Code:
Entity class:
package com.dmbjz.height.chain;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.util.concurrent.atomic.AtomicInteger;

/* The data entity that the handlers operate on */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class Trade {

    private String id;
    private String name;
    private double price;
    private AtomicInteger count = new AtomicInteger(0);

}
Event translator (fills in the event before it is published):
package com.dmbjz.height.chain;

import com.lmax.disruptor.EventTranslator;

import java.util.Random;

/** Event translator: populates the claimed event with a random price */
public class TradeEventTranslator implements EventTranslator<Trade> {

    private final Random random = new Random();

    @Override
    public void translateTo(Trade event, long sequence) {
        event.setPrice(random.nextDouble() * 9999);
    }

}
Producer:
package com.dmbjz.height.chain;

import com.lmax.disruptor.dsl.Disruptor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;

import java.util.concurrent.CountDownLatch;

/* Producer */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class TradePushlisher implements Runnable {

    private Disruptor<Trade> disruptor;
    private CountDownLatch latch;

    @Override
    public void run() {
        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for (int i = 0; i < 1; i++) {
            // The newer way to publish: hand an EventTranslator to publishEvent
            disruptor.publishEvent(eventTranslator);
        }
        // Signal the main thread that publishing has finished
        latch.countDown();
    }

}
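To make the publishEvent(eventTranslator) call above less of a black box, here is a toy, single-threaded sketch of the claim, translate, publish sequence that such a call wraps. It is not the Disruptor implementation: TinyRing, Translator, and PriceEvent are hypothetical names invented for this illustration, and the sketch ignores concurrency and wrap-around protection entirely.

```java
import java.util.function.Supplier;

/** Toy, single-threaded model of claim -> translate -> publish. Not the Disruptor API. */
final class PublishEventSketch {

    /** Hypothetical stand-in for an event translator. */
    interface Translator<T> {
        void translateTo(T event, long sequence);
    }

    /** Mutable event; one instance is preallocated per slot and reused. */
    static final class PriceEvent {
        double price;
    }

    /** Hypothetical miniature ring: preallocated slots plus a published cursor. */
    static final class TinyRing<T> {
        private final Object[] slots;
        private long next = 0;     // next sequence to claim
        private long cursor = -1;  // highest published sequence

        TinyRing(int size, Supplier<T> factory) {
            slots = new Object[size];
            for (int i = 0; i < size; i++) {
                slots[i] = factory.get();
            }
        }

        @SuppressWarnings("unchecked")
        T get(long sequence) {
            return (T) slots[(int) (sequence % slots.length)];
        }

        /** Claims a slot, lets the translator fill it, then always publishes it. */
        void publishEvent(Translator<T> translator) {
            long sequence = next++;
            try {
                translator.translateTo(get(sequence), sequence);
            } finally {
                cursor = sequence; // publish even if the translator throws
            }
        }

        long cursor() {
            return cursor;
        }
    }

    public static void main(String[] args) {
        TinyRing<PriceEvent> ring = new TinyRing<>(4, PriceEvent::new);
        ring.publishEvent((event, sequence) -> event.price = 10.0 * (sequence + 1));
        ring.publishEvent((event, sequence) -> event.price = 10.0 * (sequence + 1));
        System.out.println("cursor=" + ring.cursor() + ", price=" + ring.get(1).price);
    }
}
```

The point of the translator style is that the caller never touches sequence numbers, so it cannot forget to publish a claimed slot.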
Event handler 1:
package com.dmbjz.height.chain;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

import java.util.concurrent.TimeUnit;

/*
 * Listens for events through both WorkHandler and EventHandler.
 * Compared with EventHandler, WorkHandler's onEvent takes only one parameter (the event).
 */
public class UserHandler1 implements EventHandler<Trade>, WorkHandler<Trade> {

    // EventHandler method: delegates to the WorkHandler method
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    // WorkHandler method
    @Override
    public void onEvent(Trade event) throws Exception {
        System.out.println("UserHandler1 : SET NAME");
        TimeUnit.SECONDS.sleep(1);
        event.setName("H1");
    }

}
Event handler 2:
package com.dmbjz.height.chain;

import com.lmax.disruptor.EventHandler;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/* Event handler: sets the ID */
public class UserHandler2 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("UserHandler2 : SET ID");
        TimeUnit.SECONDS.sleep(1);
        event.setId(UUID.randomUUID().toString());
    }

}
Event handler 3:
package com.dmbjz.height.chain;

import com.lmax.disruptor.EventHandler;

/* Event handler: prints the event's current state */
public class UserHandler3 implements EventHandler<Trade> {

    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("UserHandler3 : NAME: " + event.getName()
                + ", ID: " + event.getId()
                + ", PRICE: " + event.getPrice()
                + " INSTANCE : " + event.toString());
    }

}
Usage examples:
Serial approach:
Chaining handleEventsWith calls registers the handlers as a pipeline: each handler processes an event only after the previous handler in the chain has finished with it.
package com.dmbjz.height.chain;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws Exception {

        // Thread pool used to submit the producer task
        ExecutorService taskPool = Executors.newFixedThreadPool(1);

        // Thread pool handed to the Disruptor for its consumer threads
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                3, 5, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(10));

        // Build the Disruptor
        Disruptor<Trade> disruptor = new Disruptor<>(
                Trade::new,
                1024 * 1024,
                threadPool,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        /* Register the consumers with the Disruptor via handleEventsWith */
        // Serial: handlers run in the order they were chained
        disruptor.handleEventsWith(new UserHandler1())
                .handleEventsWith(new UserHandler2())
                .handleEventsWith(new UserHandler3());

        // Start the Disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();

        // The latch makes the main thread wait until the producer has finished
        // publishing before any resources are released
        CountDownLatch countDownLatch = new CountDownLatch(1);

        long begin = System.currentTimeMillis();

        // Submit the producer task
        taskPool.submit(new TradePushlisher(disruptor, countDownLatch));

        try {
            countDownLatch.await();
        } finally {
            disruptor.shutdown();
            taskPool.shutdown();
            threadPool.shutdown();
        }

        System.out.println("Total time (ms): " + (System.currentTimeMillis() - begin));
    }

}
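The ordering guarantee of the serial chain can be modeled without Disruptor at all. The sketch below swaps in a CompletableFuture chain from the JDK as a stand-in: each stage starts only after the previous one completes, so the last stage always sees the name and ID written by the first two. The Trade class here is a simplified, hypothetical copy of the article's entity, and "ID-1" is a placeholder value.

```java
import java.util.concurrent.CompletableFuture;

/** Stdlib-only model of a three-stage serial chain. Not the Disruptor API. */
final class SerialChainSketch {

    /** Hypothetical, simplified stand-in for the article's Trade event. */
    static final class Trade {
        String id;
        String name;
    }

    /** Runs set-name, then set-id, then a reader; each stage waits for the previous one. */
    static String runChain(Trade trade) {
        return CompletableFuture
                .supplyAsync(() -> { trade.name = "H1"; return trade; })   // like UserHandler1
                .thenApplyAsync(t -> { t.id = "ID-1"; return t; })         // like UserHandler2
                .thenApplyAsync(t -> "NAME: " + t.name + ", ID: " + t.id)  // like UserHandler3
                .join();
    }

    public static void main(String[] args) {
        System.out.println(runChain(new Trade())); // prints "NAME: H1, ID: ID-1"
    }
}
```

The model covers a single event only; a real pipeline also overlaps work across events, with each handler running on its own thread.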

Parallel approach:
Registering the handlers directly on the disruptor, either in separate handleEventsWith calls or in a single call, makes them independent of one another, so they process each event concurrently. Because nothing orders them, UserHandler3 may print an event before UserHandler1 and UserHandler2 have set its name and ID.
package com.dmbjz.height.chain;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws Exception {

        // Thread pool used to submit the producer task
        ExecutorService taskPool = Executors.newFixedThreadPool(1);

        // Thread pool handed to the Disruptor for its consumer threads
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                3, 5, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(10));

        // Build the Disruptor
        Disruptor<Trade> disruptor = new Disruptor<>(
                Trade::new,
                1024 * 1024,
                threadPool,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        /* Register the consumers with the Disruptor via handleEventsWith */
        // Parallel, style 1: register each handler separately
        disruptor.handleEventsWith(new UserHandler1());
        disruptor.handleEventsWith(new UserHandler2());
        disruptor.handleEventsWith(new UserHandler3());

        // Parallel, style 2: register all handlers in one call
        // disruptor.handleEventsWith(new UserHandler1(), new UserHandler2(), new UserHandler3());

        // Start the Disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();

        // The latch makes the main thread wait until the producer has finished
        // publishing before any resources are released
        CountDownLatch countDownLatch = new CountDownLatch(1);

        long begin = System.currentTimeMillis();

        // Submit the producer task
        taskPool.submit(new TradePushlisher(disruptor, countDownLatch));

        try {
            countDownLatch.await();
        } finally {
            disruptor.shutdown();
            taskPool.shutdown();
            threadPool.shutdown();
        }

        System.out.println("Total time (ms): " + (System.currentTimeMillis() - begin));
    }

}
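The two Main classes differ mainly in the total time they print. With the handlers above (two that sleep for one second, one that does not) and a single event, the serial chain needs at least two seconds, while the parallel setup should finish in roughly one. The sketch below reproduces that effect with plain JDK executors rather than Disruptor; the handler helper and the scaled-down 200 ms sleeps are assumptions chosen only to keep the run short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Stdlib-only timing model: the same handlers run one after another vs. side by side. */
final class SerialVsParallelTiming {

    /** Hypothetical handler body: pretends to work for the given number of milliseconds. */
    static Callable<Void> handler(long workMillis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(workMillis);
            return null;
        };
    }

    /** Elapsed milliseconds when each handler waits for the previous one. */
    static long runSerial(List<Callable<Void>> handlers) throws Exception {
        long begin = System.nanoTime();
        for (Callable<Void> h : handlers) {
            h.call();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
    }

    /** Elapsed milliseconds when every handler gets its own thread. */
    static long runParallel(List<Callable<Void>> handlers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(handlers.size());
        try {
            long begin = System.nanoTime();
            pool.invokeAll(handlers); // returns once all handlers have finished
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Scaled-down version of the article's handlers: two that sleep, one that does not
        List<Callable<Void>> handlers = Arrays.asList(handler(200), handler(200), handler(0));
        System.out.println("serial   ms: " + runSerial(handlers));
        System.out.println("parallel ms: " + runParallel(handlers));
    }
}
```

The serial figure approaches the sum of the handler times and the parallel figure approaches the slowest handler, which is the trade-off to weigh against the ordering guarantee that only the serial chain provides.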

