介绍
我们使用原生的方式配置 RabbitMQ,其实是非常麻烦的,所以通常都是使用第三方框架或者工具类来开发,十分不建议在业务代码中混杂连接 RabbitMQ 的操作,而作为最常用的 SpringBoot 框架,自然也有整合 RabbitMQ 的方式,也建议使用 SpringBoot 进行整合开发,因为真的方便了特别多。
导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
查看 RabbitAutoConfiguration
类可以查看 SpringBoot 对 RabbitMQ 的自动配置信息:
- 导入了
RabbitProperties
配置文件类,绑定了配置文件中以spring.rabbitmq
为前缀的配置。 - 导入了
RabbitTemplate
类,很明显,这个类和之前的redisTemplate
等模板类类似,提供对 RabbitMQ 的一系列操作,比如生产和消费。 - 导入了
AmqpAdmin
类,这个类是用来做声明的,比如声明交换机、队列,在配置文件spring.rabbitmq.dynamic
为true
时才会开启,默认就是开启的。
更多自动配置信息,可以自行查看源码。
配置文件
配置文件信息根据本机的情况而定。
spring:
rabbitmq:
username: admin
password: 123456
host: 192.168.20.10
port: 5672
AmqpAdmin
的使用
前面说过这个类主要用于声明,声明后的交换机、对象和绑定如果在 RabbitMQ 中没有,会自行创建。
在类中,我们可以直接使用自动注入的方式使用 RabbitTemplate
:
@SpringBootTest
@Slf4j
class SpringBootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
}
Tip:交换机、队列和绑定的类都位于
org.springframework.amqp.core
包下,注意不要导错包!
声明交换机
需要哪种类型的交换机,就创建 XxxExchange 的实例,通过 amqpAdmin.declareExchange()
方法声明交换机。
@Test
public void directExchange() {
DirectExchange directExchange = new DirectExchange("directExchange");
amqpAdmin.declareExchange(directExchange);
}
当然,除了直接通过构造方法创建,我们也可以通过 ExchangeBuilder
来创建交换机对象,自行选择一种方式即可。
声明队列
通过 QueueBuilder
可以快速构造一个队列对象,由于队列的参数可能较多,所以通过 QueueBuilder
可能会更加方便,最后通过 amqpAdmin.declareQueue()
方法声明对象。
@Test
public void queue() {
Queue queue = QueueBuilder
.nonDurable("myqueue") // 设置不需要持久化和队列名字
.ttl(10000) // 设置消息ttl
.build();
amqpAdmin.declareQueue(queue);
}
QueueBuilder
还可以为队列设置很多参数,通过链式调用即可快速创建实例,具体可以自行探索。
声明绑定关系
通过 BindingBuilder
可以快速构造一个绑定对象,最后通过 amqpAdmin.declareBinding()
方法声明绑定关系。
@Test
public void binding() {
DirectExchange directExchange = new DirectExchange("directExchange");
Queue queue = QueueBuilder
.nonDurable("myqueue") // 设置不需要持久化和队列名字
.ttl(10000) // 设置消息ttl
.build();
Binding binding = BindingBuilder
.bind(queue) // 绑定的队列
.to(directExchange) // 绑定的交换机
.with("exchange_binding_queue"); // 两者的路由Key
amqpAdmin.declareBinding(binding);
}
配置类中声明
除了使用 AmqpAdmin
对象来做声明以外,在配置类中的所有交换机、队列和绑定对象,都会被自动的声明,所以我们也可以直接在配置类中声明以上信息。
@Configuration
public class RabbitConfig {
public final static String EXCHANGE_NAME = "direct_exchange";
public final static String QUEUE_NAME = "direct_queue";
public final static String EXCHANGE_BINDING_QUEUE = "exchange_binding_queue";
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Queue queue() {
return QueueBuilder
.nonDurable(QUEUE_NAME)
.build();
}
@Bean
public Binding binding() {
return BindingBuilder
.bind(queue())
.to(exchange())
.with(EXCHANGE_BINDING_QUEUE);
}
}
Tip:下小节
RabbitTemplate
还会使用到这个配置类哟。
RabbitTemplate
的使用
这个对象可以直接帮助我们操作 RabbitMQ,比如生产操作、消费操作。
我们可以使用自动注入的方式使用 rabbitTemplate
:
@Slf4j
class SpringBootRabbitmq2ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
}
生产消息
生产消息的方式有两种,如果需要传递对象数据,使用第二种可能会更加方便,根据具体情况自行选择。
/**
* 第一种发送消息的方式
* 这种方式需要我们自己封装Message对象
*/
@Test
public void sendMethodOne() {
// 构造方法的第一个参数是消息的byte数据,第二个参数是消息参数,注意不能为空,无参数情况下可以直接传入一个空对象
Message message = new Message("hello rabbit".getBytes(StandardCharsets.UTF_8), new MessageProperties());
// 发送消息
rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME, RabbitConfig.EXCHANGE_BINDING_QUEUE, message);
}
/**
* 第二种发送消息的方式
* 这种方式可以帮我们把发送的消息转换成byte数据
*/
@Test
public void sendMethodTwo() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME // 交换机名
, RabbitConfig.EXCHANGE_BINDING_QUEUE // 路由Key
, "hello rabbit"); // 发送的消息(可以是对象)
}
Tip:这两种方法只是作为发送消息的基本方式,其实这两个方法还有很多重载方法,有需要的可以自行尝试使用。
消费消息
消费消息的方式同样有两种,如果只关注消息体的内容,我们可以使用第二种方式更加方便,根据具体情况自行选择。
/**
* 第一种消费消息的方式
* 这种方式获取的是原生的Message消息对象
*/
@Test
public void receiveMethodOne() {
// receive()方法第一个参数为队列名称,第二个参数为超时时间
Message message = rabbitTemplate.receive(RabbitConfig.QUEUE_NAME, 200);
Assert.notNull(message, "接收到空消息");
log.info("接收到的消息 -> {}", new String(message.getBody(), StandardCharsets.UTF_8));
}
/**
* 第二种消费消息的方式
* 这种方式获取的是消息体对象,如果没指定对象,则为Object类型
*/
@Test
public void receiveMethodTwo() {
// receiveAndConvert()参数含义同上
Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.QUEUE_NAME, 200);
Assert.notNull(o, "接收到空消息");
log.info("接收到的消息 -> {}", o);
log.info("接收到消息的类 -> {}", o.getClass());
}
Tip:这两种方法同样也有很多重载方法,有需要的可以自行尝试使用。
消息序列化
在生产消息的时候,我们不可能每次都发送字符串这种数据,更多时候都是需要传递对象消息,但是如果我们直接使用 convertAndSend()
方法发送一个对象消息,我们在 RabbitMQ 中查看消息就是一团乱码。
乱码的原因我们在使用 redis 的时候也见到过,就是因为它们底层用的都是 jdk
自带的序列化器,而 jdk
的序列化器就是这样一团乱码的效果,而我们应该换成更加常见的 json
格式数据。
那么我们应该怎么更换呢?查看 rabbitTemplate
源码可以发现一个 MessageConverter
对象,这个对象很明显就是用于消息类型转换的。
这个对象默认是 SimpleMessageConverter
类,而 SimpleMessageConverter
类其实就是使用的 jdk
方式进行的序列化,而我们要换成 json
的方式,我们就应该考虑是不是 jackson
有没有提供什么消息转换器呢?
答案也是有的,Jackson2JsonMessageConverter
就是 jackson
为我们提供的消息转换器,我们只需要在配置类中注入这个消息转换器即可。
@Configuration
public class RabbitConfig {
// ...
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
这时候我们可以再尝试发送一个对象到队列中。
@Test
public void sendObject() {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME
, RabbitConfig.EXCHANGE_BINDING_QUEUE
, new User(1L, "张三", "123456"));
}
可以看到结果就是 json
格式的消息了。
@RabbitListener
注解
介绍
上面说过了接收消息的方式,但是有些情况下,我们可能并不是只接收一个消息就够了,比如有一个邮件队列,我们需要为队列中的每个用户都发送邮件,这个时候,如果还使用方法就会比较麻烦,我们还需要频繁的调用方法。
但是我们可以使用一个监听器,这个监听器专门负责监听某个队列,只要队列中有了消息,我们就可以直接取出这个消息做处理,并且只要一监听到消息,就对消息做处理,显然,这种方式比调用方法更加方便。
而 Spring 就为我们提供了 @RabbitListener
注解,这个注解可以放在方法上,表示这个方法用于监听某个队列。
使用
在使用这个注解之前,我们主要在配置类上标注 @EnableRabbit
注解:
@Configuration
@EnableRabbit
public class RabbitConfig {
// ...
}
但是好像较高版本的 SpringBoot 不标注 @EnableRabbit
注解也可以正常使用 @RabbitListener
,导致暂时不知道这个注解有什么用,但是还是建议标上,以防万一。
然后为了测试,我们先生产20条消息:
@Test
public void sendObject() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME // 交换机
, RabbitConfig.EXCHANGE_BINDING_QUEUE // 路由Key
, new User((long) i, "张三" + i, "123456"));
}
}
然后我们使用 @RabbitListener
注解就可以监听某个队列中的消息了,注意,发布消息使用的是 routingKey
,而监听消息时使用的是队列名称,表示监听某个具体队列,不要弄混了。
在监听的方法中,方法可以有参数,Spring 会帮我们自动进行注入,就像 @Controller
中的方法一样,但是这里我们一般注入消息、消息体、信道等对象。
@Slf4j
@Component
public class MessageListener {
/**
* 监听队列
* @param message 原生的消息对象
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void receive(Message message) throws IOException {
Assert.notNull(message, "收到空消息");
byte[] body = message.getBody();
User user = new ObjectMapper().readValue(body, User.class);
log.info("receive() 接收到的消息 -> {}", user);
}
/**
* 监听队列
* @param user 仅包含消息体的对象
*/
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void receiveBody(User user) {
Assert.notNull(user, "收到空消息");
log.info("receiveBody() 接收到的消息 -> {}", user);
}
}
如上方定义了两个监听器,那么如果是直接交换机,默认还是会采用轮询的方式接收消息,结果如下:
总结
以上就是 SpringBoot 中整合 RabbitMQ 的基本使用,使用 SpringBoot 之后,不用再像之前一样复杂的发布和接收消息了,我们只需要专注代码的开发,希望本文对您有所帮助。
本章完。