在广播模式下,消息发送流程是这样的:

  1. 可以有多个队列
  2. 每个队列都要绑定到Exchange(交换机)
  3. 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
  4. 交换机把消息发送给绑定过的所有队列
  5. 订阅队列的消费者都能拿到消息

案例

  • 创建一个交换机 itcast.fanout,类型是Fanout
  • 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout

image-20210717165509466.png

1. 引入依赖

Basic Queue

2.基于Bean声明队列和交换机

Spring提供了一个接口Exchange,来表示所有不同类型的交换机:
image-20210717165552676.png
在consumer中创建一个配置类,声明队列和交换机:

  1. package cn.itcast.mq.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.FanoutExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class FanoutConfig {
  10. /**
  11. * 声明交换机
  12. * @return Fanout类型交换机
  13. */
  14. @Bean
  15. public FanoutExchange fanoutExchange(){
  16. return new FanoutExchange("itcast.fanout");
  17. }
  18. /**
  19. * 第1个队列
  20. */
  21. @Bean
  22. public Queue fanoutQueue1(){
  23. return new Queue("fanout.queue1");
  24. }
  25. /**
  26. * 绑定队列1和交换机
  27. */
  28. @Bean
  29. public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
  30. return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
  31. }
  32. /**
  33. * 第2个队列
  34. */
  35. @Bean
  36. public Queue fanoutQueue2(){
  37. return new Queue("fanout.queue2");
  38. }
  39. /**
  40. * 绑定队列2和交换机
  41. */
  42. @Bean
  43. public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
  44. return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
  45. }
  46. }

3.消息发送

在publisher服务的SpringAmqpTest类中添加测试方法:

  1. @Test
  2. public void testFanoutExchange() {
  3. // 交换机名称
  4. String exchangeName = "itcast.fanout";
  5. // 消息
  6. String message = "hello, everyone!";
  7. //三个参数:1交换机名字,2路由键,传空就行,3消息
  8. rabbitTemplate.convertAndSend(exchangeName, "", message);
  9. }

4.消息接受

在consumer服务下的SpringRabbitListener中添加两个方法,作为消费者:

  1. package cn.itcast.mq.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.stereotype.Component;
  4. import java.time.LocalTime;
  5. @Component
  6. public class SpringRabbitListener {
  7. @RabbitListener(queues = "fanout.queue1")
  8. public void listenFanoutQueue1(String msg) {
  9. System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
  10. }
  11. @RabbitListener(queues = "fanout.queue2")
  12. public void listenFanoutQueue2(String msg) {
  13. System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
  14. }
  15. }

给交换机发消息,绑定这个交换机的队列都得到了交换机发来的消息,消费者也收到了来自队列的消息
image.png