Lagom提供了使用Kafka的MessageBroker API的实现。以下各节介绍如何在构建中添加依赖项,以及如何配置和调整主题发布者和订阅者。有关在开发中运行kafka的信息,请参阅kafka服务器页面。
依赖
要使用此功能,请在项目的构建中添加以下内容。
libraryDependencies += lagomScaladslKafkaBroker
导入Lagom Kafka Broker模块时,请记住Lagom Kafka Broker模块需要一个Lagom持久化实现,因此请确保依赖项包括 Lagom Persistence Cassandra or Lagom Persistence JDBC。
配置
添加依赖项后,还需要在Application
上混合LagomKafkaComponents
特质,以确保它在运行时处于启用和可用状态。
Lagom Kafka客户端实现是使用Alpakka Kafka构建的。Alpakka Kafka
库包装了官方的Apache Java Kafka客户端,并公开了一个基于(Akka)流的API,用于向Kafka发布/订阅消息。因此,我们实际上有三个库在起作用,每个库都公开自己的配置。让我们从位于顶层的Lagom Kafka客户端开始,探索每一层公开的配置属性。
Lagom Kafka 客户端
lagom.broker.kafka {
# The name of the Kafka service to look up out of the service locator.
# If this is an empty string, then a service locator lookup will not be done,
# and the brokers configuration will be used instead.
service-name = "kafka_native"
service-name = ${?KAFKA_SERVICE_NAME}
# The URLs of the Kafka brokers. Separate each URL with a comma.
# This will be ignored if the service-name configuration is non empty.
brokers = ${lagom.broker.defaults.kafka.brokers}
# A mapping of Lagom topic id to real Kafka topic name.
# For example:
# topic-name-mappings {
# topic-id = kafka-topic-name
# }
topic-name-mappings {
}
client {
default {
# how long should we wait when retrieving the last known offset
offset-timeout = 5s
# Exponential backoff for failures
failure-exponential-backoff {
# minimum (initial) duration until processor is started again
# after failure
min = 3s
# the exponential back-off is capped to this duration
max = 30s
# additional random delay is based on this factor
random-factor = 0.2
}
}
# configuration used by the Lagom Kafka producer
producer = ${lagom.broker.kafka.client.default}
producer.role = ""
# configuration used by the Lagom Kafka consumer
consumer {
offset-timeout = ${lagom.broker.kafka.client.default.offset-timeout}
failure-exponential-backoff = ${lagom.broker.kafka.client.default.failure-exponential-backoff}
# The number of offsets that will be buffered to allow the consumer flow to
# do its own buffering. This should be set to a number that is at least as
# large as the maximum amount of buffering that the consumer flow will do,
# if the consumer buffer buffers more than this, the offset buffer will
# backpressure and cause the stream to stop.
offset-buffer = 100
# Number of messages batched together by the consumer before the related messages'
# offsets are committed to Kafka.
# By increasing the batching-size you are trading speed with the risk of having
# to re-process a larger number of messages if a failure occurs.
# The value provided must be strictly greater than zero.
batching-size = 20
# Interval of time waited by the consumer before the currently batched messages'
# offsets are committed to Kafka.
# This parameter is useful to ensure that messages' offsets are always committed
# within a fixed amount of time.
# The value provided must be strictly greater than zero.
batching-interval = 5 seconds
# Parallelsim for async committing to Kafka
# The value provided must be strictly greater than zero.
batching-parallelism = 3
}
}
}
首先,请注意,service-name
默认设置为“kafka_native”。此属性定义如何在服务定位器中查找kafka broker URI(自v1.3.1起)。如果选择,可以通过将服务名称lagom.broker.kafka.brokers
设置为空字符串并传递Kafka代理的位置来禁用查找。此设置映射到Kafka的boot-strap server列表,因此只需指定少数代理,因为其余代理将被动态发现。在生产中,通常需要至少有两名消息代理节点来保持可伸缩性。确保用逗号分隔每个代理URL。
其次,我们有特定于发布者和订阅者的配置。拉贡。经纪人卡夫卡。lagom.broker.kafka.client.default.failure-exponential-backoff
定义了发布服务器或订阅服务器流失败时的配置。具体来说,它允许您配置在重新启动发布/消费流之前等待的退避时间。故障的发生可能有不同的原因,例如,可能是由于应用程序错误或网络错误。与原因无关,Lagom将继续尝试重新启动流(同时在每次失败的重试之间等待的时间越来越长)。如您所见,发布者和订阅者都使用相同的默认值,但可以为它们中的任何一个设置不同的值。
第三,使用者还有几个配置键,允许您决定读取端偏移量在数据存储中保留的频率。在调整这些值时,您需要权衡性能(每次使用消息时存储偏移量可能会很昂贵),如果出现故障,则可能需要重新处理某些消息。
第四,Lagom默认使用主题id作为Kafka主题名。但有时,Kafka主题可以根据不同环境的命名约定进行不同的命名。在这种情况下,可以使用topic-name-mappings
属性将Lagom主题id映射到真正的Kafka主题名上。
Alpakka Kafka 配置
有关可用配置参数的信息,请参阅《Alpakka Kafka生产者设置》和《Alpakka Kafka消费者设置》。使用Alpakka Kafka
时,请参考生产注意事项。
Apache Java Kafka Client
请参阅Producer Configs文档,了解发布服务器的公开配置。同时,对于订阅服务器,请参见 New Consumer Configs。唯一需要注意的是,如果需要更改Java Kafka客户端提供的任何配置的值,必须预先设置想要更改的配置akka.kafka.consumer.kafka-clients
,对于消费者,akka.kafka.producer.kafka-clients
。例如,假设您想更改消费者的request.timeout.ms
,您应该在服务的application.conf
中添加以下内容:
akka.kafka.producer.kafka-clients {
request.timeout.ms = 30000
}
仅订阅服务
有时,您将实现一个Lagom服务,该服务只使用Kafka主题。在这种情况下,您可以单独导入Lagom Kafka客户端(而不是导入Lagom Kafka代理和Lagom持久性实现)。
libraryDependencies += lagomScaladslKafkaClient
添加依赖项后,还需要在Application
上混入LagomKafkaClientComponents
特质,以确保它在运行时处于启用和可用状态。
如果/当您的订阅服务演变为包含将数据发布到主题的功能时,您将需要依赖Lagom Kafka Broker,并删除对Lagom Kafka客户端的依赖。Lagom Kafka Broker模块包括Lagom Kafka客户端模块。
从第三方消费主题
您可能希望Lagom服务消费来自非Lagom服务上生成的数据。在这种情况下,如服务客户端部分所述,您可以在Lagom项目中创建third-party-service-api
模块。该模块将包含一个服务描述符,声明您将从中使用的主题。一旦实现了ThirdPartyService
接口和相关类,就应该添加third-party-service-api
,作为你的fancy-service-impl
的依赖。最后,您可以使用ThirdPartyService
中描述的主题,如Subscribe to a topic部分所述。