26.4 接收消息

26.4.1 同步接收

虽然 JMS 通常与异步处理相关,但它也可以同步地消费消息。可重载的receive(..)方法提供了这个功能。在同步接收期间,调用线程阻塞,直到接收到消息。这可能是一个危险的操作,因为调用线程可能无限期地被阻塞。receiveTimeout属性指定了接收者等待消息的超时时间。

26.4.2 异步接收 - 消息驱动的 POJOs

Spring 还可以通过使用@JmsListener注解来支持监听注解端点,并提供了一种以编程方式注册端点的开放式基础架构。 这是设置异步接收器的最方便的方法,有关详细信息,请参见第26.6.1节“启用监听端点注解”

类似于 EJB 世界里流行的消息驱动 bean(MDB),消息驱动 POJO(MDP) 作为 JMS 消息的接收器。MDP 的一个约束(请看下面的有关javax.jms.MessageListener类的讨论)是它必须实现javax.jms.MessageListener接口。另外当你的 POJO 将以多线程的方式接收消息时必须确保你的代码是线程安全的。

下面是 MDP 的一个简单实现:

  1. import javax.jms.JMSException;
  2. import javax.jms.Message;
  3. import javax.jms.MessageListener;
  4. import javax.jms.TextMessage;
  5. public class ExampleListener implements MessageListener {
  6. public void onMessage(Message message) {
  7. if (message instanceof TextMessage) {
  8. try {
  9. System.out.println(((TextMessage) message).getText());
  10. }
  11. catch (JMSException ex) {
  12. throw new RuntimeException(ex);
  13. }
  14. }
  15. else {
  16. throw new IllegalArgumentException("Message must be of type TextMessage");
  17. }
  18. }
  19. }

一旦你实现了MessageListener接口,下面该创建一个消息监听容器了。

请看下面例子是如何定义和配置一个随 Sping 发行的消息侦听容器的(这个例子用DefaultMessageListenerContainer)。

  1. <!-- this is the Message Driven POJO (MDP) -->
  2. <bean id="messageListener" class="jmsexample.ExampleListener" />
  3. <!-- and this is the message listener container -->
  4. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  5. <property name="connectionFactory" ref="connectionFactory"/>
  6. <property name="destination" ref="destination"/>
  7. <property name="messageListener" ref="messageListener" />
  8. </bean>

请参阅各种消息监听容器的 Spring javadocs,以了解每个实现所支持功能的完整描述。

26.4.3 SessionAwareMessageListener 接口

SessionAwareMessageListener接口是一个 Spring 专门用来提供类似于 JMS MessageListener的接口,也提供了从接收Message来访问 JMS Session的消息处理方法。

  1. package org.springframework.jms.listener;
  2. public interface SessionAwareMessageListener {
  3. void onMessage(Message message, Session session) throws JMSException;
  4. }

如果你希望你的 MDP 可以响应所有接收到的消息(使用onMessage(Message, Session)方法提供的Session)那么你可以选择让你的 MDP 实现这个接口(优先于标准的 JMS MessageListener接口)。所有随 Spring 发行的支持 MDP 的消息监听容器都支持MessageListenerSessionAwareMessageListener接口的实现。要注意的是实现了SessionAwareMessageListener接口的类通过接口与 Spring 有了耦合。是否选择使用它完全取决于开发者或架构师。

请注意SessionAwareMessageListener接口的onMessage(..)方法会抛出JMSException异常。和标准 JMS MessageListener接口相反,当使用SessionAwareMessageListener接口时,客户端代码负责处理所有抛出的异常。

26.4.4 MessageListenerAdapter

MessageListenerAdapter类是 Spring 的异步支持消息类中的最后一个组建:简而言之,它允许您将几乎任何类都暴露为MDP(当然有一些限制)。

请考虑以下接口定义。请注意,虽然该接口既不继承MessageListener,也不继承SessionAwareMessageListener接口,但通过MessageListenerAdapter类依然可以当作一个 MDP 使用。还要注意,各种消息处理方法是如何根据可以接收和处理的各种消息的内容进行强类型匹配的。

  1. public interface MessageDelegate {
  2. void handleMessage(String message);
  3. void handleMessage(Map message);
  4. void handleMessage(byte[] message);
  5. void handleMessage(Serializable message);
  6. }
  1. public class DefaultMessageDelegate implements MessageDelegate {
  2. // implementation elided for clarity...
  3. }

尤其要注意的是,上述MessageDelegate接口的实现(上述DefaultMessageDelegate类)完全不依赖于 JMS。它是一个真正的 POJO,我们可以通过如下配置把它设置成 MDP。

  1. <!-- this is the Message Driven POJO (MDP) -->
  2. <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  3. <constructor-arg>
  4. <bean class="jmsexample.DefaultMessageDelegate"/>
  5. </constructor-arg>
  6. </bean>
  7. <!-- and this is the message listener container... -->
  8. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  9. <property name="connectionFactory" ref="connectionFactory"/>
  10. <property name="destination" ref="destination"/>
  11. <property name="messageListener" ref="messageListener" />
  12. </bean>

以下是另一个只能接收 JMS TextMessage消息的 MDP 示例。注意消息处理方法是如何实际调用receive(在MessageListenerAdapter中默认的消息处理方法的名字是handleMessage)的,但是它是可配置的(从下面可以看到)。注意receive(..)方法是如何使用强制类型来只接收和处理JMS TextMessage消息的。

  1. public interface TextMessageDelegate {
  2. void receive(TextMessage message);
  3. }
  1. public class DefaultTextMessageDelegate implements TextMessageDelegate {
  2. // implementation elided for clarity...
  3. }

辅助的MessageListenerAdapter类配置文件类似如下:

  1. <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  2. <constructor-arg>
  3. <bean class="jmsexample.DefaultTextMessageDelegate"/>
  4. </constructor-arg>
  5. <property name="defaultListenerMethod" value="receive"/>
  6. <!-- we don't want automatic message context extraction -->
  7. <property name="messageConverter">
  8. <null/>
  9. </property>
  10. </bean>

请注意,如果上述messageListener接收到不是TextMessage类型的 JMS 消息,则会抛出IllegalStateException(随之产生的其他异常只被捕获而不处理)。MessageListenerAdapter还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应消息。请看下面的接口及其实现:

  1. public interface ResponsiveTextMessageDelegate {
  2. // notice the return type...
  3. String receive(TextMessage message);
  4. }
  1. public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
  2. // implementation elided for clarity...
  3. }

如果将上述DefaultResponsiveTextMessageDelegateMessageListenerAdapter联合使用,那么从执行receive(..)方法返回的任何非空值都将(缺省情况下)转换为TextMessage。这个返回的TextMessage将被发送到原来的Message中 JMS Reply-To 属性定义的目的地(如果存在),或者是MessageListenerAdapter设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。

26.4.5 事务中的消息处理

在事务中调用消息监听器只需要重新配置监听容器。

本地资源事务可以通过监听容器上定义的sessionTransacted标志进行简单地激活。 然后,每个消息监听器调用将在激活的 JMS 事务中进行操作,并在监听器执行失败的情况下进行消息回滚。 发送响应消息(通过SessionAwareMessageListener)将成为同一本地事务的一部分,但任何其他资源操作(如数据库访问)将独立运行。 在监听器的实现中通常需要进行重复消息的检测,覆盖数据库处理已经提交但消息处理提交失败的情况。

  1. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  2. <property name="connectionFactory" ref="connectionFactory"/>
  3. <property name="destination" ref="destination"/>
  4. <property name="messageListener" ref="messageListener"/>
  5. <property name="sessionTransacted" value="true"/>
  6. </bean>

对于参与外部管理的事务,你将需要配置一个事务管理器并使用支持外部管理事务的监听容器:通常为DefaultMessageListenerContainer

要配置 XA 事务参与的消息监听容器,您需要配置一个JtaTransactionManager(默认情况下,它将委托给 Java EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory需要具有 XA 能力并且正确地注册到你的 JTA 事务协调器上!(检查你的 Java EE 服务的 JNDI 资源配置。)这允许消息接收以及例如同一事务下的数据库访问(具有统一提交语义,以 XA 事务日志开销为代价)。

  1. <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,你只需要将它添加到我们之前的容器配置中。其余的交给容器处理。

  1. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  2. <property name="connectionFactory" ref="connectionFactory"/>
  3. <property name="destination" ref="destination"/>
  4. <property name="messageListener" ref="messageListener"/>
  5. <property name="transactionManager" ref="transactionManager"/>
  6. </bean>