介绍
:::tips 在Fanout广播模式中,一条消息会被所有订阅的队列都消费,但是在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct路由模式
Direct路由模式:
- 队列与交换机不能再任意绑定,而是要指定一个RoutingKey(路由key)
 - 消息生产者向Exchange交换机发送消息时,也必须指定消息的RoutingKey
 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收到消息
 
引入依赖
:::tips 在父工程中引入SpringAMQP的依赖(在消息生产者和消息消费者中都引入依赖) :::
<!-- SpringAMQP依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
发送消息
添加配置
:::tips 在消息生产者的配置文件中添加配置 :::
spring:rabbitmq:#RabbitMQ服务的IP地址host: IP地址#RabbitMQ服务的端口号port: 端口号#RabbitMQ的虚拟主机virtual-host: /#RabbitMQ服务的用户名username: 用户名#RabbitMQ服务的密码password: 密码
编写代码
:::tips 在消息生产者中需要发送消息的类里面注入RabbitTemplate对象,然后调用这个对象的convertAndSend方法(参数一是交换机名称,参数二是路由key,参数三是消息内容)来发送消息 :::
@SpringBootTestpublic class MyTest{//注入RabbitTemplate对象@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {//指定交换机名称String exchangeName = "exchange.direct";//指定路由keyString routingKey = "red";//构建消息String message = "这是一条测试消息";//发送消息rabbitTemplate.convertAndSend(exchangeName, routingKey, message);}}
接收消息
添加配置
:::tips 在每个消息消费者的配置文件中添加配置 :::
spring:rabbitmq:#RabbitMQ服务的IP地址host: IP地址#RabbitMQ服务的端口号port: 端口号#RabbitMQ的虚拟主机virtual-host: /#RabbitMQ服务的用户名username: 用户名#RabbitMQ服务的密码password: 密码
编写代码
:::tips 在每个消息接收者中都新建一个listener包,然后在listener包下创建一个类,类上需要打上@Component注解将其注册到Spring容器中,否则无法接收消息
在这个类里面创建一个方法,方法上打上@RabbitListener注解,用来监听消息
- 指定@RabbitListener注解的bindings属性为@QueueBinding注解,用来声明绑定队列和交换机,如果队列和交换机不存在会自动创建
- 指定@QueueBinding注解的value属性为@Queue注解,用来声明队列
- 指定@Queue注解的name属性为队列名称,用来指定队列名称
 
 - 指定@QueueBinding注解的exchange属性为@Exchange注解,用来声明交换机
- 指定@Exchange注解的name属性为交换机名称,用来指定交换机名称
 - 指定指定@Exchange注解的type属性为ExchangeTypes.DIRECT,用来指定交换机类型为路由类型
 
 - 指定@QueueBinding注解的key属性为路由key,用来指定路由规则
 
 - 指定@QueueBinding注解的value属性为@Queue注解,用来声明队列
 
在方法上添加一个形参,形参的类型需要和消息生产者发送的消息类型保持一致,那么这个方法就会持续监听指定队列的消息,如果监听到消息,就会将消息传递给形参 :::
//将这个类注册到Spring容器中@Componentpublic class RabbitMqListener{//声明绑定队列和交换机,并监听消息,如果队列和交换机不存在会自动创建@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "exchange.direct",type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listen(String msg) throws InterruptedException {System.out.println("接收到消息:" + msg);}}
:::
