最最简单基本的模式,消费者监听指定的消息队列,只要有值就直接获取,一个消息只有一个消费者可以拿到。
在common模块下新建一个接口存储常量,多种模式的就依此建吧:
package com.zhaoxy.study.common;/*** rabbitmq,一对一消费模式使用的常量*/public interface Constant1 {String queueName = "study_module_queue_1";}
然后在consumer模块下新建一个监听类,用于接受消息,可以配置成两个,看是不是可以做到一个消息只能被一个人消费到:
package com.zhaoxy.mqconsumer;import com.zhaoxy.study.common.Constant1;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class MqConsumerListener {// queuesToDeclare 自动声明队列 queues需手动先创建队列@RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))public void listener(String message){System.out.println("listener接收到了生产者的消息:" + message);}@RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))public void listenerNew(String message){System.out.println("listenerNew接收到了生产者的消息:" + message);}}
package com.zhaoxy.mqprod;import com.zhaoxy.study.common.Constant1;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.annotation.Resource;import java.util.UUID;@Componentpublic class MqProdService {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 消息生产者,用途就是每次项目启动后都生成一次消息。** 一遍两遍三四遍,五遍六遍七八遍,* 九遍十遍十一遍,学习完毕全不见。*/@PostConstructpublic void prodMessage(){// 模式1,给指定队列发送一个uuidfor (int i = 0; i < 30; i++) {rabbitTemplate.convertAndSend(Constant1.queueName,UUID.randomUUID());}}}
先启动consumer1服务,完成后清空控制台,再启动prod,然后看consumer1服务控制台上,是否有相同的消息被两个方法都消费后输出了。
可以看出这种模式一个消息只会被一个消费者获取到,那么为什么MQ会靠谱些呢,因为若这些消息没有被消费,监听者使用MQ自动声明队列,会默认设置此队列消息是持久化的,那么生产者在产出数据没被消费的话,这些消息会被留存,然后有消费者来了会一个接一个的传给它。
想要测试这种情况的话,很简单,先把consumer1关闭,然后多启动prod几次让它多传输一些数据,最后再启动consumer1看下控制台会不会输出积压的消息。
但是这个有一个问题,就是他的消息属于预取模式,你一个我一个谁也别抢,但是若此时生产者来了100条消息,我们有两个节点,其中一个节点可能配置并不高,它只是个辅助,那么它处理50条的话速度就慢的多了。
针对这个情况,我们可以设置它能者多劳,一次取一部分,执行完再取下一部分,让节点能够有多大锅下多少米。
spring:rabbitmq:# 其他略listener:simple:prefetch: 1 # 每次只能获取指定条数,处理完成才能再次获取
然后改下consumer1的两个监听,设置线程休眠时间模拟一个性能优秀,和一个辅助,然后看消费同样数量的消息,结果如何。
package com.zhaoxy.mqconsumer;import com.zhaoxy.study.common.Constant1;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class MqConsumerListener {// queuesToDeclare 自动声明队列 queues需手动先创建队列@RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))public void listener(String message) throws InterruptedException {System.out.println("listener接收到了生产者的消息:" + message);Thread.sleep(4000L); // 模拟这个listener处理业务的速度有点儿拉,速度慢}@RabbitListener(queuesToDeclare = @Queue(Constant1.queueName))public void listenerNew(String message) throws InterruptedException {System.out.println("listenerNew接收到了生产者的消息:" + message);Thread.sleep(1000L);// 模拟这个listener处理业务速度杠杠的}}
然后再运行程序试验一下:
能够很明显看到效果非常的明显。
此模式使用场景可以考虑用于应用集群环境下的消息消费,因为一个消息只会被一个节点消费掉,并不存在争夺资源的情况,只要保证队列是持久化的,那么就足以保证消息的稳定性。
但是简介里说了MQ是接收生产者发布的消息到交换机,再路由匹配到指定队列的,那么示例代码为什么没写这些,在控制台其实就可以发现,如果没有指定那么使用的交换机是默认的。
