注解的方式绑定交换机和队列
模拟业务生产者
配置文件
server:
port: 29898
# 配置rabbitmq
spring:
rabbitmq:
host: 112.74.175.76
username: admin
password: admin
virtual-host: /
port: 5672
业务类
@Slf4j
@Service
public class OrderService {
// 获取rabbitmq 对象
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 模拟下订单
*
* @return
*/
public boolean topicOrder(String userId, String productId, Integer num) {
//保存订单业务执行
String orderId = UUID.randomUUID().toString();
log.info("订单业务执行完毕");
log.info("开始异步执行消息分发");
//通过MQ进行消息分发
//定义交换机
String exchangeName = "topic_order_exchange";
//定义路由key
String routingKey = "com.msg.email";
//消息分发
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
return false;
}
}
配置类
配置路由key
package com.example.topic.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbit config
*
* @author zukxu
* CreateTime: 2021/5/20 0020 14:22
*/
@Configuration
public class RabbitConfiguration {
// 声明注册topic模式的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic_order_exchange", true, false);
}
// 声明所要用到的队列:sms msg emile
@Bean
public Queue smsQueue() {
return new Queue("topic_sms_queue");
}
@Bean
public Queue msgQueue() {
return new Queue("topic_msg_queue");
}
@Bean
public Queue emailQueue() {
return new Queue("topic_email_queue");
}
// 完成交换机和队列,路由key的绑定
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("#.sms.#");
}
@Bean
public Binding msgBinding() {
return BindingBuilder.bind(msgQueue()).to(topicExchange()).with("#.msg.#");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("#.email.#");
}
}
测试类
@SpringBootTest
class OrderServiceTest {
@Autowired
OrderService orderService;
@Test
void topicOrder() {
orderService.topicOrder("3", "1", 13);
}
}
模拟消费者
配置文件
server:
port: 29897
# 配置rabbitmq
spring:
rabbitmq:
host: 112.74.175.76
username: admin
password: admin
virtual-host: /
port: 5672
消费者监听
通过注解【@RabbitListener(queues = {“email_queue”})】进行监听队列消息
通过注解【@RabbitHandler】将方法定义为rabbitmq的消费方法进行消费消息
注意队列名称必须和生产者生成的队列名称保持一致
注解方式
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic_msg_queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.msg.#"
))
public class TopicMsgConsumer {
@RabbitHandler
public void receiveMsg(String message) {
System.out.println("topic :topic_msg_queue---->"+message);
}
}
默认配置
@Component
@RabbitListener(queues = {"topic_sms_queue"})
public class TopicSmsConsumer {
@RabbitHandler
public void receiveMsg(String message) {
System.out.println("topic :topic_sms_queue---->" + message);
}
}
结果
成功消费到消息