介绍

我们使用原生的方式配置 RabbitMQ,其实是非常麻烦的,所以通常都是使用第三方框架或者工具类来开发,十分不建议在业务代码中混杂连接 RabbitMQ 的操作,而作为最常用的 SpringBoot 框架,自然也有整合 RabbitMQ 的方式,也建议使用 SpringBoot 进行整合开发,因为真的方便了特别多。

导入依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.projectlombok</groupId>
  7. <artifactId>lombok</artifactId>
  8. <optional>true</optional>
  9. </dependency>
  10. <dependency>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-test</artifactId>
  13. <scope>test</scope>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.springframework.amqp</groupId>
  17. <artifactId>spring-rabbit-test</artifactId>
  18. <scope>test</scope>
  19. </dependency>

查看 RabbitAutoConfiguration 类可以查看 SpringBoot 对 RabbitMQ 的自动配置信息:

  • 导入了 RabbitProperties 配置文件类,绑定了配置文件中以 spring.rabbitmq 为前缀的配置。
  • 导入了 RabbitTemplate 类,很明显,这个类和之前的 redisTemplate 等模板类类似,提供对 RabbitMQ 的一系列操作,比如生产和消费。
  • 导入了 AmqpAdmin 类,这个类是用来做声明的,比如声明交换机、队列,在配置文件 spring.rabbitmq.dynamictrue 时才会开启,默认就是开启的。

更多自动配置信息,可以自行查看源码。

配置文件

配置文件信息根据本机的情况而定。

  1. spring:
  2. rabbitmq:
  3. username: admin
  4. password: 123456
  5. host: 192.168.20.10
  6. port: 5672

AmqpAdmin 的使用

前面说过这个类主要用于声明,声明后的交换机、对象和绑定如果在 RabbitMQ 中没有,会自行创建。

在类中,我们可以直接使用自动注入的方式使用 RabbitTemplate

  1. @SpringBootTest
  2. @Slf4j
  3. class SpringBootRabbitmqApplicationTests {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. }

Tip:交换机、队列和绑定的类都位于 org.springframework.amqp.core 包下,注意不要导错包!

声明交换机

需要哪种类型的交换机,就创建 XxxExchange 的实例,通过 amqpAdmin.declareExchange() 方法声明交换机。

  1. @Test
  2. public void directExchange() {
  3. DirectExchange directExchange = new DirectExchange("directExchange");
  4. amqpAdmin.declareExchange(directExchange);
  5. }

当然,除了直接通过构造方法创建,我们也可以通过 ExchangeBuilder 来创建交换机对象,自行选择一种方式即可。

声明队列

通过 QueueBuilder 可以快速构造一个队列对象,由于队列的参数可能较多,所以通过 QueueBuilder 可能会更加方便,最后通过 amqpAdmin.declareQueue() 方法声明对象。

  1. @Test
  2. public void queue() {
  3. Queue queue = QueueBuilder
  4. .nonDurable("myqueue") // 设置不需要持久化和队列名字
  5. .ttl(10000) // 设置消息ttl
  6. .build();
  7. amqpAdmin.declareQueue(queue);
  8. }

QueueBuilder 还可以为队列设置很多参数,通过链式调用即可快速创建实例,具体可以自行探索。

声明绑定关系

通过 BindingBuilder 可以快速构造一个绑定对象,最后通过 amqpAdmin.declareBinding() 方法声明绑定关系。

  1. @Test
  2. public void binding() {
  3. DirectExchange directExchange = new DirectExchange("directExchange");
  4. Queue queue = QueueBuilder
  5. .nonDurable("myqueue") // 设置不需要持久化和队列名字
  6. .ttl(10000) // 设置消息ttl
  7. .build();
  8. Binding binding = BindingBuilder
  9. .bind(queue) // 绑定的队列
  10. .to(directExchange) // 绑定的交换机
  11. .with("exchange_binding_queue"); // 两者的路由Key
  12. amqpAdmin.declareBinding(binding);
  13. }

配置类中声明

除了使用 AmqpAdmin 对象来做声明以外,在配置类中的所有交换机、队列和绑定对象,都会被自动的声明,所以我们也可以直接在配置类中声明以上信息。

  1. @Configuration
  2. public class RabbitConfig {
  3. public final static String EXCHANGE_NAME = "direct_exchange";
  4. public final static String QUEUE_NAME = "direct_queue";
  5. public final static String EXCHANGE_BINDING_QUEUE = "exchange_binding_queue";
  6. @Bean
  7. public DirectExchange exchange() {
  8. return new DirectExchange(EXCHANGE_NAME);
  9. }
  10. @Bean
  11. public Queue queue() {
  12. return QueueBuilder
  13. .nonDurable(QUEUE_NAME)
  14. .build();
  15. }
  16. @Bean
  17. public Binding binding() {
  18. return BindingBuilder
  19. .bind(queue())
  20. .to(exchange())
  21. .with(EXCHANGE_BINDING_QUEUE);
  22. }
  23. }

Tip:下小节 RabbitTemplate 还会使用到这个配置类哟。

RabbitTemplate 的使用

这个对象可以直接帮助我们操作 RabbitMQ,比如生产操作、消费操作。

我们可以使用自动注入的方式使用 rabbitTemplate

  1. @Slf4j
  2. class SpringBootRabbitmq2ApplicationTests {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. }

生产消息

生产消息的方式有两种,如果需要传递对象数据,使用第二种可能会更加方便,根据具体情况自行选择。

  1. /**
  2. * 第一种发送消息的方式
  3. * 这种方式需要我们自己封装Message对象
  4. */
  5. @Test
  6. public void sendMethodOne() {
  7. // 构造方法的第一个参数是消息的byte数据,第二个参数是消息参数,注意不能为空,无参数情况下可以直接传入一个空对象
  8. Message message = new Message("hello rabbit".getBytes(StandardCharsets.UTF_8), new MessageProperties());
  9. // 发送消息
  10. rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME, RabbitConfig.EXCHANGE_BINDING_QUEUE, message);
  11. }
  12. /**
  13. * 第二种发送消息的方式
  14. * 这种方式可以帮我们把发送的消息转换成byte数据
  15. */
  16. @Test
  17. public void sendMethodTwo() {
  18. rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME // 交换机名
  19. , RabbitConfig.EXCHANGE_BINDING_QUEUE // 路由Key
  20. , "hello rabbit"); // 发送的消息(可以是对象)
  21. }

Tip:这两种方法只是作为发送消息的基本方式,其实这两个方法还有很多重载方法,有需要的可以自行尝试使用。

消费消息

消费消息的方式同样有两种,如果只关注消息体的内容,我们可以使用第二种方式更加方便,根据具体情况自行选择。

  1. /**
  2. * 第一种消费消息的方式
  3. * 这种方式获取的是原生的Message消息对象
  4. */
  5. @Test
  6. public void receiveMethodOne() {
  7. // receive()方法第一个参数为队列名称,第二个参数为超时时间
  8. Message message = rabbitTemplate.receive(RabbitConfig.QUEUE_NAME, 200);
  9. Assert.notNull(message, "接收到空消息");
  10. log.info("接收到的消息 -> {}", new String(message.getBody(), StandardCharsets.UTF_8));
  11. }
  12. /**
  13. * 第二种消费消息的方式
  14. * 这种方式获取的是消息体对象,如果没指定对象,则为Object类型
  15. */
  16. @Test
  17. public void receiveMethodTwo() {
  18. // receiveAndConvert()参数含义同上
  19. Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.QUEUE_NAME, 200);
  20. Assert.notNull(o, "接收到空消息");
  21. log.info("接收到的消息 -> {}", o);
  22. log.info("接收到消息的类 -> {}", o.getClass());
  23. }

Tip:这两种方法同样也有很多重载方法,有需要的可以自行尝试使用。

消息序列化

在生产消息的时候,我们不可能每次都发送字符串这种数据,更多时候都是需要传递对象消息,但是如果我们直接使用 convertAndSend() 方法发送一个对象消息,我们在 RabbitMQ 中查看消息就是一团乱码。

SpringBoot 整合 RabbitMQ - 图1

乱码的原因我们在使用 redis 的时候也见到过,就是因为它们底层用的都是 jdk 自带的序列化器,而 jdk 的序列化器就是这样一团乱码的效果,而我们应该换成更加常见的 json 格式数据。

那么我们应该怎么更换呢?查看 rabbitTemplate 源码可以发现一个 MessageConverter 对象,这个对象很明显就是用于消息类型转换的。

SpringBoot 整合 RabbitMQ - 图2

这个对象默认是 SimpleMessageConverter 类,而 SimpleMessageConverter 类其实就是使用的 jdk 方式进行的序列化,而我们要换成 json 的方式,我们就应该考虑是不是 jackson 有没有提供什么消息转换器呢?

答案也是有的,Jackson2JsonMessageConverter 就是 jackson 为我们提供的消息转换器,我们只需要在配置类中注入这个消息转换器即可。

  1. @Configuration
  2. public class RabbitConfig {
  3. // ...
  4. @Bean
  5. public MessageConverter messageConverter() {
  6. return new Jackson2JsonMessageConverter();
  7. }
  8. }

这时候我们可以再尝试发送一个对象到队列中。

  1. @Test
  2. public void sendObject() {
  3. rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME
  4. , RabbitConfig.EXCHANGE_BINDING_QUEUE
  5. , new User(1L, "张三", "123456"));
  6. }

可以看到结果就是 json 格式的消息了。

SpringBoot 整合 RabbitMQ - 图3

@RabbitListener 注解

介绍

上面说过了接收消息的方式,但是有些情况下,我们可能并不是只接收一个消息就够了,比如有一个邮件队列,我们需要为队列中的每个用户都发送邮件,这个时候,如果还使用方法就会比较麻烦,我们还需要频繁的调用方法。

但是我们可以使用一个监听器,这个监听器专门负责监听某个队列,只要队列中有了消息,我们就可以直接取出这个消息做处理,并且只要一监听到消息,就对消息做处理,显然,这种方式比调用方法更加方便。

而 Spring 就为我们提供了 @RabbitListener 注解,这个注解可以放在方法上,表示这个方法用于监听某个队列。

使用

在使用这个注解之前,我们主要在配置类上标注 @EnableRabbit 注解:

  1. @Configuration
  2. @EnableRabbit
  3. public class RabbitConfig {
  4. // ...
  5. }

但是好像较高版本的 SpringBoot 不标注 @EnableRabbit 注解也可以正常使用 @RabbitListener ,导致暂时不知道这个注解有什么用,但是还是建议标上,以防万一。

然后为了测试,我们先生产20条消息:

  1. @Test
  2. public void sendObject() {
  3. for (int i = 0; i < 20; i++) {
  4. rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME // 交换机
  5. , RabbitConfig.EXCHANGE_BINDING_QUEUE // 路由Key
  6. , new User((long) i, "张三" + i, "123456"));
  7. }
  8. }

然后我们使用 @RabbitListener 注解就可以监听某个队列中的消息了,注意,发布消息使用的是 routingKey ,而监听消息时使用的是队列名称,表示监听某个具体队列,不要弄混了。

在监听的方法中,方法可以有参数,Spring 会帮我们自动进行注入,就像 @Controller 中的方法一样,但是这里我们一般注入消息、消息体、信道等对象。

  1. @Slf4j
  2. @Component
  3. public class MessageListener {
  4. /**
  5. * 监听队列
  6. * @param message 原生的消息对象
  7. */
  8. @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
  9. public void receive(Message message) throws IOException {
  10. Assert.notNull(message, "收到空消息");
  11. byte[] body = message.getBody();
  12. User user = new ObjectMapper().readValue(body, User.class);
  13. log.info("receive() 接收到的消息 -> {}", user);
  14. }
  15. /**
  16. * 监听队列
  17. * @param user 仅包含消息体的对象
  18. */
  19. @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
  20. public void receiveBody(User user) {
  21. Assert.notNull(user, "收到空消息");
  22. log.info("receiveBody() 接收到的消息 -> {}", user);
  23. }
  24. }

如上方定义了两个监听器,那么如果是直接交换机,默认还是会采用轮询的方式接收消息,结果如下:

SpringBoot 整合 RabbitMQ - 图4

总结

以上就是 SpringBoot 中整合 RabbitMQ 的基本使用,使用 SpringBoot 之后,不用再像之前一样复杂的发布和接收消息了,我们只需要专注代码的开发,希望本文对您有所帮助。


本章完。