Spring EnableJms 注解

源码分析

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Import(JmsBootstrapConfiguration.class)
  5. public @interface EnableJms {
  6. }
  • 该注解的切入点在 @Import(JmsBootstrapConfiguration.class),直接看 JmsBootstrapConfiguration 就可以了
  1. @Configuration
  2. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  3. public class JmsBootstrapConfiguration {
  4. /**
  5. * JMS 监听注解的后置处理器: 将标注了 {@link JmsListener} 的方法注册为监听点, 对应的监听容器由 {@link JmsListenerContainerFactory} 创建
  6. * @return
  7. */
  8. @Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
  9. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  10. public JmsListenerAnnotationBeanPostProcessor jmsListenerAnnotationProcessor() {
  11. return new JmsListenerAnnotationBeanPostProcessor();
  12. }
  13. /**
  14. * JMS 监听注册
  15. * @return
  16. */
  17. @Bean(name = JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  18. public JmsListenerEndpointRegistry defaultJmsListenerEndpointRegistry() {
  19. return new JmsListenerEndpointRegistry();
  20. }
  21. }

JmsListenerAnnotationBeanPostProcessor

类图

image-20200304085303580

  • 主要关注

    1. afterSingletonsInstantiated
    2. postProcessAfterInitialization

afterSingletonsInstantiated

  1. @Override
  2. public void afterSingletonsInstantiated() {
  3. // Remove resolved singleton classes from cache
  4. this.nonAnnotatedClasses.clear();
  5. if (this.beanFactory instanceof ListableBeanFactory) {
  6. // Apply JmsListenerConfigurer beans from the BeanFactory, if any
  7. // 根据类型获取bean
  8. Map<String, JmsListenerConfigurer> beans =
  9. ((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
  10. List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
  11. // 排序 Order
  12. AnnotationAwareOrderComparator.sort(configurers);
  13. for (JmsListenerConfigurer configurer : configurers) {
  14. // 放入jms监听配置,开发者自定义
  15. configurer.configureJmsListeners(this.registrar);
  16. }
  17. }
  18. if (this.containerFactoryBeanName != null) {
  19. this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
  20. }
  21. if (this.registrar.getEndpointRegistry() == null) {
  22. // Determine JmsListenerEndpointRegistry bean from the BeanFactory
  23. if (this.endpointRegistry == null) {
  24. Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");
  25. this.endpointRegistry = this.beanFactory.getBean(
  26. JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
  27. }
  28. this.registrar.setEndpointRegistry(this.endpointRegistry);
  29. }
  30. // Set the custom handler method factory once resolved by the configurer
  31. MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
  32. if (handlerMethodFactory != null) {
  33. this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
  34. }
  35. // Actually register all listeners
  36. this.registrar.afterPropertiesSet();
  37. }
  • 关注最后一行this.registrar.afterPropertiesSet()

    1. @Override
    2. public void afterPropertiesSet() {
    3. registerAllEndpoints();
    4. }
    5. protected void registerAllEndpoints() {
    6. Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
    7. synchronized (this.mutex) {
    8. for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
    9. // 注册监听
    10. this.endpointRegistry.registerListenerContainer(
    11. descriptor.endpoint, resolveContainerFactory(descriptor));
    12. }
    13. this.startImmediately = true; // trigger immediate startup
    14. }
    15. }
  • 注册监听(registerListenerContainer)的具体过程详见下文分析

postProcessAfterInitialization

  1. @Override
  2. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
  3. if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
  4. bean instanceof JmsListenerEndpointRegistry) {
  5. // Ignore AOP infrastructure such as scoped proxies.
  6. return bean;
  7. }
  8. // 获取 bean 的最终目标类(如果 bean 是代理对象, 则取被代理的原始类, 而不是代理类本身)
  9. Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
  10. if (!this.nonAnnotatedClasses.contains(targetClass)) {
  11. Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
  12. (MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
  13. Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
  14. method, JmsListener.class, JmsListeners.class);
  15. return (!listenerMethods.isEmpty() ? listenerMethods : null);
  16. });
  17. if (annotatedMethods.isEmpty()) {
  18. this.nonAnnotatedClasses.add(targetClass);
  19. if (logger.isTraceEnabled()) {
  20. logger.trace("No @JmsListener annotations found on bean type: " + targetClass);
  21. }
  22. } else {
  23. // Non-empty set of methods
  24. annotatedMethods.forEach((method, listeners) ->
  25. listeners.forEach(listener -> processJmsListener(listener, method, bean)));
  26. if (logger.isDebugEnabled()) {
  27. logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
  28. "': " + annotatedMethods);
  29. }
  30. }
  31. }
  32. return bean;
  33. }
  1. protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
  2. Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
  3. // 设置 监听方法信息
  4. MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
  5. endpoint.setBean(bean);
  6. endpoint.setMethod(invocableMethod);
  7. endpoint.setMostSpecificMethod(mostSpecificMethod);
  8. endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  9. endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
  10. endpoint.setBeanFactory(this.beanFactory);
  11. endpoint.setId(getEndpointId(jmsListener));
  12. endpoint.setDestination(resolve(jmsListener.destination()));
  13. if (StringUtils.hasText(jmsListener.selector())) {
  14. endpoint.setSelector(resolve(jmsListener.selector()));
  15. }
  16. if (StringUtils.hasText(jmsListener.subscription())) {
  17. endpoint.setSubscription(resolve(jmsListener.subscription()));
  18. }
  19. if (StringUtils.hasText(jmsListener.concurrency())) {
  20. endpoint.setConcurrency(resolve(jmsListener.concurrency()));
  21. }
  22. JmsListenerContainerFactory<?> factory = null;
  23. String containerFactoryBeanName = resolve(jmsListener.containerFactory());
  24. if (StringUtils.hasText(containerFactoryBeanName)) {
  25. Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
  26. try {
  27. factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
  28. } catch (NoSuchBeanDefinitionException ex) {
  29. throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
  30. mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
  31. " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
  32. }
  33. }
  34. // 将监听点连同容器工厂(可能为 null, 此时延迟到创建容器前再解析)一起注册到 JmsListenerEndpointRegistrar
  35. this.registrar.registerEndpoint(endpoint, factory);
  36. }
  • 注册监听点的重要方法: org.springframework.jms.config.JmsListenerEndpointRegistrar#registerEndpoint(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>)
  1. public void registerEndpoint(JmsListenerEndpoint endpoint, @Nullable JmsListenerContainerFactory<?> factory) {
  2. Assert.notNull(endpoint, "Endpoint must not be null");
  3. Assert.hasText(endpoint.getId(), "Endpoint id must be set");
  4. // Factory may be null, we defer the resolution right before actually creating the container
  5. // jms 监听点描述
  6. JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
  7. synchronized (this.mutex) {
  8. if (this.startImmediately) { // register and start immediately
  9. Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
  10. // 注册
  11. this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
  12. resolveContainerFactory(descriptor), true);
  13. }
  14. else {
  15. this.endpointDescriptors.add(descriptor);
  16. }
  17. }
  18. }
  • org.springframework.jms.config.JmsListenerEndpointRegistry#registerListenerContainer(org.springframework.jms.config.JmsListenerEndpoint, org.springframework.jms.config.JmsListenerContainerFactory<?>, boolean)

    1. public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
    2. boolean startImmediately) {
    3. Assert.notNull(endpoint, "Endpoint must not be null");
    4. Assert.notNull(factory, "Factory must not be null");
    5. String id = endpoint.getId();
    6. Assert.hasText(id, "Endpoint id must be set");
    7. synchronized (this.listenerContainers) {
    8. if (this.listenerContainers.containsKey(id)) {
    9. throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
    10. }
    11. // 创建消息监听容器
    12. MessageListenerContainer container = createListenerContainer(endpoint, factory);
    13. this.listenerContainers.put(id, container);
    14. if (startImmediately) {
    15. // 启动消息监听容器
    16. startIfNecessary(container);
    17. }
    18. }
    19. }
  • org.springframework.jms.config.JmsListenerEndpointRegistry#createListenerContainer

  1. /**
  2. * Create and start a new container using the specified factory.
  3. * 创建监听容器
  4. */
  5. protected MessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint,
  6. JmsListenerContainerFactory<?> factory) {
  7. // 创建监听 容器
  8. MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  9. if (listenerContainer instanceof InitializingBean) {
  10. try {
  11. // 如果容器实现了 InitializingBean, 调用其初始化回调 afterPropertiesSet
  12. ((InitializingBean) listenerContainer).afterPropertiesSet();
  13. } catch (Exception ex) {
  14. throw new BeanInitializationException("Failed to initialize message listener container", ex);
  15. }
  16. }
  17. int containerPhase = listenerContainer.getPhase();
  18. if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
  19. if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
  20. throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
  21. this.phase + " vs " + containerPhase);
  22. }
  23. this.phase = listenerContainer.getPhase();
  24. }
  25. return listenerContainer;
  26. }
  • 关键接口JmsListenerContainerFactory<C extends MessageListenerContainer>

    1. public interface JmsListenerContainerFactory<C extends MessageListenerContainer> {
    2. /**
    3. * Create a {@link MessageListenerContainer} for the given {@link JmsListenerEndpoint}.
    4. * 创建监听容器
    5. * @param endpoint the endpoint to configure
    6. * @return the created container
    7. */
    8. C createListenerContainer(JmsListenerEndpoint endpoint);
    9. }

    image-20200304092154712

  • 注册完成后是否立即启动

    1. this.listenerContainers.put(id, container);
    2. if (startImmediately) {
    3. // 启动消息监听容器
    4. startIfNecessary(container);
    5. }
    6. private void startIfNecessary(MessageListenerContainer listenerContainer) {
    7. if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    8. listenerContainer.start();
    9. }
    10. }
    • 具体实现: org.springframework.jms.listener.AbstractJmsListeningContainer#start
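  • 执行完 start 方法后, processJmsListener 的调用链路就结束了, postProcessAfterInitialization 也随之结束。注意: 只有当 registrar 的 startImmediately 为 true 时, registerEndpoint 才会立即注册并启动容器; 否则监听点只是先保存到 endpointDescriptors 中, 等到 afterSingletonsInstantiated 调用 registerAllEndpoints 时再统一注册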

JmsListenerEndpointRegistry

  • 这个类辅助 JmsListenerAnnotationBeanPostProcessor 处理: 负责创建、保存并按需启动各监听点对应的消息监听容器(MessageListenerContainer)

registerListenerContainer

  1. /**
  2. * Create a message listener container for the given {@link JmsListenerEndpoint}.
  3. * <p>This create the necessary infrastructure to honor that endpoint
  4. * with regards to its configuration.
  5. * <p>The {@code startImmediately} flag determines if the container should be
  6. * started immediately.
  7. * <p>
  8. * 注册监听容器
  9. *
  10. * @param endpoint the endpoint to add
  11. * 监听点
  12. * @param factory the listener factory to use
  13. * 监听容器工厂
  14. * @param startImmediately start the container immediately if necessary
  15. * 是否立即启动容器
  16. * @see #getListenerContainers()
  17. * @see #getListenerContainer(String)
  18. */
  19. public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
  20. boolean startImmediately) {
  21. Assert.notNull(endpoint, "Endpoint must not be null");
  22. Assert.notNull(factory, "Factory must not be null");
  23. String id = endpoint.getId();
  24. Assert.hasText(id, "Endpoint id must be set");
  25. synchronized (this.listenerContainers) {
  26. if (this.listenerContainers.containsKey(id)) {
  27. throw new IllegalStateException("Another endpoint is already registered with id '" + id + "'");
  28. }
  29. // 创建消息监听容器
  30. MessageListenerContainer container = createListenerContainer(endpoint, factory);
  31. this.listenerContainers.put(id, container);
  32. if (startImmediately) {
  33. // 启动消息监听容器
  34. startIfNecessary(container);
  35. }
  36. }
  37. }