Lagom提供了使用Kafka的MessageBroker API的实现。以下各节介绍如何在构建中添加依赖项,以及如何配置和调整主题发布者和订阅者。有关在开发中运行kafka的信息,请参阅kafka服务器页面。
依赖
要使用此功能,请在项目的构建中添加以下内容。
libraryDependencies += lagomScaladslKafkaBroker
导入Lagom Kafka Broker模块时,请记住Lagom Kafka Broker模块需要一个Lagom持久化实现,因此请确保依赖项包括 Lagom Persistence Cassandra or Lagom Persistence JDBC。
配置
添加依赖项后,还需要在Application上混合LagomKafkaComponents特质,以确保它在运行时处于启用和可用状态。
Lagom Kafka客户端实现是使用Alpakka Kafka构建的。Alpakka Kafka库包装了官方的Apache Java Kafka客户端,并公开了一个基于(Akka)流的API,用于向Kafka发布/订阅消息。因此,我们实际上有三个库在起作用,每个库都公开自己的配置。让我们从位于顶层的Lagom Kafka客户端开始,探索每一层公开的配置属性。
Lagom Kafka 客户端
lagom.broker.kafka {# The name of the Kafka service to look up out of the service locator.# If this is an empty string, then a service locator lookup will not be done,# and the brokers configuration will be used instead.service-name = "kafka_native"service-name = ${?KAFKA_SERVICE_NAME}# The URLs of the Kafka brokers. Separate each URL with a comma.# This will be ignored if the service-name configuration is non empty.brokers = ${lagom.broker.defaults.kafka.brokers}# A mapping of Lagom topic id to real Kafka topic name.# For example:# topic-name-mappings {# topic-id = kafka-topic-name# }topic-name-mappings {}client {default {# how long should we wait when retrieving the last known offsetoffset-timeout = 5s# Exponential backoff for failuresfailure-exponential-backoff {# minimum (initial) duration until processor is started again# after failuremin = 3s# the exponential back-off is capped to this durationmax = 30s# additional random delay is based on this factorrandom-factor = 0.2}}# configuration used by the Lagom Kafka producerproducer = ${lagom.broker.kafka.client.default}producer.role = ""# configuration used by the Lagom Kafka consumerconsumer {offset-timeout = ${lagom.broker.kafka.client.default.offset-timeout}failure-exponential-backoff = ${lagom.broker.kafka.client.default.failure-exponential-backoff}# The number of offsets that will be buffered to allow the consumer flow to# do its own buffering. This should be set to a number that is at least as# large as the maximum amount of buffering that the consumer flow will do,# if the consumer buffer buffers more than this, the offset buffer will# backpressure and cause the stream to stop.offset-buffer = 100# Number of messages batched together by the consumer before the related messages'# offsets are committed to Kafka.# By increasing the batching-size you are trading speed with the risk of having# to re-process a larger number of messages if a failure occurs.# The value provided must be strictly greater than zero.batching-size = 20# Interval of time waited by the consumer before the currently batched messages'# offsets are committed to Kafka.# This parameter is useful to ensure that messages' offsets are always committed# within a fixed amount of time.# The value provided must be strictly greater than zero.batching-interval = 5 seconds# Parallelsim for async committing to Kafka# The value provided must be strictly greater than zero.batching-parallelism = 3}}}
首先,请注意,service-name默认设置为“kafka_native”。此属性定义如何在服务定位器中查找kafka broker URI(自v1.3.1起)。如果选择,可以通过将服务名称lagom.broker.kafka.brokers设置为空字符串并传递Kafka代理的位置来禁用查找。此设置映射到Kafka的boot-strap server列表,因此只需指定少数代理,因为其余代理将被动态发现。在生产中,通常需要至少有两名消息代理节点来保持可伸缩性。确保用逗号分隔每个代理URL。
其次,我们有特定于发布者和订阅者的配置。拉贡。经纪人卡夫卡。lagom.broker.kafka.client.default.failure-exponential-backoff定义了发布服务器或订阅服务器流失败时的配置。具体来说,它允许您配置在重新启动发布/消费流之前等待的退避时间。故障的发生可能有不同的原因,例如,可能是由于应用程序错误或网络错误。与原因无关,Lagom将继续尝试重新启动流(同时在每次失败的重试之间等待的时间越来越长)。如您所见,发布者和订阅者都使用相同的默认值,但可以为它们中的任何一个设置不同的值。
第三,使用者还有几个配置键,允许您决定读取端偏移量在数据存储中保留的频率。在调整这些值时,您需要权衡性能(每次使用消息时存储偏移量可能会很昂贵),如果出现故障,则可能需要重新处理某些消息。
第四,Lagom默认使用主题id作为Kafka主题名。但有时,Kafka主题可以根据不同环境的命名约定进行不同的命名。在这种情况下,可以使用topic-name-mappings属性将Lagom主题id映射到真正的Kafka主题名上。
Alpakka Kafka 配置
有关可用配置参数的信息,请参阅《Alpakka Kafka生产者设置》和《Alpakka Kafka消费者设置》。使用Alpakka Kafka时,请参考生产注意事项。
Apache Java Kafka Client
请参阅Producer Configs文档,了解发布服务器的公开配置。同时,对于订阅服务器,请参见 New Consumer Configs。唯一需要注意的是,如果需要更改Java Kafka客户端提供的任何配置的值,必须预先设置想要更改的配置akka.kafka.consumer.kafka-clients,对于消费者,akka.kafka.producer.kafka-clients。例如,假设您想更改消费者的request.timeout.ms,您应该在服务的application.conf中添加以下内容:
akka.kafka.producer.kafka-clients {request.timeout.ms = 30000}
仅订阅服务
有时,您将实现一个Lagom服务,该服务只使用Kafka主题。在这种情况下,您可以单独导入Lagom Kafka客户端(而不是导入Lagom Kafka代理和Lagom持久性实现)。
libraryDependencies += lagomScaladslKafkaClient
添加依赖项后,还需要在Application上混入LagomKafkaClientComponents特质,以确保它在运行时处于启用和可用状态。
如果/当您的订阅服务演变为包含将数据发布到主题的功能时,您将需要依赖Lagom Kafka Broker,并删除对Lagom Kafka客户端的依赖。Lagom Kafka Broker模块包括Lagom Kafka客户端模块。
从第三方消费主题
您可能希望Lagom服务消费来自非Lagom服务上生成的数据。在这种情况下,如服务客户端部分所述,您可以在Lagom项目中创建third-party-service-api模块。该模块将包含一个服务描述符,声明您将从中使用的主题。一旦实现了ThirdPartyService接口和相关类,就应该添加third-party-service-api,作为你的fancy-service-impl的依赖。最后,您可以使用ThirdPartyService 中描述的主题,如Subscribe to a topic部分所述。
