Spring EnableJms 注解
- Author: HuiFer
- 源码阅读仓库: SourceHot-spring
- 源码路径:
org.springframework.jms.annotation.EnableJms
源码分析
@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(JmsBootstrapConfiguration.class)public @interface EnableJms {}
- 该类的切入点在
@Import(JmsBootstrapConfiguration.class), 直接看JmsBootstrapConfiguration就可以了
@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class JmsBootstrapConfiguration {/*** jms 监听注解后处理, 将{@link JmsListener} 注册到{@link JmsListenerContainerFactory}* @return*/@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {return new JmsListenerAnnotationBeanPostProcessor();}/*** JMS 监听注册* @return*/@Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)public JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {return new JmsListenerEndpointRegistry();}}
JmsListenerAnnotationBeanPostProcessor
类图

主要关注
- afterSingletonsInstantiated
- postProcessAfterInitialization
afterSingletonsInstantiated
@Overridepublic void afterSingletonsInstantiated() {// Remove resolved singleton classes from cachethis.nonAnnotatedClasses.clear();if (this.beanFactory instanceof ListableBeanFactory) {// Apply JmsListenerConfigurer beans from the BeanFactory, if any// 根据类型获取beanMap<String, JmsListenerConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());// 排序 OrderAnnotationAwareOrderComparator.sort(configurers);for (JmsListenerConfigurer configurer : configurers) {// 放入jms监听配置,开发者自定义configurer.configureJmsListeners(this.registrar);}}if (this.containerFactoryBeanName != null) {this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);}if (this.registrar.getEndpointRegistry() == null) {// Determine JmsListenerEndpointRegistry bean from the BeanFactoryif (this.endpointRegistry == null) {Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");this.endpointRegistry = this.beanFactory.getBean(JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);}this.registrar.setEndpointRegistry(this.endpointRegistry);}// Set the custom handler method factory once resolved by the configurerMessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();if (handlerMethodFactory != null) {this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);}// Actually register all listenersthis.registrar.afterPropertiesSet();}
关注最后一行
this.registrar.afterPropertiesSet()@Overridepublic void afterPropertiesSet() {registerAllEndpoints();}protected void registerAllEndpoints() {Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");synchronized (this.mutex) {for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {// 注册监听this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));}this.startImmediately = true; // trigger immediate startup}}
注册监听在下面分析会讲详见下文
postProcessAfterInitialization
@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||bean instanceof JmsListenerEndpointRegistry) {// Ignore AOP infrastructure such as scoped proxies.return bean;}// 获取 bean 的代理对象.classClass<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);if (!this.nonAnnotatedClasses.contains(targetClass)) {Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, JmsListener.class, JmsListeners.class);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);if (logger.isTraceEnabled()) {logger.trace("No @JmsListener annotations found on bean type: " + targetClass);}} else {// Non-empty set of methodsannotatedMethods.forEach((method, listeners) ->listeners.forEach(listener -> processJmsListener(listener, method, bean)));if (logger.isDebugEnabled()) {logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +"': " + annotatedMethods);}}}return bean;}
protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());// 设置 监听方法信息MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();endpoint.setBean(bean);endpoint.setMethod(invocableMethod);endpoint.setMostSpecificMethod(mostSpecificMethod);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);endpoint.setBeanFactory(this.beanFactory);endpoint.setId(getEndpointId(jmsListener));endpoint.setDestination(resolve(jmsListener.destination()));if (StringUtils.hasText(jmsListener.selector())) {endpoint.setSelector(resolve(jmsListener.selector()));}if (StringUtils.hasText(jmsListener.subscription())) {endpoint.setSubscription(resolve(jmsListener.subscription()));}if (StringUtils.hasText(jmsListener.concurrency())) {endpoint.setConcurrency(resolve(jmsListener.concurrency()));}JmsListenerContainerFactory<?> factory = null;String containerFactoryBeanName = resolve(jmsListener.containerFactory());if (StringUtils.hasText(containerFactoryBeanName)) {Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");try {factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);} catch (NoSuchBeanDefinitionException ex) {throw new BeanInitializationException("Could not register JMS listener endpoint on [" +mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +" with id '" + containerFactoryBeanName + "' was found in the application context", ex);}}// 注册监听点 到 JmsListenerContainerFactorythis.registrar.registerEndpoint(endpoint, factory);}
- 将监听点注册的重要方法
org.springframework.jms.config.JmsListenerEndpointRegistrar#registerEndpoint(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>)
public void registerEndpoint(JmsListenerEndpoint endpoint, @Nullable JmsListenerContainerFactory<?> factory) {Assert.notNull(endpoint, "Endpoint must not be null");Assert.hasText(endpoint.getId(), "Endpoint id must be set");// Factory may be null, we defer the resolution right before actually creating the container// jms 监听点描述JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);synchronized (this.mutex) {if (this.startImmediately) { // register and start immediatelyAssert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");// 注册this.endpointRegistry.registerListenerContainer(descriptor.endpoint,resolveContainerFactory(descriptor), true);}else {this.endpointDescriptors.add(descriptor);}}}
org.springframework.jms.config.JmsListenerEndpointRegistry#registerListenerContainer(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>, boolean)public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,boolean startImmediately) {Assert.notNull(endpoint, "Endpoint must not be null");Assert.notNull(factory, "Factory must not be null");String id = endpoint.getId();Assert.hasText(id, "Endpoint id must be set");synchronized (this.listenerContainers) {if (this.listenerContainers.containsKey(id)) {throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");}// 创建消息监听容器MessageListenerContainer container = createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);if (startImmediately) {// 启动消息监听容器startIfNecessary(container);}}}
org.springframework.jms.config.JmsListenerEndpointRegistry#createListenerContainer
/*** Create and start a new container using the specified factory.* 创建监听容器*/protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,JmsListenerContainerFactory<?> factory) {// 创建监听 容器MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);if (listenerContainer instanceof InitializingBean) {try {// 后置方法((InitializingBean) listenerContainer).afterPropertiesSet();} catch (Exception ex) {throw new BeanInitializationException("Failed to initialize message listener container", ex);}}int containerPhase = listenerContainer.getPhase();if (containerPhase < Integer.MAX_VALUE) { // a custom phase valueif (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +this.phase + " vs " + containerPhase);}this.phase = listenerContainer.getPhase();}return listenerContainer;}
关键接口
JmsListenerContainerFactory<C extends MessageListenerContainer>public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {/*** Create a {@link MessageListenerContainer} for the given {@link JmsListenerEndpoint}.* 创建肩痛容器* @param endpoint the endpoint to configure* @return the created container*/C createListenerContainer(JmsListenerEndpoint endpoint);}

注册完成后是否立即启动
this.listenerContainers.put(id, container);if (startImmediately) {// 启动消息监听容器startIfNecessary(container);}private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {listenerContainer.start();}}
- 具体实现:
org.springframework.jms.listener.AbstractJmsListeningContainer#start
执行完
start方法就结束了processJmsListener的调用链路,postProcessAfterInitialization也结束了
JmsListenerEndpointRegistry
- 这个类辅助JmsListenerAnnotationBeanPostProcessor 处理
registerListenerContainer
/*** Create a message listener container for the given {@link JmsListenerEndpoint}.* <p>This create the necessary infrastructure to honor that endpoint* with regards to its configuration.* <p>The {@code startImmediately} flag determines if the container should be* started immediately.* <p>* 注册监听容器** @param endpoint the endpoint to add* 监听点* @param factory the listener factory to use* 监听容器工厂* @param startImmediately start the container immediately if necessary* 是否立即启动容器* @see #getListenerContainers()* @see #getListenerContainer(String)*/public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,boolean startImmediately) {Assert.notNull(endpoint, "Endpoint must not be null");Assert.notNull(factory, "Factory must not be null");String id = endpoint.getId();Assert.hasText(id, "Endpoint id must be set");synchronized (this.listenerContainers) {if (this.listenerContainers.containsKey(id)) {throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");}// 创建消息监听容器MessageListenerContainer container = createListenerContainer(endpoint, factory);this.listenerContainers.put(id, container);if (startImmediately) {// 启动消息监听容器startIfNecessary(container);}}}
