Disruptor是一个用于生产者-消费者模型的高并发框架(CAS),由于其实现原理(RingBuffer环形队列、CAS),性能极高
下面是一个由生产者放入整数,消费者得到后进行平方运算的例子
code
package concurrent;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class C11_Disruptor {
//用于存放数据的类
static class LongEvent {
private long data;
public void setData(long data){ this.data = data; }
public long getData(){ return data; }
}
/*
工厂类,它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例(Disruptor会预先分配系统)
*/
static class LongEventFactory implements EventFactory<LongEvent>{
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
/*
消费者类,需要实现WorkHandler接口,我们只需要在onEvent中进行数据处理即可,线程同步等事情Disruptor会做
*/
static class Consumer implements WorkHandler<LongEvent>{
@Override
public void onEvent(LongEvent event) throws Exception {
System.out.println(Thread.currentThread().getId() + ":" + event.getData() * event.getData());
}
}
/*
生产者
*/
static class Producer{
private final RingBuffer<LongEvent> ringBuffer; //环形队列
public Producer(RingBuffer<LongEvent> ringBuffer){
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb){
long sequence = ringBuffer.next(); //获取下一个可用序列号
try{
//通过序列号获取在环形队列中的入口(空的LongEvent对象),然后设置其数据
LongEvent event = ringBuffer.get(sequence);
event.setData(bb.getLong(0));
}finally {
//数据发布,只有发布后才能被消费者发现
ringBuffer.publish(sequence);
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
LongEventFactory factory = new LongEventFactory();
//RingBuffer环形队列的大小必须是2的整数幂
int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
factory, //定义事件工厂 -- 返回事件实例
bufferSize, //环形队列的大小 -- 2的整数幂
executor, //用于事件处理的线程池
ProducerType.SINGLE, //生产者是单例还是多例
new YieldingWaitStrategy() //指定等待策略
/* 等待策略
* BlockingWaitStrategy:默认策略,使用了锁和条件,最节省CPU,但性能最差。
* SleepingWaitStrategy:节省CPU,但是对于数据处理可能产生比较高的平均延时,对生产者线程影响最小
* YieldingWaitStrategy:数据处理延时低,,但需要逻辑CPU数量>消费者线程数(这里的逻辑CPU,指的是“双核四线程”中的四线程)
* BusySpinWaitStrategy:数据处理延时苛刻,消费者线程采取轮询方式监视缓冲区的的变化,这意味着会消耗所有的CPU资源。
* 你的物理CPU数量>消费者线程数
* */
);
//设置处理数据的消费者
disruptor.handleEventsWithWorkerPool(
new Consumer(),
new Consumer(),
new Consumer(),
new Consumer()
);
//初始化Disruptor系统
disruptor.start();
//模拟一个生产者不断向RingBuffer缓冲区添加数据
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
//分配一个8字节缓冲区 ---- long为8字节
ByteBuffer bb = ByteBuffer.allocate(8);
for(long l=0 ; l<9 ; l++){
bb.putLong(0, l);
producer.pushData(bb);
Thread.sleep(100);
System.out.println("add data " + l);
}
//关闭资源
disruptor.shutdown();
executor.shutdown();
}
}
console
add data 0
11:0
add data 1
14:1
add data 2
13:4
add data 3
12:9
add data 4
11:16
add data 5
14:25
add data 6
13:36
add data 7
12:49
add data 8
11:64