本文介绍 Disruptor 事件监听器的串行操作与并行操作方式,并附源码下载。
代码:
实体类:
package com.dmbjz.height.chain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Data entity that is carried through the Disruptor as the event payload.
 * Accessors and constructors are supplied by the Lombok annotations below.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class Trade {
// Trade identifier.
private String id;
// Trade name.
private String name;
// Trade price.
private double price;
// Counter starting at 0; not referenced by the other snippets shown alongside this class.
private AtomicInteger count = new AtomicInteger(0);
}
事件执行方法:
package com.dmbjz.height.chain;
import com.lmax.disruptor.EventTranslator;
import java.util.Random;
/**
 * Event translator that fills a {@link Trade} with a random price
 * each time it is asked to translate an event.
 */
public class TradeEventTranslator implements EventTranslator<Trade> {

    /** Source of the random prices. */
    private Random priceSource = new Random();

    /**
     * Writes a random price in the range [0, 9999) into the given event.
     *
     * @param event    the event to populate
     * @param sequence the sequence passed by the caller; not used here
     */
    @Override
    public void translateTo(Trade event, long sequence) {
        double randomPrice = priceSource.nextDouble() * 9999;
        event.setPrice(randomPrice);
    }
}
生产者代码:
package com.dmbjz.height.chain;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.concurrent.CountDownLatch;
/**
 * Producer task: publishes events to the Disruptor and then counts the latch down
 * so that a thread waiting on the latch knows publishing has finished.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class TradePushlisher implements Runnable {

    /** Disruptor the events are published to. */
    private Disruptor<Trade> disruptor;

    /** Latch counted down exactly once when {@link #run()} ends. */
    private CountDownLatch latch;

    /**
     * Publishes one event through a {@link TradeEventTranslator}.
     * The latch is released in a {@code finally} block so that a failure while
     * publishing cannot leave a waiting thread blocked forever.
     */
    @Override
    public void run() {
        try {
            TradeEventTranslator eventTranslator = new TradeEventTranslator();
            for (int i = 0; i < 1; i++) {
                // Publish by handing the translator to the Disruptor.
                disruptor.publishEvent(eventTranslator);
            }
        } finally {
            latch.countDown();
        }
    }
}
事件监听器一:
package com.dmbjz.height.chain;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import java.util.concurrent.TimeUnit;
/**
 * Event listener that sets the trade name.
 * Implements both {@link EventHandler} and {@link WorkHandler}; the WorkHandler
 * callback takes only the event, whereas the EventHandler callback also receives
 * the sequence and the end-of-batch flag.
 */
public class UserHandler1 implements EventHandler<Trade>, WorkHandler<Trade> {

    /**
     * WorkHandler callback: prints a marker line, pauses for one second,
     * then sets the event's name to "H1".
     */
    @Override
    public void onEvent(Trade event) throws Exception {
        System.out.println("UserHandler1 : SET NAME");
        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        event.setName("H1");
    }

    /** EventHandler callback: forwards to the single-argument variant. */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        onEvent(event);
    }
}
事件监听器二:
package com.dmbjz.height.chain;
import com.lmax.disruptor.EventHandler;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Event listener that assigns a random UUID string as the trade id.
 */
public class UserHandler2 implements EventHandler<Trade> {

    /**
     * Prints a marker line, pauses for one second, then sets the event's id
     * to a freshly generated UUID.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("UserHandler2 : SET ID");
        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        String generatedId = UUID.randomUUID().toString();
        event.setId(generatedId);
    }
}
事件监听器三:
package com.dmbjz.height.chain;
import com.lmax.disruptor.EventHandler;
/**
 * Event listener that prints the current state of the trade.
 */
public class UserHandler3 implements EventHandler<Trade> {

    /**
     * Prints the event's name, id, price and its {@code toString()} form on one line.
     */
    @Override
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        StringBuilder line = new StringBuilder("UserHandler3 : NAME: ");
        line.append(event.getName())
                .append(", ID: ")
                .append(event.getId())
                .append(", PRICE: ")
                .append(event.getPrice())
                .append(" INSTANCE : ")
                .append(event.toString());
        System.out.println(line.toString());
    }
}
方法案例:
串行方法:
通过链式调用 handleEventsWith 设置事件监听器,各监听器将按添加顺序依次(串行)处理同一事件
package com.dmbjz.height.chain;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import sun.nio.ch.ThreadPool;
import java.util.concurrent.*;
/**
 * Serial demo: the three listeners are registered through a chained call,
 * so they handle each event one after another in the order they were added.
 */
public class Main {

    public static void main(String[] args) throws Exception {
        // Single-thread pool that runs the producer task.
        ExecutorService publisherPool = Executors.newFixedThreadPool(1);

        // Executor handed to the Disruptor: 3 core threads, 5 max, bounded queue of 10.
        // NOTE(review): the keep-alive is 10 microseconds — possibly a larger unit was intended; confirm.
        ThreadPoolExecutor handlerPool = new ThreadPoolExecutor(
                3, 5, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(10));

        // Single-producer Disruptor with a 1024 * 1024 slot ring buffer and a busy-spin wait strategy.
        Disruptor<Trade> disruptor = new Disruptor<>(
                Trade::new,
                1 << 20,
                handlerPool,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        // Serial wiring: listeners run in the order in which they are chained.
        disruptor.handleEventsWith(new UserHandler1())
                .handleEventsWith(new UserHandler2())
                .handleEventsWith(new UserHandler3());

        // start() returns the RingBuffer; this demo does not need it.
        disruptor.start();

        // The producer counts this latch down once it is done publishing,
        // which lets main wait before releasing resources.
        CountDownLatch publishDone = new CountDownLatch(1);
        long startedAt = System.currentTimeMillis();

        // Submit the producer task.
        publisherPool.submit(new TradePushlisher(disruptor, publishDone));
        try {
            publishDone.await();
        } finally {
            disruptor.shutdown();
            publisherPool.shutdown();
            handlerPool.shutdown();
        }

        long elapsedMillis = System.currentTimeMillis() - startedAt;
        System.out.println("总耗时: " + elapsedMillis);
    }
}
并行方法:
对同一个 Disruptor 分别调用 handleEventsWith(或在一次调用中传入多个监听器)进行注册,各监听器将并行处理同一事件,彼此之间没有先后顺序保证
package com.dmbjz.height.chain;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import sun.nio.ch.ThreadPool;
import java.util.concurrent.*;
/**
 * Parallel demo: each listener is registered with its own handleEventsWith call
 * on the Disruptor, so the listeners handle events at the same time.
 */
public class Main {

    public static void main(String[] args) throws Exception {
        // Single-thread pool that runs the producer task.
        ExecutorService publisherPool = Executors.newFixedThreadPool(1);

        // Executor handed to the Disruptor: 3 core threads, 5 max, bounded queue of 10.
        // NOTE(review): the keep-alive is 10 microseconds — possibly a larger unit was intended; confirm.
        ThreadPoolExecutor handlerPool = new ThreadPoolExecutor(
                3, 5, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(10));

        // Single-producer Disruptor with a 1024 * 1024 slot ring buffer and a busy-spin wait strategy.
        Disruptor<Trade> disruptor = new Disruptor<>(
                Trade::new,
                1 << 20,
                handlerPool,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        // Parallel wiring, form 1: register every listener directly on the Disruptor.
        disruptor.handleEventsWith(new UserHandler1());
        disruptor.handleEventsWith(new UserHandler2());
        disruptor.handleEventsWith(new UserHandler3());
        // Parallel wiring, form 2 (alternative shown by the tutorial):
        //disruptor.handleEventsWith(new UserHandler1(),new UserHandler2(),new UserHandler3());

        // start() returns the RingBuffer; this demo does not need it.
        disruptor.start();

        // The producer counts this latch down once it is done publishing,
        // which lets main wait before releasing resources.
        CountDownLatch publishDone = new CountDownLatch(1);
        long startedAt = System.currentTimeMillis();

        // Submit the producer task.
        publisherPool.submit(new TradePushlisher(disruptor, publishDone));
        try {
            publishDone.await();
        } finally {
            disruptor.shutdown();
            publisherPool.shutdown();
            handlerPool.shutdown();
        }

        long elapsedMillis = System.currentTimeMillis() - startedAt;
        System.out.println("总耗时: " + elapsedMillis);
    }
}