Lagom Message Broker API提供了一个分布式发布-订阅模型,服务可以使用该模型通过主题共享数据。主题只是一个允许服务推送和拉取数据的渠道。在Lagom中,主题是强类型的,因此订阅者和制作者都可以提前知道预期的数据流将是什么。

声明一个主题

要将数据发布到主题,服务需要在其服务描述符中声明主题。

  1. import com.lightbend.lagom.scaladsl.api.broker.Topic
  2. import com.lightbend.lagom.scaladsl.api.Service
  3. import com.lightbend.lagom.scaladsl.api.ServiceCall
  4. import play.api.libs.json.Format
  5. import play.api.libs.json.Json
  6. object HelloService {
  7. val TOPIC_NAME = "greetings"
  8. }
  9. trait HelloService extends Service {
  10. final override def descriptor = {
  11. import Service._
  12. named("brokerdocs")
  13. .withCalls(
  14. pathCall("/api/hello/:id", hello _),
  15. pathCall("/api/hello/:id", useGreeting _)
  16. )
  17. .withTopics(
  18. topic(HelloService.TOPIC_NAME, greetingsTopic)
  19. )
  20. .withAutoAcl(true)
  21. }
  22. // The topic handle
  23. def greetingsTopic(): Topic[GreetingMessage]
  24. def hello(id: String): ServiceCall[NotUsed, String]
  25. def useGreeting(id: String): ServiceCall[GreetingMessage, Done]
  26. }

声明主题的语法与已用于定义服务端点的语法类似。[Descriptor.withTopics](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/api/Descriptor.html)方法接受一系列主题调用,每个主题调用都可以通过Service对象上的 topic 方法定义。后者采用主题名称(即主题标识符)和返回Topic实例的方法引用。
默认情况下,流经主题的数据被序列化为JSON。当然,可以使用不同的序列化格式,可以通过为服务描述符中定义的每个主题提供隐式消息序列化程序来实现。这与呈现服务描述符时在消息序列化中描述的方法相同。您可能还需要查看消息序列化程序文档

主题分区

Kafka将在多个分区中分发特定主题的消息,以便该主题可以扩展。发送到不同分区的消息可能会被无序处理,因此,如果要发布的消息的顺序很重要,则需要确保消息以保持顺序的方式进行分区。通常,这意味着确保特定实体的每条消息都进入同一分区。
Lagom允许您配置分区key策略,从消息中提取分区key。然后,Kafka将使用该键帮助决定将每条消息发送到哪个分区。通过向分区传递[partitionKeyStrategy](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/api/broker/kafka/KafkaProperties$.html#partitionKeyStrategy[Message]:com.lightbend.lagom.scaladsl.api.Descriptor.Property[Message,com.lightbend.lagom.scaladsl.api.broker.kafka.PartitionKeyStrategy[Message]]),可以使用[PartitionKeyStrategy](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/api/broker/kafka/PartitionKeyStrategy.html)属性选择该分区:

  1. named("blogpostservice")
  2. .withTopics(
  3. topic("blogposts", blogPostEvents)
  4. .addProperty(
  5. KafkaProperties.partitionKeyStrategy,
  6. PartitionKeyStrategy[BlogPostEvent](_.postId)
  7. )
  8. )

实现一个主题

Lagom旨在生成的消息的主要来源是持久实体事件。与其为了响应发生的特定事件而以临时方式发布事件,不如从持久性实体中获取事件流,并将其调整为发送到消息代理的消息流。通过这种方式,您可以确保发布者和使用者至少处理一次事件,这允许您在整个系统中保证非常强的一致性。
Lagom的[TopicProducer](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/broker/TopicProducer$.html)helper提供了两种发布持久实体事件流的方法,[singleStreamWithOffset](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/broker/TopicProducer$.html) 用于非分片读取端事件流,[taggedStreamWithOffset](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/broker/TopicProducer$.html)用于分片读取端事件流。这两种方法都采用回调,该回调采用主题生成器发布的最后一个偏移量,并允许通过PersistentEntityRegistry.eventStream方法从该偏移量恢复事件流,用于获取读侧数据流。有关读取端流的更多详细信息,请参阅 Persistent Read-Side’s
对于singleStreamWithOffset方法,Lagom将确保主题生成器仅在集群的一个节点上运行,或者使用taggedStreamWithOffset方法将标签均匀地分布在集群中,以分配发布负载。
下面是发布单个非分片事件流的示例:

  1. override def greetingsTopic(): Topic[GreetingMessage] =
  2. TopicProducer.singleStreamWithOffset { fromOffset =>
  3. persistentEntityRegistry
  4. .eventStream(HelloEventTag.INSTANCE, fromOffset)
  5. .map(ev => (convertEvent(ev), ev.offset))
  6. }
  7. private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): GreetingMessage = {
  8. helloEvent.event match {
  9. case GreetingMessageChanged(msg) => GreetingMessage(msg)
  10. }
  11. }

请注意,您传递给主题生成器的读端事件流在默认情况下会在服务启动后立即“激活”。可以使用Projections API更改默认行为。这意味着您的服务保留的所有事件最终将发布到连接的主题。此外,您通常希望将域事件映射到其他类型,以便其他服务不会与其他服务的域模型事件紧密耦合。换句话说,域模型事件是服务的一个实现细节,不应该泄露。

过滤事件

您可能不希望发布服务保留的所有事件。如果是这种情况,则可以过滤事件流:

  1. override def greetingsTopic(): Topic[GreetingMessage] =
  2. TopicProducer.singleStreamWithOffset { fromOffset =>
  3. persistentEntityRegistry
  4. .eventStream(HelloEventTag.INSTANCE, fromOffset)
  5. .mapConcat(filterEvents)
  6. }
  7. private def filterEvents(ev: EventStreamElement[HelloEvent]) = ev match {
  8. // Only publish greetings where the message is "Hello".
  9. case ev @ EventStreamElement(_, GreetingMessageChanged("Hello"), offset) =>
  10. immutable.Seq((convertEvent(ev), offset))
  11. case _ => Nil
  12. }

过滤事件时,TopicProducer不会发布该事件。它也不会使偏移提前。如果TopicProducer重新启动,它将从最后一个偏移量恢复。如果过滤了大量事件,那么最后的偏移量可能会远远落后,因此所有这些事件都将被重新处理并再次过滤掉。您需要意识到这可能会发生,并保持连续过滤元素的数量相对较低,同时尽可能减少执行过滤所需的时间和资源。

偏移量存储

Lagom将使用您配置的持久性API提供程序来存储事件流的偏移量。要阅读有关偏移存储的更多信息,请参阅Cassandra偏移文档、JDBC数据库偏移文档和Slick数据库偏移文档。

订阅一个主题

要订阅一个主题,服务只需要对感兴趣的话题调用[Topic.subscribe](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/api/broker/Topic.html)。例如,假设一个服务想要收集HelloService发布的所有问候消息。您应该做的第一件事是注入HelloService(请参阅“使用服务客户端”一节,以获取有关将客户端使用到另一个服务的完整解释)。然后,订阅“问候语”主题,并将逻辑应用于发布到该主题的每条消息。

  1. helloService
  2. .greetingsTopic()
  3. .subscribe // <-- you get back a Subscriber instance
  4. .atLeastOnce(
  5. Flow.fromFunction(doSomethingWithTheMessage)
  6. )

调用Topic.subscribe 您将获得一个Subscriber实例。在上面的代码片段中,我们使用至少一次传递语义订阅了greetings主题。这意味着发布到greetings主题的每条消息至少会收到一次,但可能会收到更多。订阅者还提供了[atMostOnceSource](https://www.lagomframework.com/documentation/1.6.x/scala/api/com/lightbend/lagom/scaladsl/api/broker/Subscriber.html#atMostOnceSource:akka.stream.scaladsl.Source[Message,_]),它最多提供一次交付语义。如果有疑虑,最好使用至少一次交付语义。
最后,通过Subscriber.withGroupId:com.lightbend.lagom.scaladsl.api.broker.Subscriber[Message]).将订阅服务器分组在一起。订阅组允许集群中的多个节点使用消息流,同时确保集群中的每个节点只处理一次消息。如果没有订阅组,特定服务的所有节点都将获取流中的每条消息,从而导致它们的处理重复。默认情况下,Lagom将使用与该主题的服务同名的组id。

消费消息元数据

您的代理实现可能会为您可以使用的消息提供额外的元数据。这可以通过调用Subscriber.withMetadata方法来访问,该方法将返回一个将消息包装到Message中的订阅者实例。

  1. import com.lightbend.lagom.scaladsl.api.broker.Message
  2. import com.lightbend.lagom.scaladsl.broker.kafka.KafkaMetadataKeys
  3. helloService
  4. .greetingsTopic()
  5. .subscribe
  6. .withMetadata
  7. .atLeastOnce(
  8. Flow[Message[GreetingMessage]].map { msg =>
  9. val greetingMessage = msg.payload
  10. val messageKey = msg.messageKeyAsString
  11. val kafkaHeaders = msg.get(KafkaMetadataKeys.Headers)
  12. println(s"Message: $greetingMessage Key: $messageKey Headers: $kafkaHeaders")
  13. Done
  14. }
  15. )

messageKeyAsString方法是为了方便访问消息密钥而提供的。可以使用 get:Metadata) 方法访问其他属性。可以在这里找到Kafka可用的元数据密钥的完整列表。

跳过消息

您可能只想将逻辑应用于主题发布的消息的一个子集,而跳过其他消息。传递给Subscriber.atLeastOnce:scala.concurrent.Future[akka.Done])的Flow对于接收到的每个元素,必须只发送一个Done元素。它还必须以接收元素的相同顺序发射它们。这意味着您不能在Flow上使用filtercollect等方法,否则会删除元素。
实现这一点的最简单方法是使用通配函数,该函数在应该跳过的元素时返回Done。例如:

  1. helloService
  2. .greetingsTopic()
  3. .subscribe
  4. .atLeastOnce(
  5. Flow[GreetingMessage].map {
  6. case msg @ GreetingMessage("Kia ora") => doSomethingWithTheMessage(msg)
  7. case _ => Done // Skip all messages where the message is not "Kia ora".
  8. }
  9. )

多态事件流

通常情况下,您会希望向特定主题发布多种类型的事件。这可以通过创建每个事件实现的接口来实现。为了成功地从JSON数据序列化和反序列化这些事件,您必须在数据的JSON表示中包含一些额外的信息。
例如,考虑一个博客帖子创建事件和博客发布事件的情况。以下是您的事件结构:

  1. sealed trait BlogPostEvent {
  2. def postId: String
  3. }
  4. case class BlogPostCreated(postId: String, title: String) extends BlogPostEvent
  5. case class BlogPostPublished(postId: String) extends BlogPostEvent

这可能是你使用Play JSON格式化程序的样子:

  1. case object BlogPostCreated {
  2. implicit val blogPostCreatedFormat: Format[BlogPostCreated] = Json.format
  3. }
  4. case object BlogPostPublished {
  5. implicit val blogPostPublishedFormat: Format[BlogPostPublished] = Json.format
  6. }

您必须实现一个定制的消息序列化程序,在每个JSON消息上添加额外的信息,以便在接收端知道使用什么类型的事件来反序列化。BlogPostCreated事件的结果JSON如下所示:

  1. {
  2. "postId": "23",
  3. "title": "new post",
  4. "event_type": "postCreated"
  5. }

BlogPostPublished事件的JSON如下所示:

  1. {
  2. "postId": "23",
  3. "event_type": "postPublished"
  4. }

你可以使用 Play JSON transformers:

  1. object BlogPostEvent {
  2. implicit val reads: Reads[BlogPostEvent] = {
  3. (__ \ "event_type").read[String].flatMap {
  4. case "postCreated" => implicitly[Reads[BlogPostCreated]].map(identity)
  5. case "postPublished" => implicitly[Reads[BlogPostPublished]].map(identity)
  6. case other => Reads(_ => JsError(s"Unknown event type $other"))
  7. }
  8. }
  9. implicit val writes: Writes[BlogPostEvent] = Writes { event =>
  10. val (jsValue, eventType) = event match {
  11. case m: BlogPostCreated => (Json.toJson(m)(BlogPostCreated.blogPostCreatedFormat), "postCreated")
  12. case m: BlogPostPublished => (Json.toJson(m)(BlogPostPublished.blogPostPublishedFormat), "postPublished")
  13. }
  14. jsValue.transform(__.json.update((__ \ 'event_type).json.put(JsString(eventType)))).get
  15. }
  16. }

这种方法会增加维护成本。幸运的是,有些库扩展了Play JSON特性,并支持代数数据类型序列化,如Play JSON Derived Codecs