首先,我们都是由于导入了这个依赖,从而在SpringBoot中使用了RocketMQ
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version></dependency>
在其POM.xml中,可以看到其依赖项,其中 rocketmq-spring-boot 为关键
<?xml version="1.0" encoding="UTF-8"?><!--~ Licensed to the Apache Software Foundation (ASF) under one or more~ contributor license agreements. See the NOTICE file distributed with~ this work for additional information regarding copyright ownership.~ The ASF licenses this file to You under the Apache License, Version 2.0~ (the "License"); you may not use this file except in compliance with~ the License. You may obtain a copy of the License at~~ http://www.apache.org/licenses/LICENSE-2.0~~ Unless required by applicable law or agreed to in writing, software~ distributed under the License is distributed on an "AS IS" BASIS,~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.~ See the License for the specific language governing permissions and~ limitations under the License.--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-parent</artifactId><version>2.1.0</version><relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath></parent><artifactId>rocketmq-spring-boot-starter</artifactId><packaging>jar</packaging><name>RocketMQ Spring Boot Starter</name><description>RocketMQ Spring Boot Starter</description><url>https://github.com/apache/rocketmq-spring</url><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId></dependency></dependencies></project>
在其Jar包中,META-INF下的spring,factories,其中必然是自动装配了一些东东
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
可以看到,其自动装配了 RocketMQAutoConfiguration,所以,就看看这个类干了什么
- 首先,先看这个类的头,首先他是一个@Configuration,所以,类中的所有方法,都是一个Bean
- @EnableConfigurationProperties({RocketMQProperties.class}) 启动配置的属性,将application配置文件,读取到配置类中。
再者,通过@Import注解,导入了如下几个类,在后续一一分析
- MessageConverterConfiguration
- ListenerContainerConfiguration
- ExtProducerResetConfiguration
- RocketMQTransactionConfiguration
先看 RocketMQAutoConfiguration 中的方法为我们导入了哪些的组件@Configuration@EnableConfigurationProperties({RocketMQProperties.class})@ConditionalOnClass({MQAdmin.class})@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server"},matchIfMissing = true)@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})@AutoConfigureAfter({MessageConverterConfiguration.class})@AutoConfigureBefore({RocketMQTransactionConfiguration.class})public class RocketMQAutoConfiguration
通过JSR250规范的注解(PostConstruct),实现了Bean的生命周期,在初始化这个bean的时候,检查nameServer的配置情况
@PostConstructpublic void checkProperties() {String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class);log.debug("rocketmq.nameServer = {}", nameServer);if (nameServer == null) {log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");}}
创建DefaultMQProducer,创建默认的消息生产者,用于消息的发送
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties){}
创建Spring封装的发送消息模板,用户消息的发送,这就是我们在代码中,注入的RocketMQTemplate,就是从这里来的
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, RocketMQMessageConverter rocketMQMessageConverter) {RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();rocketMQTemplate.setProducer(mqProducer);rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());return rocketMQTemplate;}
再看通过@Import导入的四个类
MessageConverterConfiguration
创建一个消息的转化器,用于发送消息时的转化,这就是我们在producer发送的时候 convertAndSend,为什么不用发Message对象,而可以直接发送的原因。
@Configuration@ConditionalOnMissingBean({RocketMQMessageConverter.class})class MessageConverterConfiguration {MessageConverterConfiguration() {}@Beanpublic RocketMQMessageConverter createRocketMQMessageConverter() {return new RocketMQMessageConverter();}}
ListenerContainerConfiguration
首先,看到这个类实现了两个接口 :ApplicationContextAware 以及 SmartInitializingSingleton,
@Configurationpublic class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton
ApplicationContextAware用于给当前容器注入ApplicationContext对象,在IOC容器启动的时候,会调用当前bean的setApplicationContext方法,将ApplicationContext对象赋值进来。
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = (ConfigurableApplicationContext)applicationContext;}
而看到 SmartInitializingSingleton ,二话不说,立刻调到它的 afterSingletonsInstantiated方法中,会在IOC容器的最后一步中,创建bean时候,判断如果是这个类型的,就会执行 afterSingletonsInstantiated 这个方法。
public void afterSingletonsInstantiated() {Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> {return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));beans.forEach(this::registerContainer);}
其逻辑为 获取所有标注了 RocketMQMessageListener 的类,依次调用本类的 registerContainer方法
关键代码:将所有的 RocketMQMessageListener 注册到容器中
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);}, new BeanDefinitionCustomizer[0]);
- ExtProducerResetConfiguration
首先,这个类和上一个一样,实现了 ApplicationContextAware,SmartInitializingSingleton
@Configurationpublic class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton
转而看向 afterSingletonsInstantiated,这里是获取所有标注了 ExtRocketMQTemplateConfiguration 的bean,如果有,调用 registerTemplate
public void afterSingletonsInstantiated() {Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class).entrySet().stream().filter((entry) -> {return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));beans.forEach(this::registerTemplate);}
- RocketMQTransactionConfiguration,RocketMQ对事务消息的支持
首先,这个类和上一个一样,实现了 ApplicationContextAware,SmartInitializingSingleton
@Configurationpublic class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton
转而看向 afterSingletonsInstantiated,这里获取所有标注了 RocketMQTransactionListener 注解的 bean,调用registerTransactionListener 注册 事务消息的监听器
public void afterSingletonsInstantiated() {Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class).entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));beans.forEach(this::registerTransactionListener);}
往producer中添加添加线程池 (setExecutorService),设置事务的监听器 (setTransactionListener)
private void registerTransactionListener(String beanName, Object bean) {Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());}RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");}((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());}
