注解的方式绑定交换机和队列

模拟业务生产者

案例:模拟订单生成之后分发消息

配置文件

  1. server:
  2. port: 29898
  3. # 配置rabbitmq
  4. spring:
  5. rabbitmq:
  6. host: 112.74.175.76
  7. username: admin
  8. password: admin
  9. virtual-host: /
  10. port: 5672

业务类

@Slf4j
@Service
public class OrderService {
    //    获取rabbitmq 对象
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 模拟下订单
     *
     * @return
     */
    public boolean topicOrder(String userId, String productId, Integer num) {
        //保存订单业务执行
        String orderId = UUID.randomUUID().toString();
        log.info("订单业务执行完毕");
        log.info("开始异步执行消息分发");
        //通过MQ进行消息分发
        //定义交换机
        String exchangeName = "topic_order_exchange";
        //定义路由key
        String routingKey = "com.msg.email";
        //消息分发
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
        return false;
    }
}

配置类

配置路由key

package com.example.topic.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbit config
 *
 * @author zukxu
 * CreateTime: 2021/5/20 0020 14:22
 */
@Configuration
public class RabbitConfiguration {
    //    声明注册topic模式的交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic_order_exchange", true, false);
    }

    //    声明所要用到的队列:sms msg emile
    @Bean
    public Queue smsQueue() {
        return new Queue("topic_sms_queue");
    }

    @Bean
    public Queue msgQueue() {
        return new Queue("topic_msg_queue");
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("topic_email_queue");
    }

    //    完成交换机和队列,路由key的绑定
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("#.sms.#");
    }

    @Bean
    public Binding msgBinding() {
        return BindingBuilder.bind(msgQueue()).to(topicExchange()).with("#.msg.#");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("#.email.#");
    }
}

测试类

@SpringBootTest
class OrderServiceTest {

    @Autowired
    OrderService orderService;
    @Test
    void topicOrder() {
        orderService.topicOrder("3", "1", 13);
    }
}

模拟消费者

配置文件

server:
  port: 29897

# 配置rabbitmq
spring:
  rabbitmq:
    host: 112.74.175.76
    username: admin
    password: admin
    virtual-host: /
    port: 5672

消费者监听

通过注解【@RabbitListener(queues = {“email_queue”})】进行监听队列消息
通过注解【@RabbitHandler】将方法定义为rabbitmq的消费方法进行消费消息
注意队列名称必须和生产者生成的队列名称保持一致

注解方式

@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "topic_msg_queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
        key = "#.msg.#"
))
public class TopicMsgConsumer {
    @RabbitHandler
    public void receiveMsg(String message) {
        System.out.println("topic :topic_msg_queue---->"+message);
    }
}

默认配置

@Component
@RabbitListener(queues = {"topic_sms_queue"})
public class TopicSmsConsumer {
    @RabbitHandler
    public void receiveMsg(String message) {
        System.out.println("topic :topic_sms_queue---->" + message);
    }

}

最好推荐使用配置类进行配置

结果

成功消费到消息
image.png