最最简单基本的模式,消费者监听指定的消息队列,只要有值就直接获取,一个消息只有一个消费者可以拿到。

    在common模块下新建一个接口存储常量,多种模式的就依此建吧:

    1. package com.zhaoxy.study.common;
    2. /**
    3. * rabbitmq,一对一消费模式使用的常量
    4. */
    5. public interface Constant1 {
    6. String queueName = "study_module_queue_1";
    7. }

    然后在consumer模块下新建一个监听类,用于接受消息,可以配置成两个,看是不是可以做到一个消息只能被一个人消费到:

    1. package com.zhaoxy.mqconsumer;
    2. import com.zhaoxy.study.common.Constant1;
    3. import org.springframework.amqp.rabbit.annotation.Queue;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. @Component
    7. public class MqConsumerListener {
    8. // queuesToDeclare 自动声明队列 queues需手动先创建队列
    9. @RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))
    10. public void listener(String message){
    11. System.out.println("listener接收到了生产者的消息:" + message);
    12. }
    13. @RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))
    14. public void listenerNew(String message){
    15. System.out.println("listenerNew接收到了生产者的消息:" + message);
    16. }
    17. }
    1. package com.zhaoxy.mqprod;
    2. import com.zhaoxy.study.common.Constant1;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.stereotype.Component;
    5. import javax.annotation.PostConstruct;
    6. import javax.annotation.Resource;
    7. import java.util.UUID;
    8. @Component
    9. public class MqProdService {
    10. @Resource
    11. private RabbitTemplate rabbitTemplate;
    12. /**
    13. * 消息生产者,用途就是每次项目启动后都生成一次消息。
    14. *
    15. * 一遍两遍三四遍,五遍六遍七八遍,
    16. * 九遍十遍十一遍,学习完毕全不见。
    17. */
    18. @PostConstruct
    19. public void prodMessage(){
    20. // 模式1,给指定队列发送一个uuid
    21. for (int i = 0; i < 30; i++) {
    22. rabbitTemplate.convertAndSend(Constant1.queueName,
    23. UUID.randomUUID());
    24. }
    25. }
    26. }

    先启动consumer1服务,完成后清空控制台,再启动prod,然后看consumer1服务控制台上,是否有相同的消息被两个方法都消费后输出了。
    image.png
    可以看出这种模式一个消息只会被一个消费者获取到,那么为什么MQ会靠谱些呢,因为若这些消息没有被消费,监听者使用MQ自动声明队列,会默认设置此队列消息是持久化的,那么生产者在产出数据没被消费的话,这些消息会被留存,然后有消费者来了会一个接一个的传给它。

    想要测试这种情况的话,很简单,先把consumer1关闭,然后多启动prod几次让它多传输一些数据,最后再启动consumer1看下控制台会不会输出积压的消息。

    但是这个有一个问题,就是他的消息属于预取模式,你一个我一个谁也别抢,但是若此时生产者来了100条消息,我们有两个节点,其中一个节点可能配置并不高,它只是个辅助,那么它处理50条的话速度就慢的多了。
    针对这个情况,我们可以设置它能者多劳,一次取一部分,执行完再取下一部分,让节点能够有多大锅下多少米。

    1. spring:
    2. rabbitmq:
    3. # 其他略
    4. listener:
    5. simple:
    6. prefetch: 1 # 每次只能获取指定条数,处理完成才能再次获取

    然后改下consumer1的两个监听,设置线程休眠时间模拟一个性能优秀,和一个辅助,然后看消费同样数量的消息,结果如何。

    1. package com.zhaoxy.mqconsumer;
    2. import com.zhaoxy.study.common.Constant1;
    3. import org.springframework.amqp.rabbit.annotation.Queue;
    4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    5. import org.springframework.stereotype.Component;
    6. @Component
    7. public class MqConsumerListener {
    8. // queuesToDeclare 自动声明队列 queues需手动先创建队列
    9. @RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))
    10. public void listener(String message) throws InterruptedException {
    11. System.out.println("listener接收到了生产者的消息:" + message);
    12. Thread.sleep(4000L); // 模拟这个listener处理业务的速度有点儿拉,速度慢
    13. }
    14. @RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))
    15. public void listenerNew(String message) throws InterruptedException {
    16. System.out.println("listenerNew接收到了生产者的消息:" + message);
    17. Thread.sleep(1000L);// 模拟这个listener处理业务速度杠杠的
    18. }
    19. }

    然后再运行程序试验一下:
    image.png
    能够很明显看到效果非常的明显。

    此模式使用场景可以考虑用于应用集群环境下的消息消费,因为一个消息只会被一个节点消费掉,并不存在争夺资源的情况,只要保证队列是持久化的,那么就足以保证消息的稳定性。

    但是简介里说了MQ是接收生产者发布的消息到交换机,再路由匹配到指定队列的,那么示例代码为什么没写这些,在控制台其实就可以发现,如果没有指定那么使用的交换机是默认的。