介绍
:::tips Fanout广播模式:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定的所有队列
- 订阅队列的消费者都能拿到消息
SpringAMQP提供了一个接口Exchange,来表示所有不同类型的交换机
:::
引入依赖
:::tips 在父工程中引入SpringAMQP的依赖(在消息生产者和消息消费者中都引入依赖) :::
<!-- SpringAMQP依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
发送消息
添加配置
:::tips 在消息生产者的配置文件中添加配置 :::
spring:
rabbitmq:
#RabbitMQ服务的IP地址
host: IP地址
#RabbitMQ服务的端口号
port: 端口号
#RabbitMQ的虚拟主机
virtual-host: /
#RabbitMQ服务的用户名
username: 用户名
#RabbitMQ服务的密码
password: 密码
编写代码
:::tips 在消息生产者中需要发送消息的类里面注入RabbitTemplate对象,然后调用这个对象的convertAndSend方法(参数一是交换机名称,参数二是路由key(指定空字符),参数三是消息内容)来发送消息 :::
@SpringBootTest
public class MyTest{
//注入RabbitTemplate对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() {
//指定交换机名称
String exchangeName = "exchange.fanout";
//指定路由Key
String routingKey = "";
//构建消息
String message = "这是一条测试消息";
//发送消息
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
}
接收消息
添加配置
:::tips 在每个消息消费者的配置文件中添加配置 :::
spring:
rabbitmq:
#RabbitMQ服务的IP地址
host: IP地址
#RabbitMQ服务的端口号
port: 端口号
#RabbitMQ的虚拟主机
virtual-host: /
#RabbitMQ服务的用户名
username: 用户名
#RabbitMQ服务的密码
password: 密码
编写代码
:::tips 在每个消息接收者中都新建一个listener包,然后在listener包下创建一个类,类上需要打上@Component注解将其注册到Spring容器中,否则无法接收消息
在这个类里面创建一个方法,方法上打上@RabbitListener注解,用来监听消息
- 指定@RabbitListener注解的bindings属性为@QueueBinding注解,用来声明绑定队列和交换机,如果队列和交换机不存在会自动创建
- 指定@QueueBinding注解的value属性为@Queue注解,用来声明队列
- 指定@Queue注解的name属性为队列名称,用来指定队列名称
- 指定@QueueBinding注解的exchange属性为@Exchange注解,用来声明交换机
- 指定@Exchange注解的name属性为交换机名称,用来指定交换机名称
- 指定@Exchange注解的type属性为ExchangeTypes.FANOUT,用来指定交换机类型为广播类型
- 指定@QueueBinding注解的value属性为@Queue注解,用来声明队列
在方法中添加一个形参,形参的类型需要和消息生产者发送消息的类型保持一致,那么这个方法就会持续监听指定队列的消息,如果监听到消息,就会将消息传递给形参 :::
//将这个类注册到Spring容器中
@Component
public class RabbitMqListener{
//声明绑定队列和交换机,并监听消息,如果队列和交换机不存在会自动创建
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "exchange.fanout",type = ExchangeTypes.FANOUT)
))
public void listen(String msg) throws InterruptedException {
System.out.println("接收到消息:" + msg);
}
}