发送到该交换机的消息会分发到所有绑定他的队列中等待消费

模拟业务生产者

案例:模拟订单生成之后分发消息

配置文件

  1. server:
  2. port: 29898
  3. # 配置rabbitmq
  4. spring:
  5. rabbitmq:
  6. host: 112.74.175.76
  7. username: admin
  8. password: admin
  9. virtual-host: /
  10. port: 5672

业务类

@Slf4j
@Service
public class OrderService {
    //    获取rabbitmq 对象
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 模拟下订单
     *
     * @return
     */
    public boolean mockAddOrder(String userId, String productId, Integer num) {
        //保存订单业务执行
        String orderId = UUID.randomUUID().toString();
        log.info("订单业务执行完毕");
        log.info("开始异步执行消息分发");
        //通过MQ进行消息分发
        //定义交换机
        String exchangeName = "fanout_order_exchange";
        //定义路由key
        String routingKey = "";

        //消息分发
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
        return false;
    }
}

配置类

@Configuration
public class RabbitConfiguration {
    //    声明注册fanout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //    声明所要用到的队列:sms msg emile
    @Bean
    public Queue smsQueue() {
        return new Queue("fanout_sms_queue");
    }

    @Bean
    public Queue msgQueue() {
        return new Queue("fanout_msg_queue");
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("fanout_email_queue");
    }

    //    完成交换机和队列的绑定
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding msgBinding() {
        return BindingBuilder.bind(msgQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
}

测试类

@SpringBootTest
class OrderServiceTest {

    @Autowired
    OrderService orderService;
    @Test
    void mockAddOrder() {
        orderService.mockAddOrder("1", "1", 12);
    }
}

模拟消费者

新建一个项目,rabbit-consumer

配置文件

server:
  port: 29897

# 配置rabbitmq
spring:
  rabbitmq:
    host: 112.74.175.76
    username: admin
    password: admin
    virtual-host: /
    port: 5672

消费者监听

通过注解【@RabbitListener(queues = {“email_queue”})】进行监听队列消息
通过注解【@RabbitHandler】将方法定义为rabbitmq的消费方法进行消费消息
注意队列名称必须和生产者生成的队列名称保持一致

@Component
@RabbitListener(queues = {"fanout_email_queue"})
public class FanoutEmailConsumer {
    @RabbitHandler
    public void receiveMsg(String message) {
        System.out.println("fanout :fanout_email_queue---->"+message);
    }

}

结果

成功消费到消息
image.png