模拟业务生产者
配置文件
server:
port: 29898
# 配置rabbitmq
spring:
rabbitmq:
host: 112.74.175.76
username: admin
password: admin
virtual-host: /
port: 5672
业务类
@Slf4j
@Service
public class OrderService {
// 获取rabbitmq 对象
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 模拟下订单
*
* @return
*/
public boolean directOrder(String userId, String productId, Integer num) {
//保存订单业务执行
String orderId = UUID.randomUUID().toString();
log.info("订单业务执行完毕");
log.info("开始异步执行消息分发");
//通过MQ进行消息分发
//定义交换机
String exchangeName = "direct_order_exchange";
//消息分发
rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
rabbitTemplate.convertAndSend(exchangeName, "sms", orderId);
return false;
}
}
配置类
配置路由key
@Configuration
public class RabbitConfiguration {
// 声明注册direct模式的交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_order_exchange", true, false);
}
// 声明所要用到的队列:sms msg emile
@Bean
public Queue smsQueue() {
return new Queue("direct_sms_queue");
}
@Bean
public Queue msgQueue() {
return new Queue("direct_msg_queue");
}
@Bean
public Queue emailQueue() {
return new Queue("direct_email_queue");
}
// 完成交换机和队列,路由key的绑定
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding msgBinding() {
return BindingBuilder.bind(msgQueue()).to(directExchange()).with("msg");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
}
测试类
@SpringBootTest
class OrderServiceTest {
@Autowired
OrderService orderService;
@Test
void directOrder() {
orderService.directOrder("2", "1", 13);
}
}
模拟消费者
配置文件
server:
port: 29897
# 配置rabbitmq
spring:
rabbitmq:
host: 112.74.175.76
username: admin
password: admin
virtual-host: /
port: 5672
消费者监听
通过注解【@RabbitListener(queues = {“email_queue”})】进行监听队列消息
通过注解【@RabbitHandler】将方法定义为rabbitmq的消费方法进行消费消息
注意队列名称必须和生产者生成的队列名称保持一致
@Component
@RabbitListener(queues = {"direct_email_queue"})
public class DirectEmailConsumer {
@RabbitHandler
public void receiveMsg(String message) {
System.out.println("direct :direct_email_queue---->"+message);
}
}
结果
成功消费到消息
总结:
为了防止出现生产者的交换机和队列还没有创建,导致消费者无法监听或者监听不到队列的情况出现,我们可以将配置类在消费者中也配置一份即可