First, we use RocketMQ in Spring Boot by importing this dependency:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.1.0</version>
    </dependency>

    Its pom.xml lists its dependencies; the key one is rocketmq-spring-boot.

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      ~ Licensed to the Apache Software Foundation (ASF) under one or more
      ~ contributor license agreements. See the NOTICE file distributed with
      ~ this work for additional information regarding copyright ownership.
      ~ The ASF licenses this file to You under the Apache License, Version 2.0
      ~ (the "License"); you may not use this file except in compliance with
      ~ the License. You may obtain a copy of the License at
      ~
      ~ http://www.apache.org/licenses/LICENSE-2.0
      ~
      ~ Unless required by applicable law or agreed to in writing, software
      ~ distributed under the License is distributed on an "AS IS" BASIS,
      ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      ~ See the License for the specific language governing permissions and
      ~ limitations under the License.
      -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-parent</artifactId>
            <version>2.1.0</version>
            <relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
        </parent>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <packaging>jar</packaging>
        <name>RocketMQ Spring Boot Starter</name>
        <description>RocketMQ Spring Boot Starter</description>
        <url>https://github.com/apache/rocketmq-spring</url>
        <dependencies>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-validation</artifactId>
            </dependency>
        </dependencies>
    </project>

    Inside the jar, the spring.factories file under META-INF declares what gets auto-configured:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
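
    The spring.factories entry above is written in Java properties syntax, where a trailing backslash continues the value on the next line. As a rough illustration, the sketch below reads such an entry with java.util.Properties and splits the value on commas. It is a plain-Java approximation, not Spring's own loader; the class name SpringFactoriesSketch and the method autoConfigurations are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Plain-Java sketch: reads a spring.factories-style entry with java.util.Properties.
// This is NOT Spring's loader; the class and method names here are hypothetical.
class SpringFactoriesSketch {

    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Parses the properties text and returns the class names listed under KEY.
    static List<String> autoConfigurations(String factoriesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty(KEY, "");
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A trailing backslash continues the value on the next line.
        String text = KEY + "=\\\n"
                + "org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration\n";
        System.out.println(autoConfigurations(text));
    }
}
```

    Several auto-configuration classes could be listed under the same key, separated by commas; here there is only one.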

    So the starter auto-configures RocketMQAutoConfiguration. Let's see what this class does.

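    • Start with the class header. It is a @Configuration class, so each method in it that is annotated with @Bean registers a bean.
    • @EnableConfigurationProperties({RocketMQProperties.class}) enables configuration property binding: the settings in the application configuration file are read into the RocketMQProperties class.
    • The @Import annotation imports the following classes, each analyzed later: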

      • MessageConverterConfiguration
      • ListenerContainerConfiguration
      • ExtProducerResetConfiguration
      • RocketMQTransactionConfiguration
        @Configuration
        @EnableConfigurationProperties({RocketMQProperties.class})
        @ConditionalOnClass({MQAdmin.class})
        @ConditionalOnProperty(
            prefix = "rocketmq",
            value = {"name-server"},
            matchIfMissing = true
        )
        @Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
        @AutoConfigureAfter({MessageConverterConfiguration.class})
        @AutoConfigureBefore({RocketMQTransactionConfiguration.class})
        public class RocketMQAutoConfiguration

        First, look at which components the methods of RocketMQAutoConfiguration register for us.
    • The JSR-250 annotation @PostConstruct hooks into the bean lifecycle: when this bean is initialized, it checks whether the name server is configured.

      @PostConstruct
      public void checkProperties() {
          String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class);
          log.debug("rocketmq.nameServer = {}", nameServer);
          if (nameServer == null) {
              log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");
          }
      }
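
      The class-level @ConditionalOnProperty uses matchIfMissing = true, so the configuration class is still processed when rocketmq.name-server is absent, which is presumably why checkProperties only logs a warning instead of failing. The sketch below models that matching rule in plain Java. It is a simplification for illustration, not Spring Boot's implementation; the class NameServerConditionSketch and its matches method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the property condition on RocketMQAutoConfiguration.
// It is a simplification for illustration, not Spring Boot's implementation.
class NameServerConditionSketch {

    // Models @ConditionalOnProperty(prefix = "rocketmq", value = "name-server")
    // with no havingValue: a missing property yields matchIfMissing, and a
    // present property matches unless its value is "false".
    static boolean matches(Map<String, String> env, boolean matchIfMissing) {
        String value = env.get("rocketmq.name-server");
        if (value == null) {
            return matchIfMissing;
        }
        return !"false".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        // Property missing, matchIfMissing = true: the condition still passes.
        System.out.println(matches(env, true));
        // Property present (the address is a made-up example value).
        env.put("rocketmq.name-server", "127.0.0.1:9876");
        System.out.println(matches(env, true));
    }
}
```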
    • Creates the DefaultMQProducer, the default message producer used to send messages.

      public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
          // body omitted
      }
    • Creates the Spring-style message-sending template, used to send messages. This is where the RocketMQTemplate we inject in our own code comes from.

      public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, RocketMQMessageConverter rocketMQMessageConverter) {
          RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
          rocketMQTemplate.setProducer(mqProducer);
          rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
          return rocketMQTemplate;
      }

      Next, look at the four classes imported via @Import.

    • MessageConverterConfiguration

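      • Creates a message converter that converts the payload when a message is sent. This is why, when the producer calls convertAndSend, we can pass an ordinary object directly instead of building a Message object ourselves.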

        @Configuration
        @ConditionalOnMissingBean({RocketMQMessageConverter.class})
        class MessageConverterConfiguration {
            MessageConverterConfiguration() {
            }

            @Bean
            public RocketMQMessageConverter createRocketMQMessageConverter() {
                return new RocketMQMessageConverter();
            }
        }
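
        To make the "convert, then send" idea concrete, here is a minimal plain-Java model of a template that accepts an arbitrary payload and delegates to a converter before handing the result to a sender. Every type in it (WireMessage, PayloadConverter, Template) is a hypothetical stand-in chosen for illustration; the real RocketMQTemplate and RocketMQMessageConverter do considerably more.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "convert, then send". All types here are hypothetical
// stand-ins; the real RocketMQTemplate and converter are considerably richer.
class ConvertAndSendSketch {

    // Stand-in for a wire-level message: a destination plus raw bytes.
    static final class WireMessage {
        final String destination;
        final byte[] body;

        WireMessage(String destination, byte[] body) {
            this.destination = destination;
            this.body = body;
        }
    }

    // Stand-in for a message converter: turns an arbitrary payload into bytes.
    interface PayloadConverter {
        byte[] toBytes(Object payload);
    }

    // Stand-in for the template: callers pass a payload, never a message object.
    static final class Template {
        private final PayloadConverter converter;
        final List<WireMessage> sent = new ArrayList<>(); // records what "went out"

        Template(PayloadConverter converter) {
            this.converter = converter;
        }

        void convertAndSend(String destination, Object payload) {
            byte[] body = converter.toBytes(payload);     // conversion happens here
            sent.add(new WireMessage(destination, body)); // a real producer would send it
        }
    }

    public static void main(String[] args) {
        PayloadConverter utf8 =
                payload -> String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
        Template template = new Template(utf8);
        template.convertAndSend("demo-topic", "hello");
        System.out.println(new String(template.sent.get(0).body, StandardCharsets.UTF_8));
    }
}
```

        The caller only ever sees convertAndSend(destination, payload); swapping the converter changes the wire format without touching calling code.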
    • ListenerContainerConfiguration

    First, note that this class implements two interfaces: ApplicationContextAware and SmartInitializingSingleton.

    @Configuration
    public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton

    ApplicationContextAware injects the ApplicationContext into the current bean: while the IoC container starts, it calls the bean's setApplicationContext method and passes in the ApplicationContext.

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
    }

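    On seeing SmartInitializingSingleton, jump straight to its afterSingletonsInstantiated method. Near the end of IoC container startup, once all non-lazy singleton beans have been instantiated, the container calls afterSingletonsInstantiated on every singleton that implements this interface.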

    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> {
            return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());
        }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
        beans.forEach(this::registerContainer);
    }

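    The logic: fetch every bean annotated with RocketMQMessageListener and call this class's registerContainer method on each one.
    Key code: for each such listener, a DefaultRocketMQListenerContainer bean is registered in the application context.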

    genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {
        return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);
    }, new BeanDefinitionCustomizer[0]);
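
    The "find annotated beans, then register one container per bean" pattern can be modelled without Spring. The sketch below uses a made-up @DemoListener annotation and a plain map in place of the application context; none of these names come from Spring or RocketMQ, and the real registerContainer does much more (validation, container configuration, startup).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of "find annotated beans, register one container per bean".
// The annotation and classes below are hypothetical stand-ins, not Spring or RocketMQ types.
class ListenerRegistrationSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoListener {
        String topic();
        String consumerGroup();
    }

    @DemoListener(topic = "orders", consumerGroup = "order-group")
    static class OrderListener { }

    static class PlainBean { } // not annotated, so it is ignored

    // Returns containerName -> "group@topic" for each annotated bean.
    static Map<String, String> registerContainers(Map<String, Object> beans) {
        Map<String, String> containers = new LinkedHashMap<>();
        beans.forEach((name, bean) -> {
            DemoListener meta = bean.getClass().getAnnotation(DemoListener.class);
            if (meta != null) {
                containers.put(name + "Container", meta.consumerGroup() + "@" + meta.topic());
            }
        });
        return containers;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("orderListener", new OrderListener());
        beans.put("plainBean", new PlainBean());
        System.out.println(registerContainers(beans));
    }
}
```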
    • ExtProducerResetConfiguration

    Like the previous class, this one implements ApplicationContextAware and SmartInitializingSingleton.

    @Configuration
    public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton

    Turning to afterSingletonsInstantiated: it fetches every bean annotated with ExtRocketMQTemplateConfiguration and, if there are any, calls registerTemplate on each.

    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class).entrySet().stream().filter((entry) -> {
            return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());
        }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
        beans.forEach(this::registerTemplate);
    }
    • RocketMQTransactionConfiguration: RocketMQ's support for transactional messages

    Like the previous class, this one implements ApplicationContextAware and SmartInitializingSingleton.

    @Configuration
    public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton

    Turning to afterSingletonsInstantiated: it fetches every bean annotated with RocketMQTransactionListener and calls registerTransactionListener to register the transactional-message listener.

    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
            .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        beans.forEach(this::registerTransactionListener);
    }

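    registerTransactionListener adds a thread pool to the producer (setExecutorService) and sets the transaction listener (setTransactionListener). If the producer already has a transaction listener, it throws an IllegalStateException.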

    private void registerTransactionListener(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
        }
        RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
        if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
            throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
        }
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
            annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
        ((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
        log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
    }
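
    The two effects of this method, attaching a bounded thread pool and allowing only one transaction listener per producer, can be sketched in plain Java. DemoProducer and register below are hypothetical stand-ins for illustration; only ThreadPoolExecutor and LinkedBlockingDeque are real JDK classes, used with the same constructor shape as in the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-Java model of "one transaction listener per producer".
// DemoProducer and the method names are hypothetical stand-ins.
class TransactionListenerSketch {

    static final class DemoProducer {
        Runnable transactionListener; // stand-in for the listener callback
        ExecutorService executor;
    }

    // Attaches a bounded thread pool and a listener; rejects a second listener.
    static void register(DemoProducer producer, Runnable listener,
                         int corePoolSize, int maximumPoolSize,
                         long keepAliveMillis, int queueSize) {
        if (producer.transactionListener != null) {
            throw new IllegalStateException("producer already has a transaction listener");
        }
        producer.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
                keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(queueSize));
        producer.transactionListener = listener;
    }

    public static void main(String[] args) {
        DemoProducer producer = new DemoProducer();
        register(producer, () -> { }, 1, 1, 60_000L, 2000);
        try {
            register(producer, () -> { }, 1, 1, 60_000L, 2000);
        } catch (IllegalStateException e) {
            System.out.println("second registration rejected: " + e.getMessage());
        } finally {
            producer.executor.shutdown(); // release the pool before exiting
        }
    }
}
```

    In practice this means two listener beans that point at the same template bean name cannot coexist; each additional listener needs its own template.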