1、剖析Disruptor:为什么会这么快

https://www.cnblogs.com/haiq/p/4112689.html

Disruptor单消费者单生产者Demo

  1. public class Test2 {
  2. public static void main(String[] args) {
  3. int ringBufferSize = 1024 * 1024;
  4. //初始化disruptor
  5. Disruptor<OrderEvent1> disruptor = new Disruptor(
  6. //事件工厂 用于初始化事件
  7. new OrdetEventFactory1(),
  8. //ringBufferSize
  9. ringBufferSize,
  10. //线程工厂
  11. r -> {
  12. Thread thread = new Thread(r);
  13. return thread;
  14. },
  15. //生产者类型
  16. ProducerType.SINGLE,
  17. //阻塞策略
  18. new BlockingWaitStrategy());
  19. //设置消费者Handler
  20. disruptor.handleEventsWith(new OrderHandler1());
  21. //启动无锁队列
  22. disruptor.start();
  23. //获取ringBuffer
  24. RingBuffer<OrderEvent1> ringBuffer = disruptor.getRingBuffer();
  25. ByteBuffer allocate = ByteBuffer.allocate(8);
  26. for (int i = 0; i < 100; i++) {
  27. //通过ringBuffer向队列发送order事件
  28. OrderProducer1 orderProducer = new OrderProducer1(ringBuffer);
  29. orderProducer.addData(allocate.putLong(0,i));
  30. }
  31. //关闭队列
  32. disruptor.shutdown();
  33. }
  34. }
  35. class OrdetEventFactory1 implements EventFactory<OrderEvent1> {
  36. @Override
  37. public OrderEvent1 newInstance() {
  38. return new OrderEvent1();
  39. }
  40. }
  41. @Data
  42. @AllArgsConstructor
  43. class OrderProducer1 {
  44. private RingBuffer<OrderEvent1> ringBuffer;
  45. public void addData(ByteBuffer bb){
  46. long next = ringBuffer.next();
  47. try {
  48. OrderEvent1 orderEvent = ringBuffer.get(next);
  49. orderEvent.setValue(bb.getLong(0));
  50. } finally {
  51. ringBuffer.publish(next);
  52. }
  53. }
  54. }
  55. class OrderHandler1 implements EventHandler<OrderEvent1> {
  56. @Override
  57. public void onEvent(OrderEvent1 orderEvent, long l, boolean b) throws Exception {
  58. System.out.println("OrderHandler1:" + orderEvent.getValue());
  59. }
  60. }
  61. @Data
  62. class OrderEvent1 {
  63. private Long value;
  64. }

Disruptor多生产者多消费者Demo

  1. //Disruptor 多生产者多消费者Deom
  2. public class Main {
  3. public static void main(String[] args) throws InterruptedException {
  4. Disruptor<Order2> disruptor = new Disruptor(
  5. new Order2Factory(),
  6. 1024 * 1024,
  7. Executors.newFixedThreadPool(5),
  8. ProducerType.MULTI,
  9. new YieldingWaitStrategy()
  10. );
  11. RingBuffer<Order2> ringBuffer = disruptor.getRingBuffer();
  12. Order2Handle[] order2Handles = new Order2Handle[10];
  13. for (int i = 0; i < 10; i++) {
  14. order2Handles[i] = new Order2Handle("C"+i);
  15. }
  16. disruptor.handleEventsWithWorkerPool(order2Handles);
  17. disruptor.start();
  18. CyclicBarrier barrier = new CyclicBarrier(100);
  19. for (int i = 0; i < 100; i++) {
  20. Producer producer = new Producer(ringBuffer);
  21. new Thread(new Runnable() {
  22. @Override
  23. public void run() {
  24. try {
  25. barrier.await();
  26. }catch (Exception e){
  27. }
  28. for (int j = 0; j < 1000; j++) {
  29. producer.putOrder(UUID.randomUUID().toString());
  30. }
  31. }
  32. }).start();
  33. }
  34. Thread.sleep(2000);
  35. System.err.println("----------线程创建完毕,开始生产数据----------");
  36. Thread.sleep(10000);
  37. System.err.println("任务总数1:" + order2Handles[0].getCount());
  38. }
  39. }
  40. @Data
  41. class Order2{
  42. private String id;
  43. private String data;
  44. }
  45. @Slf4j
  46. @AllArgsConstructor
  47. class Order2Handle implements WorkHandler<Order2>{
  48. private String orderId;
  49. private AtomicInteger count = new AtomicInteger(0);
  50. private Random random = new Random();
  51. public Order2Handle(String orderId){
  52. this.orderId = orderId;
  53. }
  54. @Override
  55. public void onEvent(Order2 order2) throws Exception {
  56. log.info("orderId:{} {}",orderId,order2.getId());
  57. count.incrementAndGet();
  58. }
  59. public int getCount(){
  60. return count.get();
  61. }
  62. }
  63. class Order2Factory implements EventFactory<Order2>{
  64. @Override
  65. public Order2 newInstance() {
  66. return new Order2();
  67. }
  68. }
  69. class EventExceptionHandler implements ExceptionHandler{
  70. @Override
  71. public void handleEventException(Throwable throwable, long l, Object o) {
  72. }
  73. @Override
  74. public void handleOnStartException(Throwable throwable) {
  75. }
  76. @Override
  77. public void handleOnShutdownException(Throwable throwable) {
  78. }
  79. }
  80. class Producer{
  81. private RingBuffer<Order2> ringBuffer;
  82. public Producer(RingBuffer<Order2> ringBuffer){
  83. this.ringBuffer = ringBuffer;
  84. }
  85. public void putOrder(String uuid){
  86. long next = ringBuffer.next();
  87. try {
  88. Order2 order2 = ringBuffer.get(next);
  89. order2.setId(uuid);
  90. } finally {
  91. ringBuffer.publish(next);
  92. }
  93. }
  94. }