模拟业务生产者

案例:模拟订单生成之后分发消息

配置文件

  1. server:
  2. port: 29898
  3. # 配置rabbitmq
  4. spring:
  5. rabbitmq:
  6. host: 112.74.175.76
  7. username: admin
  8. password: admin
  9. virtual-host: /
  10. port: 5672

业务类

@Slf4j
@Service
public class OrderService {
    //    获取rabbitmq 对象
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 模拟下订单
     *
     * @return
     */
    public boolean directOrder(String userId, String productId, Integer num) {
        //保存订单业务执行
        String orderId = UUID.randomUUID().toString();
        log.info("订单业务执行完毕");
        log.info("开始异步执行消息分发");
        //通过MQ进行消息分发
        //定义交换机
        String exchangeName = "direct_order_exchange";

        //消息分发
        rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
        rabbitTemplate.convertAndSend(exchangeName, "sms", orderId);
        return false;
    }
}

配置类

配置路由key

@Configuration
public class RabbitConfiguration {
    //    声明注册direct模式的交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }

    //    声明所要用到的队列:sms msg emile
    @Bean
    public Queue smsQueue() {
        return new Queue("direct_sms_queue");
    }

    @Bean
    public Queue msgQueue() {
        return new Queue("direct_msg_queue");
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("direct_email_queue");
    }

    //    完成交换机和队列,路由key的绑定
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding msgBinding() {
        return BindingBuilder.bind(msgQueue()).to(directExchange()).with("msg");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }
}

测试类

@SpringBootTest
class OrderServiceTest {

    @Autowired
    OrderService orderService;
    @Test
    void directOrder() {
        orderService.directOrder("2", "1", 13);
    }
}

模拟消费者

配置文件

server:
  port: 29897

# 配置rabbitmq
spring:
  rabbitmq:
    host: 112.74.175.76
    username: admin
    password: admin
    virtual-host: /
    port: 5672

消费者监听

通过注解【@RabbitListener(queues = {“email_queue”})】进行监听队列消息
通过注解【@RabbitHandler】将方法定义为rabbitmq的消费方法进行消费消息
注意队列名称必须和生产者生成的队列名称保持一致

@Component
@RabbitListener(queues = {"direct_email_queue"})
public class DirectEmailConsumer {
    @RabbitHandler
    public void receiveMsg(String message) {
        System.out.println("direct :direct_email_queue---->"+message);
    }

}

结果

成功消费到消息
image.png

总结:

为了防止出现生产者的交换机和队列还没有创建,导致消费者无法监听或者监听不到队列的情况出现,我们可以将配置类在消费者中也配置一份即可