在广播模式下,消息发送流程是这样的:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
案例
- 创建一个交换机 itcast.fanout,类型是Fanout
- 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout
1. 引入依赖
2.基于Bean声明队列和交换机
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
在consumer中创建一个配置类,声明队列和交换机:
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutConfig { /** * 声明交换机 * @return Fanout类型交换机 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } /** * 第1个队列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 绑定队列1和交换机 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2个队列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 绑定队列2和交换机 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); }}
3.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test public void testFanoutExchange() { // 交换机名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, everyone!"; //三个参数:1交换机名字,2路由键,传空就行,3消息 rabbitTemplate.convertAndSend(exchangeName, "", message); }
4.消息接受
在consumer服务下的SpringRabbitListener中添加两个方法,作为消费者:
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalTime;@Componentpublic class SpringRabbitListener { @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }}
给交换机发消息,绑定这个交换机的队列都得到了交换机发来的消息,消费者也收到了来自队列的消息
