Disruptor是一个用于生产者-消费者模型的高并发框架(CAS),由于其实现原理(RingBuffer环形队列、CAS),性能极高

    下面是一个由生产者放入整数,消费者得到后进行平方运算的例子

    code

    1. package concurrent;
    2. import com.lmax.disruptor.*;
    3. import com.lmax.disruptor.dsl.Disruptor;
    4. import com.lmax.disruptor.dsl.ProducerType;
    5. import java.nio.ByteBuffer;
    6. import java.util.concurrent.Executor;
    7. import java.util.concurrent.ExecutorService;
    8. import java.util.concurrent.Executors;
    9. public class C11_Disruptor {
    10. //用于存放数据的类
    11. static class LongEvent {
    12. private long data;
    13. public void setData(long data){ this.data = data; }
    14. public long getData(){ return data; }
    15. }
    16. /*
    17. 工厂类,它会在Disruptor系统初始化时,构造所有的缓冲区中的对象实例(Disruptor会预先分配系统)
    18. */
    19. static class LongEventFactory implements EventFactory<LongEvent>{
    20. @Override
    21. public LongEvent newInstance() {
    22. return new LongEvent();
    23. }
    24. }
    25. /*
    26. 消费者类,需要实现WorkHandler接口,我们只需要在onEvent中进行数据处理即可,线程同步等事情Disruptor会做
    27. */
    28. static class Consumer implements WorkHandler<LongEvent>{
    29. @Override
    30. public void onEvent(LongEvent event) throws Exception {
    31. System.out.println(Thread.currentThread().getId() + ":" + event.getData() * event.getData());
    32. }
    33. }
    34. /*
    35. 生产者
    36. */
    37. static class Producer{
    38. private final RingBuffer<LongEvent> ringBuffer; //环形队列
    39. public Producer(RingBuffer<LongEvent> ringBuffer){
    40. this.ringBuffer = ringBuffer;
    41. }
    42. public void pushData(ByteBuffer bb){
    43. long sequence = ringBuffer.next(); //获取下一个可用序列号
    44. try{
    45. //通过序列号获取在环形队列中的入口(空的LongEvent对象),然后设置其数据
    46. LongEvent event = ringBuffer.get(sequence);
    47. event.setData(bb.getLong(0));
    48. }finally {
    49. //数据发布,只有发布后才能被消费者发现
    50. ringBuffer.publish(sequence);
    51. }
    52. }
    53. }
    54. public static void main(String[] args) throws InterruptedException {
    55. ExecutorService executor = Executors.newCachedThreadPool();
    56. LongEventFactory factory = new LongEventFactory();
    57. //RingBuffer环形队列的大小必须是2的整数幂
    58. int bufferSize = 1024;
    59. Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
    60. factory, //定义事件工厂 -- 返回事件实例
    61. bufferSize, //环形队列的大小 -- 2的整数幂
    62. executor, //用于事件处理的线程池
    63. ProducerType.SINGLE, //生产者是单例还是多例
    64. new YieldingWaitStrategy() //指定等待策略
    65. /* 等待策略
    66. * BlockingWaitStrategy:默认策略,使用了锁和条件,最节省CPU,但性能最差。
    67. * SleepingWaitStrategy:节省CPU,但是对于数据处理可能产生比较高的平均延时,对生产者线程影响最小
    68. * YieldingWaitStrategy:数据处理延时低,,但需要逻辑CPU数量>消费者线程数(这里的逻辑CPU,指的是“双核四线程”中的四线程)
    69. * BusySpinWaitStrategy:数据处理延时苛刻,消费者线程采取轮询方式监视缓冲区的的变化,这意味着会消耗所有的CPU资源。
    70. * 你的物理CPU数量>消费者线程数
    71. * */
    72. );
    73. //设置处理数据的消费者
    74. disruptor.handleEventsWithWorkerPool(
    75. new Consumer(),
    76. new Consumer(),
    77. new Consumer(),
    78. new Consumer()
    79. );
    80. //初始化Disruptor系统
    81. disruptor.start();
    82. //模拟一个生产者不断向RingBuffer缓冲区添加数据
    83. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    84. Producer producer = new Producer(ringBuffer);
    85. //分配一个8字节缓冲区 ---- long为8字节
    86. ByteBuffer bb = ByteBuffer.allocate(8);
    87. for(long l=0 ; l<9 ; l++){
    88. bb.putLong(0, l);
    89. producer.pushData(bb);
    90. Thread.sleep(100);
    91. System.out.println("add data " + l);
    92. }
    93. //关闭资源
    94. disruptor.shutdown();
    95. executor.shutdown();
    96. }
    97. }

    console

    1. add data 0
    2. 11:0
    3. add data 1
    4. 14:1
    5. add data 2
    6. 13:4
    7. add data 3
    8. 12:9
    9. add data 4
    10. 11:16
    11. add data 5
    12. 14:25
    13. add data 6
    14. 13:36
    15. add data 7
    16. 12:49
    17. add data 8
    18. 11:64