Disruptor是一个用于生产者-消费者模型的高并发框架(CAS),由于其实现原理(RingBuffer环形队列、CAS),性能极高
下面是一个由生产者放入整数,消费者得到后进行平方运算的例子
code
package concurrent;import com.lmax.disruptor.*;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.dsl.ProducerType;import java.nio.ByteBuffer;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class C11_Disruptor {//用于存放数据的类static class LongEvent {private long data;public void setData(long data){ this.data = data; }public long getData(){ return data; }}/*工厂类,它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例(Disruptor会预先分配系统)*/static class LongEventFactory implements EventFactory<LongEvent>{@Overridepublic LongEvent newInstance() {return new LongEvent();}}/*消费者类,需要实现WorkHandler接口,我们只需要在onEvent中进行数据处理即可,线程同步等事情Disruptor会做*/static class Consumer implements WorkHandler<LongEvent>{@Overridepublic void onEvent(LongEvent event) throws Exception {System.out.println(Thread.currentThread().getId() + ":" + event.getData() * event.getData());}}/*生产者*/static class Producer{private final RingBuffer<LongEvent> ringBuffer; //环形队列public Producer(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void pushData(ByteBuffer bb){long sequence = ringBuffer.next(); //获取下一个可用序列号try{//通过序列号获取在环形队列中的入口(空的LongEvent对象),然后设置其数据LongEvent event = ringBuffer.get(sequence);event.setData(bb.getLong(0));}finally {//数据发布,只有发布后才能被消费者发现ringBuffer.publish(sequence);}}}public static void main(String[] args) throws InterruptedException {ExecutorService executor = Executors.newCachedThreadPool();LongEventFactory factory = new LongEventFactory();//RingBuffer环形队列的大小必须是2的整数幂int bufferSize = 1024;Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, //定义事件工厂 -- 返回事件实例bufferSize, //环形队列的大小 -- 2的整数幂executor, //用于事件处理的线程池ProducerType.SINGLE, //生产者是单例还是多例new YieldingWaitStrategy() //指定等待策略/* 等待策略* BlockingWaitStrategy:默认策略,使用了锁和条件,最节省CPU,但性能最差。* SleepingWaitStrategy:节省CPU,但是对于数据处理可能产生比较高的平均延时,对生产者线程影响最小* YieldingWaitStrategy:数据处理延时低,,但需要逻辑CPU数量>消费者线程数(这里的逻辑CPU,指的是“双核四线程”中的四线程)* BusySpinWaitStrategy:数据处理延时苛刻,消费者线程采取轮询方式监视缓冲区的的变化,这意味着会消耗所有的CPU资源。* 你的物理CPU数量>消费者线程数* */);//设置处理数据的消费者disruptor.handleEventsWithWorkerPool(new Consumer(),new Consumer(),new Consumer(),new Consumer());//初始化Disruptor系统disruptor.start();//模拟一个生产者不断向RingBuffer缓冲区添加数据RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();Producer producer = new Producer(ringBuffer);//分配一个8字节缓冲区 ---- long为8字节ByteBuffer bb = ByteBuffer.allocate(8);for(long l=0 ; l<9 ; l++){bb.putLong(0, l);producer.pushData(bb);Thread.sleep(100);System.out.println("add data " + l);}//关闭资源disruptor.shutdown();executor.shutdown();}}
console
add data 011:0add data 114:1add data 213:4add data 312:9add data 411:16add data 514:25add data 613:36add data 712:49add data 811:64
