8.3 使用 Kafka 发送消息

Apache Kafka 是我们在本章中研究的最新消息传递选项。乍一看,Kafka 是一个消息代理,就像ActiveMQ、Artemis 或 Rabbit 一样。但是 Kafka 有一些独特的技巧。

Kafka 被设计为在集群中运行,提供了巨大的可伸缩性。通过将其 topic 划分到集群中的所有实例中,它具有很强的弹性。RabbitMQ 主要处理 exchange 中的队列,而 Kafka 仅利用 topic 来提供消息的发布/订阅。

Kafka topic 被复制到集群中的所有 broker 中。集群中的每个节点充当一个或多个 topic 的 leader,负责该 topic 的数据并将其复制到集群中的其他节点。

更进一步说,每个 topic 可以分成多个分区。在这种情况下,集群中的每个节点都是一个 topic 的一个或多个分区的 leader,但不是整个 topic 的 leader。该 topic 的职责由所有节点分担。图 8.2 说明了这是如何工作的。图 8.2 Kafka 集群由多个 broker 组成,每一个都作为 topic 分区的 leader

图 8.2

由于 Kafka 独特的构建风格,我鼓励你在迪伦·斯科特(Dylan Scott,2017)的《Kafka 实战》中阅读更多关于它的内容。出于我们的目的,我们将重点讨论如何使用 Spring 向 Kafka 发送和接收消息。

8.3.1 设置 Spring 的 Kafka

要开始使用 Kafka 进行消息传递,需要将适当的依赖项添加到构建中。但是,与 JMS 和 RabbitMQ 不同,Kafka 没有 Spring Boot starter。不过还是只需要一个依赖:

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

这个依赖项将 Kafka 所需的一切都带到项目中。更重要的是,它的存在将触发 Kafka 的 Spring Boot 自动配置,它将在 Spring 应用程序上下文中生成一个 KafkaTemplate。你所需要做的就是注入 KafkaTemplate 并开始发送和接收消息。

然而,在开始发送和接收消息之前,应该了解一些在使用 Kafka 时会派上用场的属性。具体来说就是,KafkaTemplate 默认在 localhost 上运行 Kafka broker,并监听 9092 端口。在开发应用程序时,在本地启动 Kafka broker 是可以的,但是在进入生产环境时,需要配置不同的主机和端口。

spring.kafka.bootstrap-servers 属性设置一个或多个 Kafka 服务器的位置,用于建立到 Kafka 集群的初始连接。例如,如果集群中的 Kafka 服务器之一运行在 Kafka .tacocloud.com 上,并监听 9092 端口,那么可以在 YAML 中像这样配置它的位置:

  1. spring:
  2. kafka:
  3. bootstrap-servers:
  4. - kafka.tacocloud.com:9092

但是注意 spring.kafka.bootstrap-servers 属性是复数形式,它接受一个列表。因此,可以在集群中为它提供多个 Kafka 服务器:

  1. spring:
  2. kafka:
  3. bootstrap-servers:
  4. - kafka.tacocloud.com:9092
  5. - kafka.tacocloud.com:9093
  6. - kafka.tacocloud.com:9094

在项目中设置了 Kafka 之后,就可以发送和接收消息了。首先来看看 KafkaTemplate 将 Order 对象发送给 Kafka。

8.3.2 使用 KafkaTemplate 发送消息

在许多方面,KafkaTemplate 与 JMS 和 RabbitMQ 类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:

  1. ListenableFuture<SendResult<K, V>> send(String topic, V data);
  2. ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
  3. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
  4. ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
  5. ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
  6. ListenableFuture<SendResult<K, V>> send(Message<?> message);
  7. ListenableFuture<SendResult<K, V>> sendDefault(V data);
  8. ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
  9. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
  10. ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

注意到的第一件事是没有 convertAndSend() 方法。这是因为 KafkaTemplate 是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的 send() 方法都在做 convertAndSend() 的工作。

再者 send() 和 sendDefault() 的参数,它们与 JMS 和 Rabbit 中使用的参数完全不同。当使用 Kafka 发送消息时,可以指定以下参数来指导如何发送消息:

  • 发送消息的 topic(send() 方法必要的参数)
  • 写入 topic 的分区(可选)
  • 发送记录的键(可选)
  • 时间戳(可选;默认为 System.currentTimeMillis())
  • payload(必须)

topic 和 payload 是两个最重要的参数。分区和键对如何使用 KafkaTemplate 几乎没有影响,除了作为 send() 和 sendDefault() 的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。

对于 send() 方法,还可以选择发送一个 ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送 Message 对象,但是这样做需要将域对象转换为 Message。通常,使用其他方法比创建和发送 ProducerRecord 或 Message 对象更容易。

使用 KafkaTemplate 及其 send() 方法,可以编写一个基于 kafka 的 OrderMessagingService 实现。下面的程序清单显示了这样一个实现。程序清单 8.8 使用 KafkaTemplate 发送订单

  1. package tacos.messaging;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class KafkaOrderMessagingService implements OrderMessagingService {
  7. private KafkaTemplate<String, Order> kafkaTemplate;
  8. @Autowired
  9. public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
  10. this.kafkaTemplate = kafkaTemplate;
  11. }
  12. @Override
  13. public void sendOrder(Order order) {
  14. kafkaTemplate.send("tacocloud.orders.topic", order);
  15. }
  16. }

在 OrderMessagingService 的这个实现中,sendOrder() 方法使用注入的 KafkaTemplate 的 send() 方法向名为tacocloud.orders.topic 的主题发送 Order。代码中除了使用 “Kafka” 这个名称外,这与为 JMS 和 Rabbit 编写的代码没有太大的不同。

如果设置了默认主题,可以稍微简化 sendOrder() 方法。首先,通过设置 spring.kafka.template.default-topic 属性,将默认主题设置为 tacocloud.orders.topic:

  1. spring:
  2. kafka:
  3. template:
  4. default-topic: tacocloud.orders.topic

然后,在 sendOrder() 方法中,可以调用 sendDefault() 而不是 send(),并且不指定主题名称:

  1. @Override
  2. public void sendOrder(Order order) {
  3. kafkaTemplate.sendDefault(order);
  4. }

现在已经编写了消息发送代码了,让我们将注意力转向编写从 Kafka 接收这些消息的代码。

8.3.3 编写 Kafka 监听器

除了 send() 和 sendDefault() 的惟一方法签名之外,KafkaTemplate 与 JmsTemplate 和 RabbitTemplate 的不同之处在于它不提供任何接收消息的方法。这意味着使用 Spring 消费来自 Kafka 主题的消息的唯一方法是编写消息监听器。

对于 Kafka,消息监听器被定义为被 @KafkaListener 注解的方法。@KafkaListener 注解大致类似于 @JmsListener 和 @RabbitListener,其使用方式大致相同。下面程序清单显示了为 Kafka 编写的基于 listener 的订单接收程序。程序清单 8.9 使用 @KafkaListener 接收订单

  1. package tacos.kitchen.messaging.kafka.listener;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. import tacos.Order;
  6. import tacos.kitchen.KitchenUI;
  7. @Component
  8. public class OrderListener {
  9. private KitchenUI ui;
  10. @Autowired
  11. public OrderListener(KitchenUI ui) {
  12. this.ui = ui;
  13. }
  14. @KafkaListener(topics="tacocloud.orders.topic")
  15. public void handle(Order order) {
  16. ui.displayOrder(order);
  17. }
  18. }

handle() 方法由 @KafkaListener 注解,表示当消息到达名为 tacocloud.orders.topic 的主题时应该调用它。正如程序清单 8.9 中所写的,只为 handle() 方法提供了一个 Order(payload)参数 。但是,如果需要来自消息的其他元数据,它也可以接受一个 ConsumerRecord 或 Message 对象。

例如,handle() 的以下实现接受一个 ConsumerRecord,这样就可以记录消息的分区和时间戳:

  1. @KafkaListener(topics="tacocloud.orders.topic")
  2. public void handle(Order order, ConsumerRecord<Order> record) {
  3. log.info("Received from partition {} with timestamp {}",
  4. record.partition(), record.timestamp());
  5. ui.displayOrder(order);
  6. }

类似地,可以使用 Message 而不是 ConsumerRecord,并达到同样的效果:

  1. @KafkaListener(topics="tacocloud.orders.topic")
  2. public void handle(Order order, Message<Order> message) {
  3. MessageHeaders headers = message.getHeaders();
  4. log.info("Received from partition {} with timestamp {}",
  5. headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
  6. headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  7. ui.displayOrder(order);
  8. }

值得注意的是,消息有效负载也可以通过 ConsumerRecord.value() 或 Message.getPayload() 获得。这意味着可以通过这些对象请求 Order,而不是直接将其作为 handle() 的参数。