介绍
消息队列通常有三个概念
- 发送消息(生产者)
- 队列
- 接收消息(消费者)。
RabbitMQ在这个基本概念之上,多做了一层抽象,在发送消息和队列之间,加入了交换机。这样发送消息和队列就没有直接关系,而是通过交换机来做转发,交换机会根据分发策略把消息转给队列。
RabbitMQ比较重要的几个概念:
- 虚拟主机:RabbitMQ支持权限控制,但是最小控制粒度为虚拟主机。一个虚拟主机可以包含多个交换机、队列、绑定。
- 交换机:RabbitMQ分发器,根据不同的策略将消息分发到相关的队列。
- 队列:缓存消息的容器。
- 绑定:设置交换机与队列的关系。
黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。
队列
位于org.springframework.amqp.core
这个包
当声明一个队列的时候,它会自动绑定到默认交换机上,并以队列名字作为路由键。
队列构造器参数
- name 队列名字
- durable=”true” ,持久化 rabbitmq重启的时候不需要创建新的队列,默认是true
- exclusive 表示该消息队列是否只在当前connection生效,默认是false
- autoDelete 表示消息队列没有在使用时将被自动删除 默认是false
交换机
服务器必须实现Direct类型交换机,包含一个空白字符串名称的默认交换器。
交换机构造器参数
- durable=”true” ,rabbitmq重启的时候不需要创建新的交换机
- auto-delete 表示交换机没有在使用时将被自动删除 默认是false
交换机类型
- DirectExchange交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
- topic交换器,采用模糊匹配路由键的原则进行转发消息到队列中
- fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
DirectExchange 直连型交换机
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
- 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class DirectExchangeConfig {
@Bean(name = "queueDirect")
public Queue directQueue(){
return new Queue("queue-direct");
}
@Bean(name = "directExchange")
public DirectExchange directExchange() {
return new DirectExchange("directExchange", true, false);
}
@Bean
public Binding bindingDirect(Queue queueDirect, DirectExchange directExchange) {
return BindingBuilder.bind(queueDirect).to(directExchange).with("routingKey-direct");
}
}
- 生产者
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producter {
private Logger logger = LoggerFactory.getLogger(Producter.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void direct(String message){
logger.info("product direct:" + message);
rabbitTemplate.convertAndSend("directExchange","routingKey-direct",message);
}
}
- 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitHandler
@RabbitListener(queues = "queue-direct")
public void direct(String message){
logger.info("consume direct:"+message);
}
}
- 测试
```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
producter.direct("direct message");
}
}
TopicExchange 主题交换机
TopicExchange 主题交换机,采用模糊匹配路由键的原则进行转发消息到队列中,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:
*
(星号) 用来表示一个单词 (必须出现的)#
(井号) 用来表示任意数量(零个或多个)单词
通配符的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
主题交换机是非常强大的,理由有二:
- 当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
- 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
- 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
// topic交换器 采用模糊匹配路由键的原则进行转发消息到队列中 @Configuration public class TopicExchangeConfig { //topic模式 public static final String TOPIC_QUEUE_FIRST = “topicQueueFirst”; public static final String TOPIC_QUEUE_SECOND = “topicQueueSecond”; //路由key public static final String ROUTING_KEY_TOPIC_FIRST = “routingKey-topic-first.*”; public static final String ROUTING_KEY_TOPIC_SECOND = “routingKey-topic-second.#”; // topic交换机 public static final String TOPIC_EXCHANGE = “topicExchange”; @Bean(name = “firstTopicQueue”) public Queue firstTopicQueue() { return new Queue(TOPIC_QUEUE_FIRST,true,false,false); }
@Bean(name = "secondTopicQueue")
public Queue secondTopicQueue() {
return new Queue(TOPIC_QUEUE_SECOND,true,false,false);
}
@Bean(value = "topicExchange")
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding bindingFirstTopicExchange(Queue firstTopicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(firstTopicQueue).to(topicExchange).with(ROUTING_KEY_TOPIC_FIRST);
}
@Bean
public Binding bindingSecondTopicExchange(Queue secondTopicQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(secondTopicQueue).to(topicExchange).with(ROUTING_KEY_TOPIC_SECOND);
}
}
- 生产者
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producter {
private Logger logger = LoggerFactory.getLogger(Producter.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void topicFirstOne(String message){
logger.info("product topic first:" + message);
rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-first.one",message);
}
// 并未发送到topicQueueSecond队列里
public void topicFirstTwo(String message){
logger.info("product topic first:" + message);
rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-first.one.two",message);
}
public void topicSecondOne(String message){
logger.info("product topic second:" + message);
rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-second.one",message);
}
public void topicSecondTwo(String message){
logger.info("product topic second:" + message);
rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-second.one.two",message);
}
}
- 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class); @Autowired private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "topicQueueFirst")
@RabbitHandler
public void topicFirst(String message){
logger.info("consume topic first:"+message);
}
@RabbitHandler
@RabbitListener(queues = "topicQueueSecond")
public void topicSecond(String message){
logger.info("consume topic second:"+message);
}
}
- 测试
```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
producter.topicFirstOne("First One");
producter.topicFirstTwo("First Two");
producter.topicSecondOne("Second One");
producter.topicSecondTwo("Second Two");
}
}
FanoutExchang 扇型交换机
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
- 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutExchangeConfig {
@Bean(name = "fanoutQueueFirst")
public Queue firstFanoutQueue() {
return new Queue("queue-fanout-first");
}
@Bean(name = "fanoutQueueSecond")
public Queue secondFanoutQueue() {
return new Queue("queue-fanout-second");
}
@Bean(name = "fanoutExchange")
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding bindingFirstFanoutExchange(Queue fanoutQueueFirst, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueFirst).to(fanoutExchange);
}
@Bean
public Binding bindingSecondFanoutExchange(Queue fanoutQueueSecond, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueueSecond).to(fanoutExchange);
}
}
- 生产者
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producter {
private Logger logger = LoggerFactory.getLogger(Producter.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void fanout(String message){
logger.info("product fanout: "+message);
rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}
}
- 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class); @RabbitHandler @RabbitListener(queues = “queue-fanout-first”) public void fanoutFirst(String message){ logger.info(“consume fanout first:”+message); } @RabbitHandler @RabbitListener(queues = “queue-fanout-second”) public void fanoutSecond(String message){ logger.info(“consume fanout second:”+message); } }
- 测试
```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
producter.fanout("fanout message");
}
}
HeadersExchange交换机
- 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.HashMap; import java.util.Map;
@Configuration public class HeadersExchangeConfig { @Bean(name = “headersQueueFirst”) public Queue firstHeadersQueue() { return new Queue(“queue-headers-first”); }
@Bean(name = "headersQueueSecond")
public Queue secondHeadersQueue() {
return new Queue("queue-headers-second");
}
@Bean(name = "headersExchangeFirst")
public HeadersExchange firstHeadersExchange() {
return new HeadersExchange("headersExchange-first");
}
@Bean(name = "headersExchangeSecond")
public HeadersExchange secondHeadersExchange() {
return new HeadersExchange("headersExchange-second");
}
@Bean
public Binding bindingFirstHeadersExchange(Queue headersQueueFirst, HeadersExchange headersExchangeFirst) {
Map<String,Object> headerValues = new HashMap<>();
headerValues.put("type", "message");
headerValues.put("name", "headers");
return BindingBuilder.bind(headersQueueFirst).to(headersExchangeFirst).whereAll(headerValues).match();
}
@Bean
public Binding bindingSecondHeadersExchange(Queue headersQueueSecond, HeadersExchange headersExchangeSecond) {
Map<String,Object> headerValues = new HashMap<>();
headerValues.put("type", "message");
headerValues.put("name", "headers");
return BindingBuilder.bind(headersQueueSecond).to(headersExchangeSecond).whereAny(headerValues).match();
}
}
- 生产者
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class Producter {
private Logger logger = LoggerFactory.getLogger(Producter.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void headersFirst(Map<String, Object> headers,String message){
logger.info("product headers first: "+ message);
rabbitTemplate.convertAndSend("headersExchange-first",null, getMessage(headers, message));
}
public void headersSecond(Map<String, Object> headers,String message){
logger.info("product headers second: "+ message);
rabbitTemplate.convertAndSend("headersExchange-second", null, getMessage(headers, message));
}
private Object getMessage(Map<String, Object> head, String msg) {
// 声明消息 (消息体, 消息属性)
MessageProperties messageProperties = new MessageProperties();
// 设置消息是否持久化。Persistent表示持久化,Non-persistent表示不持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
messageProperties.setContentType("UTF-8");
messageProperties.getHeaders().putAll(head);
return new Message(msg.getBytes(), messageProperties);
}
}
- 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class); @RabbitHandler @RabbitListener(queues = “queue-headers-first”) public void headersFirst(String message){ logger.info(“consume headers first:”+message); } @RabbitHandler @RabbitListener(queues = “queue-headers-second”) public void headersSecond(String message){ logger.info(“consume headers second:”+message); } }
- 测试
```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
Map<String,Object> headers = new HashMap<>();
headers.put("type", "message");
Map<String,Object> headersAll = new HashMap<>();
headersAll.put("type", "message");
headersAll.put("name", "headers");
producter.headersFirst(headers,"headers message first");
producter.headersFirst(headersAll,"headers message first all");
producter.headersSecond(headers,"headers message second");
producter.headersSecond(headersAll,"headers message second all");
}
}
拓展
发送对象
注意:传递的对象必须支持序列化(实现了Serializable接口)
- 对象类 ```java import java.io.Serializable;
public class Message implements Serializable { private Integer status; private String content;
public Message() {
}
public Message(Integer status, String content) {
this.status = status;
this.content = content;
}
@Override
public String toString() {
return "Message{" +
"status=" + status +
", content='" + content + '\'' +
'}';
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
- 配置交换机、队列
```java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean(name = "classQueue")
public Queue classQueue(){
return new Queue("queue-class");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange-class", true, false);
}
@Bean
public Binding bindingDirect(Queue classQueue, DirectExchange directExchange) {
return BindingBuilder.bind(classQueue).to(directExchange).with("routingKey-class");
}
}
- 生产者 ```java import com.example.demo.rabbit.entity.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class Producter { private Logger logger = LoggerFactory.getLogger(Producter.class);
@Autowired
private RabbitTemplate rabbitTemplate;
public void classTest(Message message){
logger.info("product class:" + message.toString());
String routingKey = "routingKey-class";
rabbitTemplate.convertAndSend("directExchange-class",routingKey,message);
}
}
- 消费者
```java
import com.example.demo.rabbit.entity.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitHandler
@RabbitListener(queues = "queue-class")
public void classTest(Message message){
logger.info("consume class:"+message);
}
}
- 测试 ```java
@RestController @RequestMapping(value = “mq-test”) public class MQTest { private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
Message message = new Message(1,"Message");
producter.classTest(message);
}
}
- 结果
![image.png](https://cdn.nlark.com/yuque/0/2021/png/1416299/1630305134801-14c015cc-3f2a-40b7-852c-ab4af64c036b.png#clientId=uc8cf89ec-e11c-4&from=paste&height=38&id=u2f9d40e9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=50&originWidth=864&originalType=binary&ratio=1&size=39169&status=done&style=none&taskId=u42f207d7-fbf5-42f4-91aa-1d29a1c3045&width=648)
<a name="N9A4U"></a>
### 工作模式 - 多个消费者
采用上面DirectExchange交换机,需启动多个消费者,并不会重复消费
- 消费者
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitHandler
@RabbitListener(queues = "queue-direct")
public void direct(String message){
logger.info("consume direct:"+message);
}
@RabbitHandler
@RabbitListener(queues = "queue-direct")
public void direct1(String message){
logger.info("consume direct 1:"+message);
}
}
测试
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
for (int i=0;i<5;i++){
producter.direct("direct message "+i);
}
}
}
RPC
RabbitMQ支持RPC远程调用,同步返回结果。返回的结果可以是一个对象
- 配置交换机、队列,采用上面DirectExchange 交换机配置
- 生产者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class Producter { private Logger logger = LoggerFactory.getLogger(Producter.class); @Autowired private RabbitTemplate rabbitTemplate; public void sender(String content){ logger.info(“product message: “+content); String message= (String) rabbitTemplate.convertSendAndReceive(“directExchange”,”routingKey-direct”,content); logger.info(“return message:”+message); } }
- 消费者
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private Logger logger = LoggerFactory.getLogger(Consumer.class);
@RabbitHandler
@RabbitListener(queues = "queue-direct")
public String receive(String content){
logger.info("consume message: "+content);
return "OK";
}
}
测试
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
private Logger logger = LoggerFactory.getLogger(MQTest.class);
@Autowired
private Producter producter;
@GetMapping
public void test(){
producter.sender("RPC");
}
}