SpringAMQP

SpringAMQP 是基于 RabbitMQ 封装的一套模板,并且还利用 SpringBoot 对其实现了自动装配,使用起来非常方便。
SpringAMQP 的官方地址:https://spring.io/projects/spring-amqp
image.png
SpringAMQP 提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了 RabbitTemplate 工具,用于发送消息

简单模式

image.png

  1. 在父工程中引入spring-amqp的依赖

    1. 因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程中: ```xml <?xml version=”1.0” encoding=”UTF-8”?>

      4.0.0

      org.example rabbit-mq 1.0-SNAPSHOT

      publisher consumer 8 8 pom org.springframework.boot spring-boot-starter-parent 2.3.9.RELEASE org.projectlombok lombok org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test com.fasterxml.jackson.core jackson-databind

  1. 2. publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
  2. 1. publisher服务中编写application.yml,添加mq连接信息:
  3. ```yaml
  4. logging:
  5. pattern:
  6. dateformat: MM-dd HH:mm:ss:SSS
  7. spring:
  8. rabbitmq:
  9. host: 192.168.31.168 # rabbitMQ的ip地址
  10. port: 5672 # 端口
  11. username: admin
  12. password: admin
  13. virtual-host: /
  1. 在publisher服务中新建一个测试类,编写测试方法:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class PublisherTest {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void testSendMessage2SimpleQueue() {
      String queueName = "simple.queue";
      String message = "hello, spring amqp!";
      rabbitTemplate.convertAndSend(queueName, message);
    }
    }
    

    image.png

  1. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

    1. 在consumer服务中编写application.yml,添加mq连接信息:

      logging:
      pattern:
      dateformat: MM-dd HH:mm:ss:SSS
      spring:
      rabbitmq:
      host: 192.168.31.168 # rabbitMQ的ip地址
      port: 5672 # 端口
      username: admin
      password: admin
      virtual-host: /
      
    2. 在consumer服务中新建一个类,编写消费逻辑:

      @Component
      public class Consumer {
      
      @RabbitListener(queues = "simple.queue")
      public void listenSimpleQueueMessage(String msg) {
        System.out.println("消费者收到消息:【 " + msg + " 】");
      }
      }
      
  2. 启动消费者服务

    @SpringBootApplication
    public class ConsumerApplication {
     public static void main(String[] args) {
         SpringApplication.run(ConsumerApplication.class);
     }
    
     @Bean
     public MessageConverter messageConverter(){
         return new Jackson2JsonMessageConverter();
     }
    }
    

    image.png

Work Queue 工作队列

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积
image.png

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class PublisherTest {
    
     @Autowired
     private RabbitTemplate rabbitTemplate;
    
     @Test
     public void testSendMessage2WorkQueue() throws InterruptedException {
         String queueName = "simple.queue";
         String message = "hello, message--";
         for (int i = 0; i < 50; i++) {
             rabbitTemplate.convertAndSend(queueName, message + i);
             Thread.sleep(20);
         }
     }
    }
    
  2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列,消费者1每秒处理50条消息,消费者2每秒处理10条消息 ```java @Component public class Consumer {

    @RabbitListener(queues = “simple.queue”) public void listenWorkQueueMessage1(String msg) throws InterruptedException {

     System.out.println("消费者1收到消息:【 " + msg + " 】");
     Thread.sleep(25);
    

    }

    @RabbitListener(queues = “simple.queue”) public void listenWorkQueueMessage2(String msg) throws InterruptedException {

     System.err.println("消费者2收到消息:【 " + msg + " 】");
     Thread.sleep(100);
    

    }

}


3. 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限:
```yaml
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.31.168 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: admin
    password: admin
    virtual-host: /
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
  1. 输出如下,消费者1以4倍于消费者2的速度处理消息

image.png

Pub/Sub 发布订阅

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

image.png

1、FanoutExchange

SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
image.png

  1. 在consumer服务中,利用代码声明队列、交换机,并将两者绑定

    1. 在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下: ```java @Configuration public class FanoutConfig {

      // 声明FanoutExchange交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(“itcast.fanout”); }

      // 声明第1个队列 @Bean public Queue fanoutQueue1() { return new Queue(“fanout.queue1”); }

      // 绑定队列1和交换机 @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }

      // 声明第2个队列 @Bean public Queue fanoutQueue2() { return new Queue(“fanout.queue2”); }

      // 绑定队列2和交换机 @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); }

}


2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
```java
@Component
public class Consumer {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }

}
  1. 在publisher中编写测试方法,向itcast.fanout发送消息

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class PublisherTest {
    
     @Test
     public void testFanoutExchange() {
         String exchangeName = "itcast.fanout";
         String message = "hello, everyone!";
         // 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
         rabbitTemplate.convertAndSend(exchangeName, "", message);
     }
    }
    

2、DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

image.png

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey

    1. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,
    2. 并利用@RabbitListener声明Exchange、Queue、RoutingKey ```java @Component public class Consumer {

      @RabbitListener(bindings = @QueueBinding( value = @Queue(name = “direct.queue1”), exchange = @Exchange(name = “itcast.direct”, type = ExchangeTypes.DIRECT), key = {“red”, “blue”}))

      public void listenDirectQueue1(String msg) { System.out.println(“消费者1接收到Direct消息:【” + msg + “】”); }

      @RabbitListener(bindings = @QueueBinding( value = @Queue(name = “direct.queue2”), exchange = @Exchange(name = “itcast.direct”, type = ExchangeTypes.DIRECT), key = {“red”, “yellow”}) ) public void listenDirectQueue2(String msg) { System.out.println(“消费者2接收到Direct消息:【” + msg + “】 “); }

}


2. 在publisher中编写测试方法,向itcast. direct发送消息
```java
@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDirectExchange() {
        // 队列名称
        String exchangeName = "itcast.direct";
        // 消息
        String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
        String message2 = "黄色警报!日本又地震啦!";
        // 发送消息,参数依次为:交换机名称,RoutingKey,消息
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
        rabbitTemplate.convertAndSend(exchangeName, "yellow", message2);

    }
}
  1. 输出

image.png

3、TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

image.png

  1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey,在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2 ```java @Component public class Consumer {

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = “topic.queue1”), exchange = @Exchange(name = “itcast.topic”, type = ExchangeTypes.TOPIC), key = {“china.#”})) public void listenTopicQueue1(String msg) {

     System.out.println("消费者1接收到Topic消息:【" + msg + "】");
    

    }

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = “topic.queue2”), exchange = @Exchange(name = “itcast.topic”, type = ExchangeTypes.TOPIC), key = “#.news”)) public void listenTopicQueue2(String msg) {

     System.out.println("消费者2接收到Topic消息:【" + msg + "】");
    

    }

}


2. 在publisher中编写测试方法,向itcast. topic发送消息
```java
@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopicExchange() {
        // 队列名称
        String exchangeName = "itcast.topic";
        // 消息
        String message = "喜报!孙悟空大战哥斯拉,胜!";
        // 发送消息,参数依次为:交换机名称,RoutingKey,消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
}
  1. 输出

image.png

发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送,默认是JDK的序列化。

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

  • 在publisher服务和consumer服务都引入Jackson依赖

    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    </dependency>
    
  • 在publisher服务和在consumer服务定义MessageConverter:

    @Bean
    public MessageConverter messageConverter(){
      return new Jackson2JsonMessageConverter();
    }
    
  • 定义一个消费者,监听object.queue队列并消费消息:

    @Component
    public class Consumer {
    
      @RabbitListener(queuesToDeclare = @Queue(value = "object.queue")) //该方式,Queue会不存在自动创建
      public void listenObjectQueue(Map<String, Object> map) {
          System.out.println(map);
      }
    }
    
  • 发送消息

    @Test
    public void testObjectMessage() {
      String queueName = "object.queue";
      Map<String, Object> map = new HashMap<>();
      map.put("name", "迪丽热巴");
      map.put("age", 21);
    
      rabbitTemplate.convertAndSend(queueName, map);
    }
    
  • 输出

image.png

总结:
SpringAMQP中消息的序列化和反序列化是怎么实现的?

  1. 利用MessageConverter实现的,默认是JDK的序列化
  2. 注意发送方与接收方必须使用相同的MessageConverter