一、在springboot中引入rabbitmq的场景之后,并启动springboot主启动类,此时rabbitmq通过RabbitAutoConfiguration类自动装配了那些配置?
    1、创建连接并获取rabbitmq的服务,将springboot配置文件有关rabbitmq的配置封装到RabbitProperties中
    image.png
    image.png
    2、给spring容器中加入RabbitTemplate工具类

    1. @Bean
    2. @ConditionalOnSingleCandidate(ConnectionFactory.class)
    3. @ConditionalOnMissingBean(RabbitOperations.class)
    4. public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    5. RabbitTemplate template = new RabbitTemplate();
    6. configurer.configure(template, connectionFactory);
    7. return template;
    8. }

    3、自动装配时给RabbitTemplate在springboot配置文件的配置

    1. @Bean
    2. @ConditionalOnMissingBean
    3. public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties,
    4. ObjectProvider<MessageConverter> messageConverter,
    5. ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
    6. RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(properties);
    7. configurer.setMessageConverter(messageConverter.getIfUnique());
    8. configurer
    9. .setRetryTemplateCustomizers(retryTemplateCustomizers.orderedStream().collect(Collectors.toList()));
    10. return configurer;
    11. }

    二、e6yun3.0 自己封装的rabbitmq
    1、配置类自动装配了连接rabbitmq服务时所需要的几大件

    1. @Configuration
    2. public class AmqpConfig {
    3. @Bean
    4. @Primary
    5. public ConnectionFactory defaultConnectionFactory(
    6. @Value("${spring.rabbitmq.default.host}") String host,
    7. @Value("${spring.rabbitmq.default.port}") int port,
    8. @Value("${spring.rabbitmq.default.username}") String username,
    9. @Value("${spring.rabbitmq.default.password}") String password,
    10. @Value("${spring.rabbitmq.default.virtual-host}") String virtualHost) {
    11. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    12. connectionFactory.setHost(host);
    13. connectionFactory.setPort(port);
    14. connectionFactory.setUsername(username);
    15. connectionFactory.setPassword(password);
    16. connectionFactory.setVirtualHost(virtualHost);
    17. return connectionFactory;
    18. }

    2、专门对与消费者进行的配置

    1. @Bean
    2. public SimpleRabbitListenerContainerFactory defaultFactory(
    3. SimpleRabbitListenerContainerFactoryConfigurer configurer,
    4. @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
    5. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    6. configurer.configure(factory, connectionFactory);
    7. factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    8. return factory;
    9. }

    3、消费者

    1. public class TmsEventMessageReceiver {
    2. private static Logger logger = LoggerFactory.getLogger(TmsEventMessageReceiver.class);
    3. /**
    4. 消费者基于注解形式的创建和绑定
    5. */
    6. @RabbitListener(bindings = {@QueueBinding(
    7. value = @Queue(value = "${e6yun3.event.receiver.queue.name}", // 给存放消息的队列起一个名字
    8. durable = "${e6yun3.event.receiver.queue.durable:true}", // 是否持久化
    9. autoDelete = "${e6yun3.event.receiver.queue.autoDelete:true}",// 消息消费了之后是否进行删除
    10. exclusive = "${e6yun3.event.receiver.queue.exclusive:false}"),// 是否具有排他性
    11. exchange = @Exchange(value = "${e6yun3.event.receiver.exchange}", // 给交换机起一个名字
    12. type = "${e6yun3.event.receiver.exchange.type:topic}"), // 指定交换机类型
    13. key = "${e6yun3.event.receiver.exchange.routing.key}")}, // 用来指定消费者和交换机中间的路由匹配规则
    14. containerFactory = "defaultFactory") // 一般使用默认的连接工厂
    15. public void processor(Message message, Channel channel, @Header(name = "amqp_deliveryTag") long deliveryTag,
    16. @Header("amqp_redelivered") boolean redelivered) {
    17. /**
    18. 消费者拿到消息之后,对该消息进行处理的业务
    19. */
    20. byte[] body = message.getBody();
    21. String messageStr = "";
    22. try {
    23. messageStr = new String(body, "utf-8");
    24. } catch (UnsupportedEncodingException e) {
    25. e.printStackTrace();
    26. }
    27. //如果发送方有压缩,这里要解压
    28. String msgInfo = GZipUtils.gunzip(messageStr, "GB2312");
    29. //业务处理在这里
    30. logger.info(msgInfo);
    31. MqHandleUtils.handleReceiver(channel, deliveryTag);
    32. }
    33. }

    4、生产者

    1. /**
    2. 该消息发送的类中,两个方法进行了重载,并重载的两个方法的区别就是多了一个文本类型
    3. 并且两个方法的发送机制都是调用rabbitTemplate.convertAndSend(exchangeName, routingKey, messageBean);这个方法,需要三个参数
    4. 1、交换机的名称
    5. 2、消息队列和交换机的匹配规则
    6. 3、所要发送的消息 (messageBean)
    7. */
    8. public void sendMessage(RabbitTemplateEnum rabbitTemplateEnum, String exchangeName, String routingKey, String message, boolean zip) {
    9. //判断是否加密
    10. if(zip) {
    11. message = GZipUtils.gzip(message, GZipUtils.DEFAULT_CHARSET);
    12. }
    13. //构造mq template
    14. RabbitTemplate rabbitTemplate = beanFactory.getBean(rabbitTemplateEnum.getBeanName(), RabbitTemplate.class);
    15. Message messageBean = MessageBuilder.withBody(message.getBytes())
    16. .setHeader("routingKey",routingKey)
    17. .build();
    18. //推送mq
    19. rabbitTemplate.convertAndSend(exchangeName, routingKey, messageBean);
    20. logger.info("推送mq消息routingKey:{},内容:{}",routingKey, message);
    21. }