Lagom提供了使用Kafka的MessageBroker API的实现。以下各节介绍如何在构建中添加依赖项,以及如何配置和调整主题发布者和订阅者。有关在开发中运行kafka的信息,请参阅kafka服务器页面。

依赖

要使用此功能,请在项目的构建中添加以下内容。

  1. libraryDependencies += lagomScaladslKafkaBroker

导入Lagom Kafka Broker模块时,请记住Lagom Kafka Broker模块需要一个Lagom持久化实现,因此请确保依赖项包括 Lagom Persistence Cassandra or Lagom Persistence JDBC

配置

添加依赖项后,还需要在Application上混合LagomKafkaComponents特质,以确保它在运行时处于启用和可用状态。
Lagom Kafka客户端实现是使用Alpakka Kafka构建的。Alpakka Kafka库包装了官方的Apache Java Kafka客户端,并公开了一个基于(Akka)流的API,用于向Kafka发布/订阅消息。因此,我们实际上有三个库在起作用,每个库都公开自己的配置。让我们从位于顶层的Lagom Kafka客户端开始,探索每一层公开的配置属性。

Lagom Kafka 客户端

  1. lagom.broker.kafka {
  2. # The name of the Kafka service to look up out of the service locator.
  3. # If this is an empty string, then a service locator lookup will not be done,
  4. # and the brokers configuration will be used instead.
  5. service-name = "kafka_native"
  6. service-name = ${?KAFKA_SERVICE_NAME}
  7. # The URLs of the Kafka brokers. Separate each URL with a comma.
  8. # This will be ignored if the service-name configuration is non empty.
  9. brokers = ${lagom.broker.defaults.kafka.brokers}
  10. # A mapping of Lagom topic id to real Kafka topic name.
  11. # For example:
  12. # topic-name-mappings {
  13. # topic-id = kafka-topic-name
  14. # }
  15. topic-name-mappings {
  16. }
  17. client {
  18. default {
  19. # how long should we wait when retrieving the last known offset
  20. offset-timeout = 5s
  21. # Exponential backoff for failures
  22. failure-exponential-backoff {
  23. # minimum (initial) duration until processor is started again
  24. # after failure
  25. min = 3s
  26. # the exponential back-off is capped to this duration
  27. max = 30s
  28. # additional random delay is based on this factor
  29. random-factor = 0.2
  30. }
  31. }
  32. # configuration used by the Lagom Kafka producer
  33. producer = ${lagom.broker.kafka.client.default}
  34. producer.role = ""
  35. # configuration used by the Lagom Kafka consumer
  36. consumer {
  37. offset-timeout = ${lagom.broker.kafka.client.default.offset-timeout}
  38. failure-exponential-backoff = ${lagom.broker.kafka.client.default.failure-exponential-backoff}
  39. # The number of offsets that will be buffered to allow the consumer flow to
  40. # do its own buffering. This should be set to a number that is at least as
  41. # large as the maximum amount of buffering that the consumer flow will do,
  42. # if the consumer buffer buffers more than this, the offset buffer will
  43. # backpressure and cause the stream to stop.
  44. offset-buffer = 100
  45. # Number of messages batched together by the consumer before the related messages'
  46. # offsets are committed to Kafka.
  47. # By increasing the batching-size you are trading speed with the risk of having
  48. # to re-process a larger number of messages if a failure occurs.
  49. # The value provided must be strictly greater than zero.
  50. batching-size = 20
  51. # Interval of time waited by the consumer before the currently batched messages'
  52. # offsets are committed to Kafka.
  53. # This parameter is useful to ensure that messages' offsets are always committed
  54. # within a fixed amount of time.
  55. # The value provided must be strictly greater than zero.
  56. batching-interval = 5 seconds
  57. # Parallelsim for async committing to Kafka
  58. # The value provided must be strictly greater than zero.
  59. batching-parallelism = 3
  60. }
  61. }
  62. }

首先,请注意,service-name默认设置为“kafka_native”。此属性定义如何在服务定位器中查找kafka broker URI(自v1.3.1起)。如果选择,可以通过将服务名称lagom.broker.kafka.brokers设置为空字符串并传递Kafka代理的位置来禁用查找。此设置映射到Kafka的boot-strap server列表,因此只需指定少数代理,因为其余代理将被动态发现。在生产中,通常需要至少有两名消息代理节点来保持可伸缩性。确保用逗号分隔每个代理URL。
其次,我们有特定于发布者和订阅者的配置。拉贡。经纪人卡夫卡。lagom.broker.kafka.client.default.failure-exponential-backoff定义了发布服务器或订阅服务器流失败时的配置。具体来说,它允许您配置在重新启动发布/消费流之前等待的退避时间。故障的发生可能有不同的原因,例如,可能是由于应用程序错误或网络错误。与原因无关,Lagom将继续尝试重新启动流(同时在每次失败的重试之间等待的时间越来越长)。如您所见,发布者和订阅者都使用相同的默认值,但可以为它们中的任何一个设置不同的值。
第三,使用者还有几个配置键,允许您决定读取端偏移量在数据存储中保留的频率。在调整这些值时,您需要权衡性能(每次使用消息时存储偏移量可能会很昂贵),如果出现故障,则可能需要重新处理某些消息。
第四,Lagom默认使用主题id作为Kafka主题名。但有时,Kafka主题可以根据不同环境的命名约定进行不同的命名。在这种情况下,可以使用topic-name-mappings属性将Lagom主题id映射到真正的Kafka主题名上。

Alpakka Kafka 配置

有关可用配置参数的信息,请参阅《Alpakka Kafka生产者设置》《Alpakka Kafka消费者设置》。使用Alpakka Kafka时,请参考生产注意事项

Apache Java Kafka Client

请参阅Producer Configs文档,了解发布服务器的公开配置。同时,对于订阅服务器,请参见 New Consumer Configs。唯一需要注意的是,如果需要更改Java Kafka客户端提供的任何配置的值,必须预先设置想要更改的配置akka.kafka.consumer.kafka-clients,对于消费者,akka.kafka.producer.kafka-clients。例如,假设您想更改消费者的request.timeout.ms,您应该在服务的application.conf中添加以下内容:

  1. akka.kafka.producer.kafka-clients {
  2. request.timeout.ms = 30000
  3. }

仅订阅服务

有时,您将实现一个Lagom服务,该服务只使用Kafka主题。在这种情况下,您可以单独导入Lagom Kafka客户端(而不是导入Lagom Kafka代理和Lagom持久性实现)。

  1. libraryDependencies += lagomScaladslKafkaClient

添加依赖项后,还需要在Application上混入LagomKafkaClientComponents特质,以确保它在运行时处于启用和可用状态。
如果/当您的订阅服务演变为包含将数据发布到主题的功能时,您将需要依赖Lagom Kafka Broker,并删除对Lagom Kafka客户端的依赖。Lagom Kafka Broker模块包括Lagom Kafka客户端模块。

从第三方消费主题

您可能希望Lagom服务消费来自非Lagom服务上生成的数据。在这种情况下,如服务客户端部分所述,您可以在Lagom项目中创建third-party-service-api模块。该模块将包含一个服务描述符,声明您将从中使用的主题。一旦实现了ThirdPartyService接口和相关类,就应该添加third-party-service-api,作为你的fancy-service-impl的依赖。最后,您可以使用ThirdPartyService 中描述的主题,如Subscribe to a topic部分所述。