首先,我们都是由于导入了这个依赖,从而在SpringBoot中使用了RocketMQ
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
在其POM.xml中,可以看到其依赖项,其中 rocketmq-spring-boot 为关键
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-parent</artifactId>
<version>2.1.0</version>
<relativePath>../rocketmq-spring-boot-parent/pom.xml</relativePath>
</parent>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<packaging>jar</packaging>
<name>RocketMQ Spring Boot Starter</name>
<description>RocketMQ Spring Boot Starter</description>
<url>https://github.com/apache/rocketmq-spring</url>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
</dependencies>
</project>
在其Jar包中,META-INF下的spring,factories,其中必然是自动装配了一些东东
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
可以看到,其自动装配了 RocketMQAutoConfiguration,所以,就看看这个类干了什么
- 首先,先看这个类的头,首先他是一个@Configuration,所以,类中的所有方法,都是一个Bean
- @EnableConfigurationProperties({RocketMQProperties.class}) 启动配置的属性,将application配置文件,读取到配置类中。
再者,通过@Import注解,导入了如下几个类,在后续一一分析
- MessageConverterConfiguration
- ListenerContainerConfiguration
- ExtProducerResetConfiguration
- RocketMQTransactionConfiguration
先看 RocketMQAutoConfiguration 中的方法为我们导入了哪些的组件@Configuration
@EnableConfigurationProperties({RocketMQProperties.class})
@ConditionalOnClass({MQAdmin.class})
@ConditionalOnProperty(
prefix = "rocketmq",
value = {"name-server"},
matchIfMissing = true
)
@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, RocketMQTransactionConfiguration.class})
@AutoConfigureAfter({MessageConverterConfiguration.class})
@AutoConfigureBefore({RocketMQTransactionConfiguration.class})
public class RocketMQAutoConfiguration
通过JSR250规范的注解(PostConstruct),实现了Bean的生命周期,在初始化这个bean的时候,检查nameServer的配置情况
@PostConstruct
public void checkProperties() {
String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class);
log.debug("rocketmq.nameServer = {}", nameServer);
if (nameServer == null) {
log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");
}
}
创建DefaultMQProducer,创建默认的消息生产者,用于消息的发送
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties){
}
创建Spring封装的发送消息模板,用户消息的发送,这就是我们在代码中,注入的RocketMQTemplate,就是从这里来的
public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, RocketMQMessageConverter rocketMQMessageConverter) {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
return rocketMQTemplate;
}
再看通过@Import导入的四个类
MessageConverterConfiguration
创建一个消息的转化器,用于发送消息时的转化,这就是我们在producer发送的时候 convertAndSend,为什么不用发Message对象,而可以直接发送的原因。
@Configuration
@ConditionalOnMissingBean({RocketMQMessageConverter.class})
class MessageConverterConfiguration {
MessageConverterConfiguration() {
}
@Bean
public RocketMQMessageConverter createRocketMQMessageConverter() {
return new RocketMQMessageConverter();
}
}
ListenerContainerConfiguration
首先,看到这个类实现了两个接口 :ApplicationContextAware 以及 SmartInitializingSingleton,
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton
ApplicationContextAware用于给当前容器注入ApplicationContext对象,在IOC容器启动的时候,会调用当前bean的setApplicationContext方法,将ApplicationContext对象赋值进来。
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext)applicationContext;
}
而看到 SmartInitializingSingleton ,二话不说,立刻调到它的 afterSingletonsInstantiated方法中,会在IOC容器的最后一步中,创建bean时候,判断如果是这个类型的,就会执行 afterSingletonsInstantiated 这个方法。
public void afterSingletonsInstantiated() {
Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter((entry) -> {
return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
beans.forEach(this::registerContainer);
}
其逻辑为 获取所有标注了 RocketMQMessageListener 的类,依次调用本类的 registerContainer方法
关键代码:将所有的 RocketMQMessageListener 注册到容器中
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {
return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);
}, new BeanDefinitionCustomizer[0]);
- ExtProducerResetConfiguration
首先,这个类和上一个一样,实现了 ApplicationContextAware,SmartInitializingSingleton
@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton
转而看向 afterSingletonsInstantiated,这里是获取所有标注了 ExtRocketMQTemplateConfiguration 的bean,如果有,调用 registerTemplate
public void afterSingletonsInstantiated() {
Map<String, Object> beans = (Map)this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class).entrySet().stream().filter((entry) -> {
return !ScopedProxyUtils.isScopedTarget((String)entry.getKey());
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
beans.forEach(this::registerTemplate);
}
- RocketMQTransactionConfiguration,RocketMQ对事务消息的支持
首先,这个类和上一个一样,实现了 ApplicationContextAware,SmartInitializingSingleton
@Configuration
public class RocketMQTransactionConfiguration implements ApplicationContextAware, SmartInitializingSingleton
转而看向 afterSingletonsInstantiated,这里获取所有标注了 RocketMQTransactionListener 注解的 bean,调用registerTransactionListener 注册 事务消息的监听器
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerTransactionListener);
}
往producer中添加添加线程池 (setExecutorService),设置事务的监听器 (setTransactionListener)
private void registerTransactionListener(String beanName, Object bean) {
Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
throw new IllegalStateException(clazz + " is not instance of " + RocketMQLocalTransactionListener.class.getName());
}
RocketMQTransactionListener annotation = clazz.getAnnotation(RocketMQTransactionListener.class);
RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) applicationContext.getBean(annotation.rocketMQTemplateBeanName());
if (((TransactionMQProducer) rocketMQTemplate.getProducer()).getTransactionListener() != null) {
throw new IllegalStateException(annotation.rocketMQTemplateBeanName() + " already exists RocketMQLocalTransactionListener");
}
((TransactionMQProducer) rocketMQTemplate.getProducer()).setExecutorService(new ThreadPoolExecutor(annotation.corePoolSize(), annotation.maximumPoolSize(),
annotation.keepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(annotation.blockingQueueSize())));
((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(RocketMQUtil.convert((RocketMQLocalTransactionListener) bean));
log.debug("RocketMQLocalTransactionListener {} register to {} success", clazz.getName(), annotation.rocketMQTemplateBeanName());
}