模拟业务生产者
配置文件
server:
port: 29898
# 配置rabbitmq
spring:
rabbitmq:
host: 112.74.175.76
username: admin
password: admin
virtual-host: /
port: 5672
业务类
@Slf4j
@Service
public class OrderService {
// 获取rabbitmq 对象
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 模拟下订单
*
* @return
*/
public boolean mockAddOrder(String userId, String productId, Integer num) {
//保存订单业务执行
String orderId = UUID.randomUUID().toString();
log.info("订单业务执行完毕");
log.info("开始异步执行消息分发");
//通过MQ进行消息分发
//定义交换机
String exchangeName = "fanout_order_exchange";
//定义路由key
String routingKey = "";
//消息分发
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
return false;
}
}
配置类
@Configuration
public class RabbitConfiguration {
// 声明注册fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
// 声明所要用到的队列:sms msg emile
@Bean
public Queue smsQueue() {
return new Queue("fanout_sms_queue");
}
@Bean
public Queue msgQueue() {
return new Queue("fanout_msg_queue");
}
@Bean
public Queue emailQueue() {
return new Queue("fanout_email_queue");
}
// 完成交换机和队列的绑定
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding msgBinding() {
return BindingBuilder.bind(msgQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
}
测试类
@SpringBootTest
class OrderServiceTest {
@Autowired
OrderService orderService;
@Test
void mockAddOrder() {
orderService.mockAddOrder("1", "1", 12);
}
}
模拟消费者
配置文件
server:
port: 29897
# 配置rabbitmq
spring:
rabbitmq:
host: 112.74.175.76
username: admin
password: admin
virtual-host: /
port: 5672
消费者监听
通过注解【@RabbitListener(queues = {“email_queue”})】进行监听队列消息
通过注解【@RabbitHandler】将方法定义为rabbitmq的消费方法进行消费消息
注意队列名称必须和生产者生成的队列名称保持一致
@Component
@RabbitListener(queues = {"fanout_email_queue"})
public class FanoutEmailConsumer {
@RabbitHandler
public void receiveMsg(String message) {
System.out.println("fanout :fanout_email_queue---->"+message);
}
}
结果
成功消费到消息