Mongoosej.Blog.Software.Programming.Java.Framework.Spring.SpringFramework.ActiveMQ


MDBs are extremely useful when using a full Java EE container, but there is an alternative that doesn’t require the full Java EE container. Using the Spring Framework’s JMS APIs makes developing Message Driven POJOs (MDPs) very easy. That is, Plain Old Java Objects (POJOs) that act as if they’re message driven. In fact, this style of development has become quite popular in the Java development community because it avoids the overhead of using a full Java EE container. 当使用完整的 Java EE 容器时,MDB 非常有用,但还有一种替代方法不需要完整的 Java EE 容器。使用 Spring Framework 的 JMS API 使开发消息驱动 POJO (MDP) 非常简单。也就是说,Plain Old Java Objects (POJO) 就像是消息驱动的一样。事实上,这种开发方式在 Java 开发社区中非常流行,因为它避免了使用完整 Java EE 容器的开销。

ActiveMQ provides good Spring integration for configuring various aspects of client to broker communication and Spring framework, on the other hand, comes with support for easier JMS messaging. Together ActiveMQ and Spring make an excellent JMS development platform, making many common tasks easy to accomplish. Some of those tasks we will cover are:

  • Defining connection factory - ActiveMQ provide bean classes that could be used to configure URLs and other parameters of connections to brokers. The connection factory could later be used by your application to get the appropriate connection.
  • Defining destinations - ActiveMQ destination classes could be also configured as beans representing JMS destinations used by your producers and consumers.
  • Defining consumers - Spring JMS support provides helper bean classes that allows you to easily configure new consumers.
  • Defining producers - Spring also provides helper bean classes for creating new producers.

A message listener container is used to receive messages from a JMS message queue and drive the MessageListener that is injected into it. The listener container is responsible for all threading of message reception and dispatches into the listener for processing. A message listener container is the intermediary between an MDP and a messaging provider and takes care of registering to receive messages, participating in transactions, resource acquisition and release, exception conversion, and so on. This lets you write the (possibly complex) business logic associated with receiving a message (and possibly respond to it), and delegates boilerplate JMS infrastructure concerns to the framework. 消息侦听器容器用于从 JMS 消息队列接收消息并驱动 注入其中的 MessageListener。侦听器容器负责所有线程 消息接收并分派到侦听器中进行处理。消息侦听器容器是 MDP 和消息传递提供者之间的中介,负责注册到 接收消息、参与事务、资源获取与释放、异常 转换等等。这使您可以编写与 接收消息(并可能响应),并委托样板 JMS 基础设施 对框架的关注。

The basic abstraction for receiving messages in Spring is the message listener container. It is practically an intermediary between your message listener and broker, dealing with connections, threading and such, leaving you to worry just about your business logic. Spring 中接收消息的基本抽象是消息侦听器容器。它实际上是您的消息侦听器和代理之间的中介,处理连接、线程等,让您只关心您的业务逻辑。

  1. <bean id="portfolioListener" class="org.apache.activemq.book.ch2.portfolio.Listener">
  2. </bean>
  3. <bean id="javaConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  4. <property name="connectionFactory" ref="jmsFactory"/>
  5. <property name="destination" ref="javaDest" />
  6. <property name="messageListener" ref="portfolioListener" />
  7. </bean>
  8. <bean id="ionaConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  9. <property name="connectionFactory" ref="jmsFactory"/>
  10. <property name="destination" ref="ionaDest" />
  11. <property name="messageListener" ref="portfolioListener" />
  12. </bean>

As you can see, every message listener container needs a connection factory, destination and listener to be used. So all you have to do is to implement your desired listener and leave everything else to Spring. Note that in this examples we have used plain (not pooled) connection factory, since no connection pooling is needed. 如您所见,每个消息侦听器容器都需要使用连接工厂、目标和侦听器。因此,您所要做的就是实现您想要的侦听器,并将其他一切交给 Spring。请注意,在此示例中,我们使用了普通(非池化)连接工厂,因为不需要连接池化。 In this example we have used the DefaultMessageListenerContainer which is commonly used. This container could be dynamically adjusted in terms of concurrent consumers and such. Spring also provides more modest and advanced message containers, so please check its documentation when deciding which one is a perfect fit for your project. 在这个例子中,我们使用了常用的 DefaultMessageListenerContainer。这个容器可以在并发消费者等方面进行动态调整。 Spring 还提供了更普通和更高级的消息容器,因此在决定哪一个最适合您的项目时,请查看其文档。

When the JMS infrastructure is present, any bean can be annotated with @JmsListener to create a listener endpoint. If no JmsListenerContainerFactory has been defined, a default one is configured automatically. If a DestinationResolver, a MessageConverter, or a javax.jms.ExceptionListener beans are defined, they are associated automatically with the default factory. 当 JMS 基础设施存在时,任何 bean 都可以使用 @JmsListener 进行注释以创建一个侦听器端点。如果未定义 JmsListenerContainerFactory,则自动地配置默认的。如果DestinationResolver、MessageConverter 或 javax.jms.ExceptionListener beans 定义后,它们会自动与默认工厂关联。

If you need to create more JmsListenerContainerFactory instances or if you want to override the default, Spring Boot provides a DefaultJmsListenerContainerFactoryConfigurer that you can use to initialize a DefaultJmsListenerContainerFactory with the same settings as the one that is autoconfigured. 如果您需要创建更多 JmsListenerContainerFactory 实例或者如果您想覆盖默认值,Spring Boot 提供了一个 DefaultJmsListenerContainerFactoryConfigurer ,您可以使用它来初始化 DefaultJmsListenerContainerFactory ,其设置与自动配置的设置相同。

how to registe a durable subscriber on topic?

You cannot create a durable subscriber without specifying a unique clientID on a Connection.

source code:

  1. @Configuration(proxyBeanMethods = false)
  2. public class TestConfig
  3. {
  4. @Bean
  5. public DefaultJmsListenerContainerFactory wcsJmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
  6. ConnectionFactory jmsConnectionFactory)
  7. {
  8. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  9. configurer.configure(factory, jmsConnectionFactory);
  10. factory.setSubscriptionDurable(true);
  11. //factory.setClientId("hello ActiveMQ");
  12. return factory;
  13. }
  14. }

error log:

  1. javax.jms.JMSException: You cannot create a durable subscriber without specifying a unique clientID on a Connection
  2. at org.apache.activemq.ActiveMQConnection.checkClientIDWasManuallySpecified(ActiveMQConnection.java:1289) ~[activemq-client-5.16.3.jar:5.16.3]
  3. at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1478) ~[activemq-client-5.16.3.jar:5.16.3]
  4. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_131]
  5. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_131]
  6. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
  7. at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
  8. at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:401) ~[spring-jms-5.3.13.jar:5.3.13]
  9. at com.sun.proxy.$Proxy52.createDurableSubscriber(Unknown Source) ~[na:na]
  10. at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:863) ~[spring-jms-5.3.13.jar:5.3.13]
  11. at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:224) ~[spring-jms-5.3.13.jar:5.3.13]
  12. at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1264) ~[spring-jms-5.3.13.jar:5.3.13]
  13. at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1236) ~[spring-jms-5.3.13.jar:5.3.13]
  14. at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.13.jar:5.3.13]
  15. at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.13.jar:5.3.13]
  16. at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

创建持久订阅者/监听,必须设置connection的clientID。
所以代码中的第10行后面,需要设置clientID,也就是取消第11行的注释。

javax.jms.IllegalStateException: setClientID call not supported on proxy for shared Connection. Set the ‘clientId’ property on the SingleConnectionFactory instead.

source code:

  1. @Configuration(proxyBeanMethods = false)
  2. public class TestConfig
  3. {
  4. @Bean
  5. public DefaultJmsListenerContainerFactory wcsJmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
  6. ConnectionFactory jmsConnectionFactory)
  7. {
  8. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  9. configurer.configure(factory, jmsConnectionFactory);
  10. factory.setSubscriptionDurable(true);
  11. factory.setClientId("hello ActiveMQ");
  12. return factory;
  13. }
  14. }

error log:

  1. javax.jms.IllegalStateException: setClientID call not supported on proxy for shared Connection. Set the 'clientId' property on the SingleConnectionFactory instead.
  2. at org.springframework.jms.connection.SingleConnectionFactory$SharedConnectionInvocationHandler.invoke(SingleConnectionFactory.java:572) ~[spring-jms-5.3.13.jar:5.3.13]
  3. at com.sun.proxy.$Proxy51.setClientID(Unknown Source) ~[na:na]
  4. at org.springframework.jms.listener.AbstractJmsListeningContainer.prepareSharedConnection(AbstractJmsListeningContainer.java:435) [spring-jms-5.3.13.jar:5.3.13]
  5. at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:414) [spring-jms-5.3.13.jar:5.3.13]
  6. at org.springframework.jms.listener.AbstractJmsListeningContainer.establishSharedConnection(AbstractJmsListeningContainer.java:380) [spring-jms-5.3.13.jar:5.3.13]
  7. at org.springframework.jms.listener.DefaultMessageListenerContainer.establishSharedConnection(DefaultMessageListenerContainer.java:863) [spring-jms-5.3.13.jar:5.3.13]
  8. at org.springframework.jms.listener.AbstractJmsListeningContainer.doStart(AbstractJmsListeningContainer.java:292) [spring-jms-5.3.13.jar:5.3.13]
  9. at org.springframework.jms.listener.AbstractJmsListeningContainer.start(AbstractJmsListeningContainer.java:277) [spring-jms-5.3.13.jar:5.3.13]
  10. at org.springframework.jms.listener.DefaultMessageListenerContainer.start(DefaultMessageListenerContainer.java:657) [spring-jms-5.3.13.jar:5.3.13]
  11. at org.springframework.jms.config.JmsListenerEndpointRegistry.startIfNecessary(JmsListenerEndpointRegistry.java:242) [spring-jms-5.3.13.jar:5.3.13]
  12. at org.springframework.jms.config.JmsListenerEndpointRegistry.start(JmsListenerEndpointRegistry.java:205) [spring-jms-5.3.13.jar:5.3.13]
  13. at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) [spring-context-5.3.13.jar:5.3.13]
  14. at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) [spring-context-5.3.13.jar:5.3.13]
  15. at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) [spring-context-5.3.13.jar:5.3.13]
  16. at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_131]
  17. at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) [spring-context-5.3.13.jar:5.3.13]
  18. at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) [spring-context-5.3.13.jar:5.3.13]
  19. at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.13.jar:5.3.13]
  20. at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.13.jar:5.3.13]
  21. at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:730) ~[spring-boot-2.6.1.jar:2.6.1]
  22. at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:412) ~[spring-boot-2.6.1.jar:2.6.1]
  23. at org.springframework.boot.SpringApplication.run(SpringApplication.java:302) ~[spring-boot-2.6.1.jar:2.6.1]
  24. at com.ksec.wcs.web.WcsWebStarter.main(WcsWebStarter.java:103) ~[classes/:na]

reason:

  • 持久化订阅的ConnectionFactory不能和非持久化的订阅一起使用
  • JMS 1.1 (which is what you’re using since you’re using ActiveMQ 5.x) doesn’t support shared durable subscriptions. Therefore, when you use setConcurrency(“3-10”) and Spring tries to create > 1 subscription you receive an error.

solution:

  • 注入自定义的ConnectionFactory实体,而不是注入DefaultJmsListenerContainerFactory时,使用容器中已经配置注入的jmsConnectionFactory,因为jmsConnectionFactory是共享连接。【推荐】
  • Use setConcurrency(“1”) which will limit the number of subscribers/consumers to 1. Depending on your requirements this could have a severe negative performance impact.
  • Switch to ActiveMQ Artemis which does support JMS 2.0 and invoke setSubscriptionShared(true).

suggest solution:

  1. @Configuration(proxyBeanMethods = false)
  2. public class TestConfig
  3. {
  4. @Bean
  5. public DefaultJmsListenerContainerFactory wcsJmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
  6. ConnectionFactory durableConnectionFactory)
  7. {
  8. DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  9. configurer.configure(factory, durableConnectionFactory);
  10. factory.setSubscriptionDurable(true);
  11. factory.setClientId("hello ActiveMQ");
  12. return factory;
  13. }
  14. @Bean
  15. public ConnectionFactory durableConnectionFactory(@Value("${spring.activemq.brokerUrl}") String url)
  16. {
  17. // 创建连接工厂,可以在brokerUrl后面跟上jms.useAsyncSend=true来表示异步发送消息
  18. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
  19. connectionFactory.setAlwaysSyncSend(true);// 设置异步发送消息
  20. connectionFactory.setRedeliveryPolicy(redeliveryPolicy());// 设置重发的策略
  21. return connectionFactory;
  22. }
  23. public RedeliveryPolicy redeliveryPolicy()
  24. {
  25. RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
  26. //是否在每次尝试重新发送失败后,增长这个等待时间
  27. redeliveryPolicy.setUseExponentialBackOff(true);
  28. //重发次数,默认为6次,这里设置为10次,-1表示不限次数
  29. redeliveryPolicy.setMaximumRedeliveries(3);
  30. //重发时间间隔,默认为1毫秒,设置为10000毫秒
  31. redeliveryPolicy.setInitialRedeliveryDelay(10000);
  32. //表示没有拖延只有UseExponentialBackOff(true)为true时生效
  33. //第一次失败后重新发送之前等待10000毫秒,第二次失败再等待10000 * 2毫秒
  34. //第三次翻倍10000 * 2 * 2,以此类推
  35. redeliveryPolicy.setBackOffMultiplier(2);
  36. //是否避免消息碰撞
  37. redeliveryPolicy.setUseCollisionAvoidance(true);
  38. //设置重发最大拖延时间360000毫秒 表示没有拖延只有UseExponentialBackOff(true)为true时生效
  39. redeliveryPolicy.setMaximumRedeliveryDelay(360000);
  40. return redeliveryPolicy;
  41. }
  42. }

reference:

<如何修复ActiveMQ持久订阅并抛出“已使用持久消费者”错误>

<ActiveMQ工作中遇到的问题总结>

Setup of JMS message listener invoker failed for destination

source code

  1. @Configuration
  2. @EnableConfigurationProperties({ ActiveMQProperties.class, JmsProperties.class })
  3. public class TestConfig
  4. {
  5. @Bean
  6. public DefaultJmsListenerContainerFactory wcsJmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer config,
  7. ConnectionFactory durableConnectionFactory)
  8. {
  9. DefaultJmsListenerContainerFactory wcsJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
  10. config.configure(wcsJmsListenerContainerFactory, durableConnectionFactory);
  11. wcsJmsListenerContainerFactory.setSubscriptionShared(true);
  12. wcsJmsListenerContainerFactory.setSubscriptionDurable(true);
  13. wcsJmsListenerContainerFactory.setPubSubDomain(true);
  14. wcsJmsListenerContainerFactory.setClientId("ClientId001");
  15. return wcsJmsListenerContainerFactory;
  16. }
  17. @Bean
  18. public ConnectionFactory durableConnectionFactory(ActiveMQProperties properties)
  19. {
  20. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  21. connectionFactory.setBrokerURL(properties.getBrokerUrl());
  22. return connectionFactory;
  23. }
  24. }

error log:

  1. 2021-12-21 23:13:20.736 INFO 2960 --- [ main] com.ksec.wcs.web.WcsWebStarter : Starting WcsWebStarter using Java 1.8.0_161 on Lanny-PC with PID 2960 (F:\KiMoJar\WorkSpace\EclipseProjects\com.ksec.wcs.web\target\classes started by Lanny in F:\KiMoJar\WorkSpace\EclipseProjects\com.ksec.wcs.web)
  2. 2021-12-21 23:13:20.740 INFO 2960 --- [ main] com.ksec.wcs.web.WcsWebStarter : No active profile set, falling back to default profiles: default
  3. 2021-12-21 23:13:22.417 INFO 2960 --- [ main] com.ksec.wcs.web.WcsWebStarter : Started WcsWebStarter in 2.413 seconds (JVM running for 3.075)
  4. 2021-12-21 23:13:27.439 WARN 2960 --- [ntContainer#0-1] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'SpringBootTest001' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
  5. 2021-12-21 23:13:32.471 WARN 2960 --- [ntContainer#0-2] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'SpringBootTest001' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
  6. 2021-12-21 23:13:37.489 WARN 2960 --- [ntContainer#0-3] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'SpringBootTest001' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
  7. 2021-12-21 23:13:42.503 WARN 2960 --- [ntContainer#0-4] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'SpringBootTest001' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
  8. 2021-12-21 23:13:47.522 WARN 2960 --- [ntContainer#0-5] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'SpringBootTest001' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;
  9. 2021-12-21 23:13:52.539 WARN 2960 --- [ntContainer#0-6] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'SpringBootTest001' - trying to recover. Cause: org.apache.activemq.ActiveMQSession.createSharedDurableConsumer(Ljavax/jms/Topic;Ljava/lang/String;Ljava/lang/String;)Ljavax/jms/MessageConsumer;

reason:
JMS 1.1 (which is what you’re using since you’re using ActiveMQ 5.x) doesn’t support shared durable subscriptions.
You cannot create a durable subscriber without specifying a unique clientID on a Connection.
JMS1.1规范不支持持久化订阅所使用的连接被共享,SpringBoot2.6.1默认集成的是ActiveMQ5.16,ActiveMQ5.x使用的是JMS1.1规范。

solution:
示例代码中的第11行应该要被删掉,去掉耐久订阅的连接共享。