8.2 使用 RabbitMQ 和 AMQP

RabbitMQ 可以说是 AMQP 最优秀的实现,它提供了比 JMS 更高级的消息路由策略。JMS 消息使用接收方将从中检索它们的目的地的名称来寻址,而 AMQP 消息使用交换器的名称和路由键来寻址,它们与接收方正在监听的队列解耦。交换器和队列之间的这种关系如图 8.1 所示。

图 8.1 发送到 RabbitMQ 交换器的消息被路由到多个队列

图 8.1 发送到 RabbitMQ 交换器的消息被路由到多个队列

当消息到达 RabbitMQ broker 时,它将转到它所寻址的交换器。交换器负责将其路由到一个或多个队列,具体取决于交换器的类型、交换器与队列之间的绑定以及消息的路由键的值。

有几种不同的交换方式,包括以下几种:

  • Default —— 一种特殊的交换器,通过 broker 自动创建。它将消息路由到与消息的路由键的值同名的队列中。所有的队列将会自动地与交换器绑定。
  • Direct —— 路由消息到消息路由键的值与绑定值相同的队列。
  • Topic —— 将消息路由到一个或多个队列,其中绑定键(可能包含通配符)与消息的路由键匹配。
  • Fanout —— 将消息路由到所有绑定队列,而不考虑绑定键或路由键。
  • Headers —— 与 topic 交换器类似,只是路由基于消息头值而不是路由键。
  • Dead letter —— 对无法交付的消息(意味着它们不匹配任何已定义的交换器与队列的绑定)的全部捕获。

最简单的交换形式是 Default 和 Fanout,因为它们大致对应于 JMS 队列和主题。但是其他交换允许定义更灵活的路由方案。

需要理解的最重要的一点是,消息是用路由键发送到交换器的,它们是从队列中使用的。它们如何从一个交换到一个队列取决于绑定定义以及什么最适合相应的情况。

使用哪种交换类型以及如何定义从交换到队列的绑定与 Spring 应用程序中消息的发送和接收方式关系不大。因此,我们将重点讨论如何编写使用 RabbitMQ 发送和接收消息的代码。

注意

有关如何最好地将队列绑定到交换器的更详细讨论,请参见 Alvaro Videla 和 Jason J.W. Williams(Manning, 2012)的《RabbitMQ 实战》

8.2.1 添加 RabbitMQ 到 Spring 中

在开始使用 Spring 发送和接收 RabbitMQ 消息之前,需要将 Spring Boot 的 AMQP starter 依赖项添加到构建中,以取代在前一节中添加的 Artemis 或 ActiveMQ starter:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

将 AMQP starter 添加到构建中将触发自动配置,该配置将创建 AMQP 连接工厂和 RabbitTemplate bean,以及其他支持组件。只需添加此依赖项,就可以开始使用 Spring 从 RabbitMQ broker 发送和接收消息,表 8.4 中列出了一些有用的属性。

表 8.4 配置 RabbitMQ broker 的位置和凭据的属性

属性 描述
spring.rabbitmq.addresses 一个逗号分隔的 RabbitMQ Broker 地址列表
spring.rabbitmq.host Broker 主机(默认为 localhost)
spring.rabbitmq.port Broker 端口(默认为 5672)
spring.rabbitmq.username 访问 Broker 的用户名(可选)
spring.rabbitmq.password 访问 Broker 的密码(可选)

出于开发目的,可能有一个 RabbitMQ Broker,它不需要在本地机器上运行身份验证,监听端口 5672。当还在开发阶段时,这些属性可能不会有太大的用处,但是当应用程序进入生产环境时,它们无疑会很有用。

例如,假设在进入生产环境时,RabbitMQ Broker 位于一个名为 rabbit.tacocloud.com 的服务器上,监听端口 5673,并需要凭据。在这种情况下,应用程序中的以下配置。当 prod 配置文件处于活动状态时,yml 文件将设置这些属性:

  1. spring:
  2. profiles: prod
  3. rabbitmq:
  4. host: rabbit.tacocloud.com
  5. port: 5673
  6. username: tacoweb
  7. password: l3tm31n

现在 RabbitMQ 被配置到了应用程序中了,是时候使用 RabbitTemplate 发送消息了。

8.2.2 使用 RabbitTemplate 发送消息

Spring 对于 RabbitMQ 消息支持的核心就是 RabbitTemplate。RabbitTemplate 提供一套与 JmsTemplate 类似的方法。但是对于 RabbitMQ,在工作方式上还是有一些细微的差别。

关于使用 RabbitTemplate 发送消息,send() 和 convertAndSend() 方法与来自 JmsTemplate 的同名方法并行。但是不同于 JmsTemplate 方法,它只将消息路由到给定的队列或主题,RabbitTemplate 方法根据交换和路由键发送消息。下面是一些用 RabbitTemplate 发送消息的最有用的方法:

  1. // 发送原始消息
  2. void send(Message message) throws AmqpException;
  3. void send(String routingKey, Message message) throws AmqpException;
  4. void send(String exchange, String routingKey, Message message) throws AmqpException;
  5. // 发送从对象转换过来的消息
  6. void convertAndSend(Object message) throws AmqpException;
  7. void convertAndSend(String routingKey, Object message) throws AmqpException;
  8. void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
  9. // 发送经过处理后从对象转换过来的消息
  10. void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
  11. void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
  12. void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;

这些方法与 JmsTemplate 中的孪生方法遵循类似的模式。前三个 send() 方法都发送一个原始消息对象。接下来的三个 convertAndSend() 方法接受一个对象,该对象将在发送之前在后台转换为消息。最后三个 convertAndSend() 方法与前三个方法类似,但是它们接受一个 MessagePostProcessor,可以在消息对象发送到代理之前使用它来操作消息对象。

这些方法与对应的 JmsTemplate 方法不同,它们接受 String 值来指定交换和路由键,而不是目的地名称(或 Destination 对象)。不接受交换的方法将把它们的消息发送到默认交换。同样,不接受路由键的方法将使用默认路由键路由其消息。

让我们用 RabbitTemplate 发送 taco 订单。一种方法是使用 send() 方法,如程序清单 8.5 所示。但是在调用 send() 之前,需要将 Order 对象转换为消息。如果不是因为 RabbitTemplate 使用 getMessageConverter() 方法使其消息转换器可用,这可能是一项乏味的工作。程序清单 8.5 使用 RabbitTemplate.send() 发送消息

  1. package tacos.messaging;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageProperties;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.amqp.support.converter.MessageConverter;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. import tacos.Order;
  9. @Service
  10. public class RabbitOrderMessagingService implements OrderMessagingService {
  11. private RabbitTemplate rabbit;
  12. @Autowired
  13. public RabbitOrderMessagingService(RabbitTemplate rabbit) {
  14. this.rabbit = rabbit;
  15. }
  16. public void sendOrder(Order order) {
  17. MessageConverter converter = rabbit.getMessageConverter();
  18. MessageProperties props = new MessageProperties();
  19. Message message = converter.toMessage(order, props);
  20. rabbit.send("tacocloud.order", message);
  21. }
  22. }

有了 MessageConverter 之后,将 Order 转换为消息就很简单了。必须使用 MessageProperties 提供任何消息属性,但是如果不需要设置任何此类属性,则可以使用 MessageProperties 的缺省实例。然后,剩下的就是调用 send(),将交换键和路由键(两者都是可选的)与消息一起传递。在本例中,只指定了与消息一起的路由键:tacocloud.order,因此将使用缺省交换。

说到默认交换,默认交换名称是 “”(一个空 String ),它对应于 RabbitMQ Broker 自动创建的默认交换。同样,默认的路由键是 “”(其路由取决于所涉及的交换和绑定)。可以通过设置 spring.rabbitmq.template.exchange 和 spring.rabbitmq.template.routing-key 属性来覆盖这些缺省值:

  1. spring:
  2. rabbitmq:
  3. template:
  4. exchange: tacocloud.orders
  5. routing-key: kitchens.central

在这种情况下,所有发送的消息都将自动发送到名为 tacocloud.orders 的交换器。如果在 send() 或 convertAndSend() 调用中也未指定路由键,则消息将有一个 kitchens.central 的路由键。

从消息转换器创建消息对象非常简单,但是使用 convertAndSend() 让 RabbitTemplate 处理所有的转换工作就更容易了:

  1. public void sendOrder(Order order) {
  2. rabbit.convertAndSend("tacocloud.order", order);
  3. }

配置消息转换器

默认情况下,使用 SimpleMessageConverter 执行消息转换,SimpleMessageConverter 能够将简单类型(如 String)和可序列化对象转换为消息对象。但是 Spring 为 RabbitTemplate 提供了几个消息转换器,包括以下内容:

  • Jackson2JsonMessageConverter —— 使用Jackson 2 JSON处理器将对象与 JSON 进行转换
  • MarshallingMessageConverter —— 使用 Spring 的序列化和反序列化抽象转换 String 和任何类型的本地对象
  • SimpleMessageConverter —— 转换 String、字节数组和序列化类型
  • ContentTypeDelegatingMessageConverter —— 基于 contentType 头信息将对象委托给另一个 MessageConverter
  • MessagingMessageConverter —— 将消息转换委托给底层 MessageConverter,将消息头委托给 AmqpHeaderConverter

如果需要修改消息转换器,需要做的是配置 MessageConverter bean,例如,对于基于 JSON 的消息对话,可以像下面这样配置 Jackson2JsonMessageConverter:

  1. @Bean
  2. public MessageConverter messageConverter() {
  3. return new Jackson2JsonMessageConverter();
  4. }

Spring Boot 的自动配置将会发现这个 bean 并 RabbitTemplate 的缺省的消息转换器那里。

设置消息属性

与 JMS 一样,可能需要在发送的消息中设置一些标题。例如,假设需要为通过 Taco Cloud 网站提交的所有订单发送一个 X_ORDER_SOURCE。在创建 Message 对象时,可以通过提供给消息转换器的 MessageProperties 实例设置消息头。

重新访问程序清单 8.5 中的 sendOrder() 方法,只需要添加一行代码来设置标题:

  1. public void sendOrder(Order order) {
  2. MessageConverter converter = rabbit.getMessageConverter();
  3. MessageProperties props = new MessageProperties();
  4. props.setHeader("X_ORDER_SOURCE", "WEB");
  5. Message message = converter.toMessage(order, props);
  6. rabbit.send("tacocloud.order", message);
  7. }

但是,在使用 convertAndSend() 时,不能快速访问 MessageProperties 对象。不过,MessagePostProcessor 可以做到这一点:

  1. @Override
  2. public void sendOrder(Order order) {
  3. rabbit.convertAndSend("tacocloud.order.queue", order,
  4. new MessagePostProcessor() {
  5. @Override
  6. public Message postProcessMessage(Message message)
  7. throws AmqpException {
  8. MessageProperties props = message.getMessageProperties();
  9. props.setHeader("X_ORDER_SOURCE", "WEB");
  10. return message;
  11. }
  12. });
  13. }

这里,在 convertAndSend() 中使用 MessagePostProcessor 的匿名内部类进行实现 。在 postProcessMessage() 方法中,首先从消息中获取 MessageProperties,然后调用 setHeader() 来设置 X_ORDER_SOURCE 头信息。

现在已经了解了如何使用 RabbitTemplate 发送消息,接下来让我们将注意力转移到从 RabbitMQ 队列接收消息的代码上。

8.2.3 从 RabbitMQ 接收消息

使用 RabbitTemplate 发送消息与使用 JmsTemplate 发送消息差别不大。事实证明,从 RabbitMQ 队列接收消息与从 JMS 接收消息并没有太大的不同。

与 JMS 一样,有两个选择:

  • 使用 RabbitTemplate 从队列中拉取消息
  • 获取被推送到 @RabbitListener 注解的方法中的消息

让我们从基于拉模型的 RabbitTemplate.receive() 方法开始。

使用 RabbitTemplate 接收消息

RabbitTemplate 有多个从队列中拉取消息的方法,一部分最有用的方法如下所示:

  1. // 接收消息
  2. Message receive() throws AmqpException;
  3. Message receive(String queueName) throws AmqpException;
  4. Message receive(long timeoutMillis) throws AmqpException;
  5. Message receive(String queueName, long timeoutMillis) throws AmqpException;
  6. // 接收从消息转换过来的对象
  7. Object receiveAndConvert() throws AmqpException;
  8. Object receiveAndConvert(String queueName) throws AmqpException;
  9. Object receiveAndConvert(long timeoutMillis) throws AmqpException;
  10. Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
  11. // 接收从消息转换过来的类型安全的对象
  12. <T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
  13. <T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
  14. <T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
  15. <T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;

这些方法是前面描述的 send() 和 convertAndSend() 方法的镜像。send() 用于发送原始 Message 对象,而 receive() 从队列接收原始 Message 对象。同样地,receiveAndConvert() 接收消息,并在返回消息之前使用消息转换器将其转换为域对象。

但是在方法签名方面有一些明显的区别。首先,这些方法都不以交换键或路由键作为参数。这是因为交换和路由键用于将消息路由到队列,但是一旦消息在队列中,它们的下一个目的地就是将消息从队列中取出的使用者。使用应用程序不需要关心交换或路由键,队列是在消费应用程序是仅仅需要知道一个东西。

许多方法接受一个 long 参数来表示接收消息的超时。默认情况下,接收超时为 0 毫秒。也就是说,对 receive() 的调用将立即返回,如果没有可用的消息,则可能返回空值。这与 receive() 方法在 JmsTemplate 中的行为有明显的不同。通过传入超时值,可以让 receive() 和 receiveAndConvert() 方法阻塞,直到消息到达或超时过期。但是,即使使用非零超时,代码也要准备好处理返回的 null 值。

让我们看看如何将其付诸实践。下面程序清单显示了 OrderReceiver 的一个新的基于 Rabbit 的实现,它使用 RabbitTemplate 来接收订单。程序清单 8.6 使用 RabbitTemplate 从 RabbitMQ 拉取订单

  1. package tacos.kitchen.messaging.rabbit;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.amqp.support.converter.MessageConverter;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class RabbitOrderReceiver {
  9. private RabbitTemplate rabbit;
  10. private MessageConverter converter;
  11. @Autowired
  12. public RabbitOrderReceiver(RabbitTemplate rabbit) {
  13. this.rabbit = rabbit;
  14. this.converter = rabbit.getMessageConverter();
  15. }
  16. public Order receiveOrder() {
  17. Message message = rabbit.receive("tacocloud.orders");
  18. return message != null
  19. ? (Order) converter.fromMessage(message)
  20. : null;
  21. }
  22. }

receiveOrder() 方法是所有操作发生的地方。它调用所注入的 RabbitTemplate 上的 receive() 方法来从 tacocloud.queue 中获取订单。它不提供超时值,因此只能假设调用立即返回 Message 或 null。如果返回一条 Message,则使用 RabbitTemplate 中的 MessageConverter 将 Message 转换为 Order。另一方面,如果 receive() 返回 null,则返回 null。

根据实际情况的不同,可能容忍一个小的延迟。例如,在 Taco Cloud 厨房项目的头顶显示器中,如果没有订单信息出现,可以等待一下,可以决定等 30 秒后再放弃。然后,可以将 receiveOrder() 方法更改为传递一个 30,000 毫秒的延迟后再调用 receive():

  1. public Order receiveOrder() {
  2. Message message = rabbit.receive("tacocloud.order.queue", 30000);
  3. return message != null
  4. ? (Order) converter.fromMessage(message)
  5. : null;
  6. }

如果你和我一样,看到这样一个硬编码的数字会让你有点不舒服。那么创建一个带 @ConfigurationProperties 注解的类是个好想法,这样就可以使用 Spring Boot 的配置属性来配置超时。如果不是 Spring Boot 已经提供了这样的配置属性,我也会觉得硬编码的数字很不舒服。如果希望通过配置设置超时,只需删除 receive() 调用中的超时值,并在配置中使用 spring.rabbitmq.template.receive-timeout 属性设置它:

  1. spring:
  2. rabbitmq:
  3. template:
  4. receive-timeout: 30000

回到 receiveOrder() 方法,请注意,必须使用 RabbitTemplate 中的消息转换器来将传入 Message 对象转换为 Order 对象。但是如果 RabbitTemplate 携带了一个消息转换器,为什么它不能进行转换呢?这正是 receiveAndConvert() 方法的用途。使用 receiveAndConvert(),可以像这样重写 receiveOrder():

  1. public Oreder receiveOrder() {
  2. return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
  3. }

那就简单多了,不是吗?所看到的唯一麻烦的事情就是从 Object 到 Order 的转换。不过,除了演员阵容,还有另一种选择。相反,你可以传递一个 ParameterizedTypeReference 来直接接收一个 Order 对象:

  1. public Order receiveOrder() {
  2. return rabbit.receiveAndConvert("tacocloud.order.queue",
  3. new ParameterizedTypeReference<Order>() {});
  4. }

这是否比类型转换更好还值得商榷,但它是一种比类型转换更安全的方法。使用 receiveAndConvert() 的 ParameterizedTypeReference 的惟一要求是消息转换器必须是 SmartMessageConverter 的实现;Jackson2JsonMessageConverter 是唯一可以选择的开箱即用的实现。

JmsTemplate 提供的拉模型适用于许多用例,但通常最好有监听消息并在消息到达时调用的代码。让我们看看如何编写响应 RabbitMQ 消息的消息驱动 bean。

使用监听器处理 RabbitMQ 消息

对于消息驱动的 RabbitMQ bean,Spring 提供了 RabbitListener,相当于 RabbitMQ 中的 JmsListener。要指定当消息到达 RabbitMQ 队列时应该调用某个方法,请在相应的 bean 方法上使用 @RabbitTemplate 进行注解 。

例如,下面的程序清单显示了 OrderReceiver 的 RabbitMQ 实现,它被注解为监听订单消息,而不是使用 RabbitTemplate 来轮询订单消息。程序清单 8.7 声明一个方法作为 RabbitMQ 消息监听器

  1. package tacos.kitchen.messaging.rabbit.listener;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class OrderListener {
  7. private KitchenUI ui;
  8. @Autowired
  9. public OrderListener(KitchenUI ui) {
  10. this.ui = ui;
  11. }
  12. @RabbitListener(queues = "tacocloud.order.queue")
  13. public void receiveOrder(Order order) {
  14. ui.displayOrder(order);
  15. }
  16. }

这与程序清单 8.4 中的代码非常相似。实际上,唯一改变的是监听器注解—从 @JmsListener 变为了 @RabbitListener。尽管 @RabbitListener 非常棒,但这种近乎复制的代码让我对 @RabbitListener 没什么可说的,而我之前还没有对 @JmsListener 说过。它们都非常适合编写从各自的 broker 推送给它们的消息的代码 —— JMS broker 用于 @JmsListener,RabbitMQ broker 用于 @RabbitListener。

虽然在前面的段落中可能感觉到了 @RabbitListener 不是那么让人兴奋。事实上,@RabbitListener 与 @JmsListener 的工作方式非常相似,这一点非常令人兴奋!这意味着在使用 RabbitMQ 与 Artemis 或 ActiveMQ 时,不需要学习完全不同的编程模型。同样令人兴奋的是 RabbitTemplate 和 JmsTemplate 之间的相似性。

在结束本章时,让我们继续关注 Spring 支持的另一个消息传递选项:Apache Kafka。