介绍

消息队列通常有三个概念

  • 发送消息(生产者)
  • 队列
  • 接收消息(消费者)。

RabbitMQ在这个基本概念之上,多做了一层抽象,在发送消息和队列之间,加入了交换机。这样发送消息和队列就没有直接关系,而是通过交换机来做转发,交换机会根据分发策略把消息转给队列。

RabbitMQ比较重要的几个概念:

  • 虚拟主机:RabbitMQ支持权限控制,但是最小控制粒度为虚拟主机。一个虚拟主机可以包含多个交换机、队列、绑定。
  • 交换机:RabbitMQ分发器,根据不同的策略将消息分发到相关的队列。
  • 队列:缓存消息的容器。
  • 绑定:设置交换机与队列的关系。

黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。

image.png

队列

位于org.springframework.amqp.core这个包
当声明一个队列的时候,它会自动绑定到默认交换机上,并以队列名字作为路由键。
队列构造器参数

  • name 队列名字
  • durable=”true” ,持久化 rabbitmq重启的时候不需要创建新的队列,默认是true
  • exclusive 表示该消息队列是否只在当前connection生效,默认是false
  • autoDelete 表示消息队列没有在使用时将被自动删除 默认是false

交换机

服务器必须实现Direct类型交换机,包含一个空白字符串名称的默认交换器。
交换机构造器参数

  • durable=”true” ,rabbitmq重启的时候不需要创建新的交换机
  • auto-delete 表示交换机没有在使用时将被自动删除 默认是false

交换机类型

  • DirectExchange交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
  • topic交换器,采用模糊匹配路由键的原则进行转发消息到队列中
  • fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。

    DirectExchange 直连型交换机

    直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

  • 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

@Configuration public class DirectExchangeConfig {

  1. @Bean(name = "queueDirect")
  2. public Queue directQueue(){
  3. return new Queue("queue-direct");
  4. }
  5. @Bean(name = "directExchange")
  6. public DirectExchange directExchange() {
  7. return new DirectExchange("directExchange", true, false);
  8. }
  9. @Bean
  10. public Binding bindingDirect(Queue queueDirect, DirectExchange directExchange) {
  11. return BindingBuilder.bind(queueDirect).to(directExchange).with("routingKey-direct");
  12. }

}

  1. - 生产者
  2. ```java
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Producter {
  10. private Logger logger = LoggerFactory.getLogger(Producter.class);
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. public void direct(String message){
  14. logger.info("product direct:" + message);
  15. rabbitTemplate.convertAndSend("directExchange","routingKey-direct",message);
  16. }
  17. }
  • 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class);

  1. @RabbitHandler
  2. @RabbitListener(queues = "queue-direct")
  3. public void direct(String message){
  4. logger.info("consume direct:"+message);
  5. }

}

  1. - 测试
  2. ```java
  3. @RestController
  4. @RequestMapping(value = "mq-test")
  5. public class MQTest {
  6. private Logger logger = LoggerFactory.getLogger(MQTest.class);
  7. @Autowired
  8. private Producter producter;
  9. @GetMapping
  10. public void test(){
  11. producter.direct("direct message");
  12. }
  13. }

image.png

TopicExchange 主题交换机

TopicExchange 主题交换机,采用模糊匹配路由键的原则进行转发消息到队列中,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
简单地介绍下规则:

  • * (星号) 用来表示一个单词 (必须出现的)
  • # (井号) 用来表示任意数量(零个或多个)单词

通配符的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

主题交换机是非常强大的,理由有二:

  • 当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
  • 当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。

  • 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

// topic交换器 采用模糊匹配路由键的原则进行转发消息到队列中 @Configuration public class TopicExchangeConfig { //topic模式 public static final String TOPIC_QUEUE_FIRST = “topicQueueFirst”; public static final String TOPIC_QUEUE_SECOND = “topicQueueSecond”; //路由key public static final String ROUTING_KEY_TOPIC_FIRST = “routingKey-topic-first.*”; public static final String ROUTING_KEY_TOPIC_SECOND = “routingKey-topic-second.#”; // topic交换机 public static final String TOPIC_EXCHANGE = “topicExchange”; @Bean(name = “firstTopicQueue”) public Queue firstTopicQueue() { return new Queue(TOPIC_QUEUE_FIRST,true,false,false); }

  1. @Bean(name = "secondTopicQueue")
  2. public Queue secondTopicQueue() {
  3. return new Queue(TOPIC_QUEUE_SECOND,true,false,false);
  4. }
  5. @Bean(value = "topicExchange")
  6. public TopicExchange topicExchange() {
  7. return new TopicExchange(TOPIC_EXCHANGE);
  8. }
  9. @Bean
  10. public Binding bindingFirstTopicExchange(Queue firstTopicQueue, TopicExchange topicExchange) {
  11. return BindingBuilder.bind(firstTopicQueue).to(topicExchange).with(ROUTING_KEY_TOPIC_FIRST);
  12. }
  13. @Bean
  14. public Binding bindingSecondTopicExchange(Queue secondTopicQueue, TopicExchange topicExchange) {
  15. return BindingBuilder.bind(secondTopicQueue).to(topicExchange).with(ROUTING_KEY_TOPIC_SECOND);
  16. }

}

  1. - 生产者
  2. ```java
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Producter {
  10. private Logger logger = LoggerFactory.getLogger(Producter.class);
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. public void topicFirstOne(String message){
  14. logger.info("product topic first:" + message);
  15. rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-first.one",message);
  16. }
  17. // 并未发送到topicQueueSecond队列里
  18. public void topicFirstTwo(String message){
  19. logger.info("product topic first:" + message);
  20. rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-first.one.two",message);
  21. }
  22. public void topicSecondOne(String message){
  23. logger.info("product topic second:" + message);
  24. rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-second.one",message);
  25. }
  26. public void topicSecondTwo(String message){
  27. logger.info("product topic second:" + message);
  28. rabbitTemplate.convertAndSend("topicExchange","routingKey-topic-second.one.two",message);
  29. }
  30. }
  • 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class); @Autowired private RabbitTemplate rabbitTemplate;

  1. @RabbitListener(queues = "topicQueueFirst")
  2. @RabbitHandler
  3. public void topicFirst(String message){
  4. logger.info("consume topic first:"+message);
  5. }
  6. @RabbitHandler
  7. @RabbitListener(queues = "topicQueueSecond")
  8. public void topicSecond(String message){
  9. logger.info("consume topic second:"+message);
  10. }

}

  1. - 测试
  2. ```java
  3. @RestController
  4. @RequestMapping(value = "mq-test")
  5. public class MQTest {
  6. private Logger logger = LoggerFactory.getLogger(MQTest.class);
  7. @Autowired
  8. private Producter producter;
  9. @GetMapping
  10. public void test(){
  11. producter.topicFirstOne("First One");
  12. producter.topicFirstTwo("First Two");
  13. producter.topicSecondOne("Second One");
  14. producter.topicSecondTwo("Second Two");
  15. }
  16. }

image.png

FanoutExchang 扇型交换机

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

  • 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

@Configuration public class FanoutExchangeConfig {

  1. @Bean(name = "fanoutQueueFirst")
  2. public Queue firstFanoutQueue() {
  3. return new Queue("queue-fanout-first");
  4. }
  5. @Bean(name = "fanoutQueueSecond")
  6. public Queue secondFanoutQueue() {
  7. return new Queue("queue-fanout-second");
  8. }
  9. @Bean(name = "fanoutExchange")
  10. public FanoutExchange fanoutExchange() {
  11. return new FanoutExchange("fanoutExchange");
  12. }
  13. @Bean
  14. public Binding bindingFirstFanoutExchange(Queue fanoutQueueFirst, FanoutExchange fanoutExchange) {
  15. return BindingBuilder.bind(fanoutQueueFirst).to(fanoutExchange);
  16. }
  17. @Bean
  18. public Binding bindingSecondFanoutExchange(Queue fanoutQueueSecond, FanoutExchange fanoutExchange) {
  19. return BindingBuilder.bind(fanoutQueueSecond).to(fanoutExchange);
  20. }

}

  1. - 生产者
  2. ```java
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Component;
  8. @Component
  9. public class Producter {
  10. private Logger logger = LoggerFactory.getLogger(Producter.class);
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. public void fanout(String message){
  14. logger.info("product fanout: "+message);
  15. rabbitTemplate.convertAndSend("fanoutExchange", null, message);
  16. }
  17. }
  • 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class); @RabbitHandler @RabbitListener(queues = “queue-fanout-first”) public void fanoutFirst(String message){ logger.info(“consume fanout first:”+message); } @RabbitHandler @RabbitListener(queues = “queue-fanout-second”) public void fanoutSecond(String message){ logger.info(“consume fanout second:”+message); } }

  1. - 测试
  2. ```java
  3. @RestController
  4. @RequestMapping(value = "mq-test")
  5. public class MQTest {
  6. private Logger logger = LoggerFactory.getLogger(MQTest.class);
  7. @Autowired
  8. private Producter producter;
  9. @GetMapping
  10. public void test(){
  11. producter.fanout("fanout message");
  12. }
  13. }

image.png

HeadersExchange交换机

  • 配置交换机、队列 ```java import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.HeadersExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

import java.util.HashMap; import java.util.Map;

@Configuration public class HeadersExchangeConfig { @Bean(name = “headersQueueFirst”) public Queue firstHeadersQueue() { return new Queue(“queue-headers-first”); }

  1. @Bean(name = "headersQueueSecond")
  2. public Queue secondHeadersQueue() {
  3. return new Queue("queue-headers-second");
  4. }
  5. @Bean(name = "headersExchangeFirst")
  6. public HeadersExchange firstHeadersExchange() {
  7. return new HeadersExchange("headersExchange-first");
  8. }
  9. @Bean(name = "headersExchangeSecond")
  10. public HeadersExchange secondHeadersExchange() {
  11. return new HeadersExchange("headersExchange-second");
  12. }
  13. @Bean
  14. public Binding bindingFirstHeadersExchange(Queue headersQueueFirst, HeadersExchange headersExchangeFirst) {
  15. Map<String,Object> headerValues = new HashMap<>();
  16. headerValues.put("type", "message");
  17. headerValues.put("name", "headers");
  18. return BindingBuilder.bind(headersQueueFirst).to(headersExchangeFirst).whereAll(headerValues).match();
  19. }
  20. @Bean
  21. public Binding bindingSecondHeadersExchange(Queue headersQueueSecond, HeadersExchange headersExchangeSecond) {
  22. Map<String,Object> headerValues = new HashMap<>();
  23. headerValues.put("type", "message");
  24. headerValues.put("name", "headers");
  25. return BindingBuilder.bind(headersQueueSecond).to(headersExchangeSecond).whereAny(headerValues).match();
  26. }

}

  1. - 生产者
  2. ```java
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.core.MessageDeliveryMode;
  7. import org.springframework.amqp.core.MessageProperties;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. import java.util.Map;
  12. @Component
  13. public class Producter {
  14. private Logger logger = LoggerFactory.getLogger(Producter.class);
  15. @Autowired
  16. private RabbitTemplate rabbitTemplate;
  17. public void headersFirst(Map<String, Object> headers,String message){
  18. logger.info("product headers first: "+ message);
  19. rabbitTemplate.convertAndSend("headersExchange-first",null, getMessage(headers, message));
  20. }
  21. public void headersSecond(Map<String, Object> headers,String message){
  22. logger.info("product headers second: "+ message);
  23. rabbitTemplate.convertAndSend("headersExchange-second", null, getMessage(headers, message));
  24. }
  25. private Object getMessage(Map<String, Object> head, String msg) {
  26. // 声明消息 (消息体, 消息属性)
  27. MessageProperties messageProperties = new MessageProperties();
  28. // 设置消息是否持久化。Persistent表示持久化,Non-persistent表示不持久化
  29. messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
  30. messageProperties.setContentType("UTF-8");
  31. messageProperties.getHeaders().putAll(head);
  32. return new Message(msg.getBytes(), messageProperties);
  33. }
  34. }
  • 消费者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(Consumer.class); @RabbitHandler @RabbitListener(queues = “queue-headers-first”) public void headersFirst(String message){ logger.info(“consume headers first:”+message); } @RabbitHandler @RabbitListener(queues = “queue-headers-second”) public void headersSecond(String message){ logger.info(“consume headers second:”+message); } }

  1. - 测试
  2. ```java
  3. @RestController
  4. @RequestMapping(value = "mq-test")
  5. public class MQTest {
  6. private Logger logger = LoggerFactory.getLogger(MQTest.class);
  7. @Autowired
  8. private Producter producter;
  9. @GetMapping
  10. public void test(){
  11. Map<String,Object> headers = new HashMap<>();
  12. headers.put("type", "message");
  13. Map<String,Object> headersAll = new HashMap<>();
  14. headersAll.put("type", "message");
  15. headersAll.put("name", "headers");
  16. producter.headersFirst(headers,"headers message first");
  17. producter.headersFirst(headersAll,"headers message first all");
  18. producter.headersSecond(headers,"headers message second");
  19. producter.headersSecond(headersAll,"headers message second all");
  20. }
  21. }

image.png

拓展

发送对象

注意:传递的对象必须支持序列化(实现了Serializable接口)

  • 对象类 ```java import java.io.Serializable;

public class Message implements Serializable { private Integer status; private String content;

  1. public Message() {
  2. }
  3. public Message(Integer status, String content) {
  4. this.status = status;
  5. this.content = content;
  6. }
  7. @Override
  8. public String toString() {
  9. return "Message{" +
  10. "status=" + status +
  11. ", content='" + content + '\'' +
  12. '}';
  13. }
  14. public Integer getStatus() {
  15. return status;
  16. }
  17. public void setStatus(Integer status) {
  18. this.status = status;
  19. }
  20. public String getContent() {
  21. return content;
  22. }
  23. public void setContent(String content) {
  24. this.content = content;
  25. }

}

  1. - 配置交换机、队列
  2. ```java
  3. import org.springframework.amqp.core.*;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class DirectConfig {
  8. @Bean(name = "classQueue")
  9. public Queue classQueue(){
  10. return new Queue("queue-class");
  11. }
  12. @Bean
  13. public DirectExchange directExchange() {
  14. return new DirectExchange("directExchange-class", true, false);
  15. }
  16. @Bean
  17. public Binding bindingDirect(Queue classQueue, DirectExchange directExchange) {
  18. return BindingBuilder.bind(classQueue).to(directExchange).with("routingKey-class");
  19. }
  20. }
  • 生产者 ```java import com.example.demo.rabbit.entity.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

@Component public class Producter { private Logger logger = LoggerFactory.getLogger(Producter.class);

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. public void classTest(Message message){
  4. logger.info("product class:" + message.toString());
  5. String routingKey = "routingKey-class";
  6. rabbitTemplate.convertAndSend("directExchange-class",routingKey,message);
  7. }

}

  1. - 消费者
  2. ```java
  3. import com.example.demo.rabbit.entity.Message;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  7. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Component;
  11. @Component
  12. public class Consumer {
  13. private Logger logger = LoggerFactory.getLogger(Consumer.class);
  14. @RabbitHandler
  15. @RabbitListener(queues = "queue-class")
  16. public void classTest(Message message){
  17. logger.info("consume class:"+message);
  18. }
  19. }
  • 测试 ```java

@RestController @RequestMapping(value = “mq-test”) public class MQTest { private Logger logger = LoggerFactory.getLogger(MQTest.class);

  1. @Autowired
  2. private Producter producter;
  3. @GetMapping
  4. public void test(){
  5. Message message = new Message(1,"Message");
  6. producter.classTest(message);
  7. }

}

  1. - 结果
  2. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/1416299/1630305134801-14c015cc-3f2a-40b7-852c-ab4af64c036b.png#clientId=uc8cf89ec-e11c-4&from=paste&height=38&id=u2f9d40e9&margin=%5Bobject%20Object%5D&name=image.png&originHeight=50&originWidth=864&originalType=binary&ratio=1&size=39169&status=done&style=none&taskId=u42f207d7-fbf5-42f4-91aa-1d29a1c3045&width=648)
  3. <a name="N9A4U"></a>
  4. ### 工作模式 - 多个消费者
  5. 采用上面DirectExchange交换机,需启动多个消费者,并不会重复消费
  6. - 消费者
  7. ```java
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  11. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  12. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.stereotype.Component;
  15. @Component
  16. public class Consumer {
  17. private Logger logger = LoggerFactory.getLogger(Consumer.class);
  18. @RabbitHandler
  19. @RabbitListener(queues = "queue-direct")
  20. public void direct(String message){
  21. logger.info("consume direct:"+message);
  22. }
  23. @RabbitHandler
  24. @RabbitListener(queues = "queue-direct")
  25. public void direct1(String message){
  26. logger.info("consume direct 1:"+message);
  27. }
  28. }
  • 测试

    1. @RestController
    2. @RequestMapping(value = "mq-test")
    3. public class MQTest {
    4. private Logger logger = LoggerFactory.getLogger(MQTest.class);
    5. @Autowired
    6. private Producter producter;
    7. @GetMapping
    8. public void test(){
    9. for (int i=0;i<5;i++){
    10. producter.direct("direct message "+i);
    11. }
    12. }
    13. }

    image.png

RPC

RabbitMQ支持RPC远程调用,同步返回结果。返回的结果可以是一个对象
image.png

  • 配置交换机、队列,采用上面DirectExchange 交换机配置
  • 生产者 ```java import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;

@Component public class Producter { private Logger logger = LoggerFactory.getLogger(Producter.class); @Autowired private RabbitTemplate rabbitTemplate; public void sender(String content){ logger.info(“product message: “+content); String message= (String) rabbitTemplate.convertSendAndReceive(“directExchange”,”routingKey-direct”,content); logger.info(“return message:”+message); } }

  1. - 消费者
  2. ```java
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class Consumer {
  12. private Logger logger = LoggerFactory.getLogger(Consumer.class);
  13. @RabbitHandler
  14. @RabbitListener(queues = "queue-direct")
  15. public String receive(String content){
  16. logger.info("consume message: "+content);
  17. return "OK";
  18. }
  19. }
  • 测试

    1. @RestController
    2. @RequestMapping(value = "mq-test")
    3. public class MQTest {
    4. private Logger logger = LoggerFactory.getLogger(MQTest.class);
    5. @Autowired
    6. private Producter producter;
    7. @GetMapping
    8. public void test(){
    9. producter.sender("RPC");
    10. }
    11. }

    image.png