本节介绍了使用Akka Persistence Typed并遵循Lagom采用的CQRS原则,按照域驱动设计中的定义,对聚合进行建模的所有步骤。虽然Akka Persistence Typed为构建事件源参与者提供了API,但这并不一定适用于CQRS聚合。为了构建CQRS应用程序,我们需要在设计中使用一些规则。
我们使用一个简化的购物车示例来指导您完成这个过程。您可以在我们的样例代码库中找到一个完整的购物车样例。

定义模型

首先,根据命令、事件和状态定义模型。

State状态模型

购物车的状态模型定义如下:

  1. final case class ShoppingCart(
  2. items: Map[String, Int],
  3. // checkedOutTime defines if cart was checked-out or not:
  4. // case None, cart is open
  5. // case Some, cart is checked-out
  6. checkedOutTime: Option[Instant] = None
  7. )

请注意,我们使用案例类ShoppingCart对其进行建模,当从一个状态(打开的购物车)转换到另一个状态(签出的购物车)时,可以设置checkedOutTime。正如我们将在后面看到的,每个状态对它可以处理的命令、它可以持久化的事件以及它可以转换到的其他状态进行编码。

注:上面显示的示例是一个简化案例。每当你的模型经历不同的状态转换时,更好的方法是为每个状态都有一个特性和它的扩展。有关信息请参见样式指南中的示例。

命令和响应模型

接下来,我们定义可以发送的命令。
每个命令都通过replyTo: ActorRef[R]字段定义一个应答,其中R是将发送回调用方的应答类型。回复用于在命令被接受或拒绝时进行回复,或读取聚合服务的数据时进行回复(即:只读命令),也有可能两者兼而有之。例如,如果命令成功,它将返回一些更新的数据;如果失败,它将返回一条被拒绝的消息。或者你可以使用没有任何回复的命令(如:即发即弃)。不过,这是CQRS聚合建模中不太常见的模式,本文中没有介绍。

  1. trait CommandSerializable
  2. sealed trait ShoppingCartCommand extends CommandSerializable
  3. final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand
  4. final case class Checkout(replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand
  5. final case class Get(replyTo: ActorRef[Summary]) extends ShoppingCartCommand

在Akka类型中,与Akka classic和Lagom持久化不同,不可能向调用方返回异常。参与者和调用者之间的所有通信都必须通过命令中传递的replyTo:ActorRef[R]完成。因此,如果你想发出拒绝信号,你必须在回复协议中对其进行编码。
上述命令使用的回复定义如下:

  1. sealed trait Confirmation
  2. final case class Accepted(summary: Summary) extends Confirmation
  3. final case class Rejected(reason: String) extends Confirmation
  4. final case class Summary(items: Map[String, Int], checkedOut: Boolean)

这里有两种不同的回复:ConfirmationSummary。当我们想要修改状态时,使用Confirmation。修改请求可以被AcceptedRejected。然后,使用Summary来查询我们想看的购物车的状态。

注意:请记住,Summary不是购物车本身的数据结构,而是我们想要向外部世界展示的表现。将聚合的内部状态保持为私有是一种很好的做法,因为它允许内部状态和公开的API独立发展。

事件模型

接下来,我们定义需要持久化的事件。这些事件必须扩展自Lagom的AggregateEvent。这对于标记事件很重要。我们将在下文中标记事件小节介绍它。

  1. sealed trait ShoppingCartEvent extends AggregateEvent[ShoppingCartEvent] {
  2. override def aggregateTag: AggregateEventTagger[ShoppingCartEvent] = ShoppingCartEvent.Tag
  3. }
  4. final case class ItemAdded(itemId: String, quantity: Int) extends ShoppingCartEvent
  5. final case class CartCheckedOut(eventTime: Instant) extends ShoppingCartEvent

定义命令处理器

一旦根据命令、回复、事件和状态定义了模型,就需要指定业务规则。命令处理程序定义如何处理每个传入的命令,必须应用哪些验证,以及哪些事件将被持久化。
你可以用不同的方式编码。建议的样式是在状态类中添加命令处理程序。对于ShoppingCart,我们可以根据两种可能的状态定义命令处理程序:

  1. def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
  2. if (isOpen) {
  3. cmd match {
  4. case AddItem(itemId, quantity, replyTo) => onAddItem(itemId, quantity, replyTo)
  5. case Checkout(replyTo) => onCheckout(replyTo)
  6. case Get(replyTo) => onGet(replyTo)
  7. }
  8. } else {
  9. cmd match {
  10. case AddItem(_, _, replyTo) => Effect.reply(replyTo)(Rejected("Cannot add an item to a checked-out cart"))
  11. case Checkout(replyTo) => Effect.reply(replyTo)(Rejected("Cannot checkout a checked-out cart"))
  12. case Get(replyTo) => onGet(replyTo)
  13. }
  14. }
  15. private def onAddItem(
  16. itemId: String,
  17. quantity: Int,
  18. replyTo: ActorRef[Confirmation]
  19. ): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
  20. if (items.contains(itemId))
  21. Effect.reply(replyTo)(Rejected(s"Item '$itemId' was already added to this shopping cart"))
  22. else if (quantity <= 0)
  23. Effect.reply(replyTo)(Rejected("Quantity must be greater than zero"))
  24. else
  25. Effect
  26. .persist(ItemAdded(itemId, quantity))
  27. .thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
  28. }
  29. private def onCheckout(replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
  30. if (items.isEmpty)
  31. Effect.reply(replyTo)(Rejected("Cannot checkout an empty shopping cart"))
  32. else
  33. Effect
  34. .persist(CartCheckedOut(Instant.now()))
  35. .thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
  36. }
  37. private def onGet(replyTo: ActorRef[Summary]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
  38. Effect.reply(replyTo)(toSummary(shoppingCart = this))
  39. }
  40. private def toSummary(shoppingCart: ShoppingCart): Summary = {
  41. Summary(shoppingCart.items, shoppingCart.checkedOut)
  42. }

注意:当然可以按您认为更方便的方式组织命令处理程序,但我们建议使用onCommand模式,因为它可以帮助保持每个命令的处理逻辑相互隔离。

命令处理程序是模型的核心。它们对模型的业务规则进行编码,并充当模型一致性的守护者。命令处理程序必须首先验证传入命令是否可以应用于当前模型状态。在验证成功的情况下,一个或多个事件变化会被持久化。一旦事件被持久化,它们就会从当前状态迁移到下一个有效的状态。
由于聚合服务旨在为一致性边界建模,因此不建议使用范围内不可用的数据验证命令。任何决策都应该完全基于命令中传递的数据和当前聚合的状态。任何外部调用都应该被视为对聚合服务的一种破坏,因为这意味着聚合服务不能完全控制它应该保护的不变量。
回复有两种方式:Effect.reply and Effect.persist(...).thenReply。第一个选项在生效时直接可用,在回复时应在不保留任何事件的情况下使用。在这种情况下,您可以使用作用域中的可用状态,因为它保证不会更改。第二种变体应该在持久化一个或多个事件时使用。然后,您可以在用于定义回复的函数上改变到更新的状态。
您可以在命令处理程序中运行副作用函数。请参考 Akka文档 有关详细信息。

定义事件处理器

事件处理程序通过响应聚合事件来改变聚合状态。事件处理程序必须是纯函数,因为它们将在实例化聚合和重放事件日志时使用。与命令处理程序类似,建议将它们添加到状态类中。

  1. def applyEvent(evt: ShoppingCartEvent): ShoppingCart =
  2. evt match {
  3. case ItemAdded(itemId, quantity) => onItemAdded(itemId, quantity)
  4. case CartCheckedOut(checkedOutTime) => onCartCheckedOut(checkedOutTime)
  5. }
  6. private def onItemAdded(itemId: String, quantity: Int): ShoppingCart =
  7. copy(items = items + (itemId -> quantity))
  8. private def onCartCheckedOut(checkedOutTime: Instant): ShoppingCart = {
  9. copy(checkedOutTime = Option(checkedOutTime))
  10. }

EventSourcingBehaviour-模型组合

对所有模型进行编码后,下一步是将所有片段组合在一起,这样我们就可以让它运行。为此,请定义EventSourcedBehavior。在建模CQRS聚合时,建议使用withEnforcedReplies定义EventSourcedBehavior。使用强制回复要求命令处理程序返回ReplyEffect,通过这样来迫使开发人员给出明确回复。

  1. EventSourcedBehavior
  2. .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
  3. persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
  4. emptyState = ShoppingCart.empty,
  5. commandHandler = (cart, cmd) => cart.applyCommand(cmd),
  6. eventHandler = (cart, evt) => cart.applyEvent(evt)
  7. )

EventSourcedBehavior.withEnforcedReplies 有四个字段需要定义: persistenceId, emptyState, commandHandler 和eventHandler。
persistenceId定义将在事件日志中使用的id。id由名称(例如entityContext.entityTypeKey.name)和业务id(例如entityContext.entityId)组成。默认情况下,这两个值将使用“|”连接(例如,“ShoppingCart | 123456”)。有关更多详细信息,请参阅Akka文档
emptyState是日志为空时使用的状态。这是初始状态:

  1. val empty: ShoppingCart = ShoppingCart(items = Map.empty)

commandHandler是一个(State,Command)=>ReplyEffect[Event,State]的函数。在本例中,它是在passed状态下使用applyCommand定义的。同样,eventHandler是一个(State,Event)=>Event的函数,并在passed状态中定义。

改变行为-有限状态机

如果你熟悉Akka Actors,你可能知道在处理一条消息后,你应该返回下一个要使用的行为。有了Akka Persistence Typed 情况就不同了。命令处理程序和事件处理程序都依赖于当前状态,因此可以通过在事件处理程序中返回新状态来更改行为。有关此主题的更多信息,请参阅Akka文档

标记事件–Akka持久化查询注意事项

事件被持久保存在事件日志中,主要用于在每次需要实例化时重播聚合的状态。然而,在CQRS中,我们还希望使用这些相同的事件并生成读取端视图,或者将它们发布到消息代理(例如:Kafka)中以供外部使用。
为了能够在读取端使用事件,必须对事件进行标记。这是使用AggregateEventTag来完成的。建议对标签进行分片,以便Lagom的读侧处理器主题生产者能够以分布式方式使用它们。虽然不建议这样做,但也可以不对事件进行分片,参照这里。
本例将标记拆分为10个片段,并在ShoppingCart.Event的伴生对象中定义事件标记器。请注意标记名以及分片的数量必须稳定。如果不迁移日志,这两个值以后无法更改。

  1. object ShoppingCartEvent {
  2. // will produce tags with shard numbers from 0 to 9
  3. val Tag: AggregateEventShards[ShoppingCartEvent] =
  4. AggregateEventTag.sharded[ShoppingCartEvent](numShards = 10)
  5. }

注意:如果使用JDBC数据库存储日志,则分片标记(numshard)的数量不应大于10。这是由于插件中存在缺陷造成的。如果不遵守此指令,将导致某些事件在读侧处理器和主题生产者上被多次传递。

AggregateEventTag是Lagom的读侧处理器和主题生产者使用的Lagom类,但是Akka Persistence Type需要一个函数Event=>Set[String]。因此,我们需要使用适配器将Lagom的AggregateEventTag转换为所需的Akka tagger函数。

  1. EventSourcedBehavior
  2. .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
  3. persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
  4. emptyState = ShoppingCart.empty,
  5. commandHandler = (cart, cmd) => cart.applyCommand(cmd),
  6. eventHandler = (cart, evt) => cart.applyEvent(evt)
  7. )
  8. .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))

配置快照

快照是一种常见的优化,可以避免从一开始就重播所有事件。
可以通过两种方式定义快照规则:通过断言行为(snapshotWhen)和通过计数器。两者可以结合。下面的示例使用计数器来说明API。你可以在Akka文档中找到更多细节。

  1. EventSourcedBehavior
  2. .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
  3. persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
  4. emptyState = ShoppingCart.empty,
  5. commandHandler = (cart, cmd) => cart.applyCommand(cmd),
  6. eventHandler = (cart, evt) => cart.applyEvent(evt)
  7. )
  8. // snapshot every 100 events and keep at most 2 snapshots on db
  9. .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))

通过断言行为来定义快照:

  1. import akka.persistence.typed.scaladsl.Effect
  2. EventSourcedBehavior[Command, Event, State](
  3. persistenceId = PersistenceId.ofUniqueId("abc"),
  4. emptyState = State(),
  5. commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
  6. eventHandler = (state, evt) => state) // do something based on a particular state
  7. .snapshotWhen {
  8. case (state, BookingCompleted(_), sequenceNumber) => true
  9. case (state, event, sequenceNumber) => false
  10. }
  11. .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))

Akka集群分片(Cluster Sharding)

Lagom使用Akka群集分片将聚合分布到所有节点上,并保证在任何时候,在整个群集的内存中只加载一个给定聚合的实例。

创建聚合实例

聚合需要在ClusterSharding上初始化,然后才能使用。该过程不会创建任何特定的聚合实例,它只会创建分片区域并准备使用(请阅读Akka Cluster Sharding文档中有关分片区域的更多信息)。

注:在Akka集群中,指分片参与者的术语是实体,因此分片聚合也可以称为聚合实体。

您必须定义EntityTypeKeyEntityContext[Command]=>Behavior[Command]函数来初始化购物车聚合服务的EventSourcedBehavior
EntityTypeKey具有唯一标识集群中此模型的名称。它应该在ShoppingCartCommand上输入,这是购物车可以接收的消息类型。
ShoppingCart的伴生对象中,定义EntityTypeKey和工厂方法来初始化购物车聚合的EventSourcedBehavior

  1. object ShoppingCart {
  2. val empty = ShoppingCart(items = Map.empty)
  3. val typeKey: EntityTypeKey[ShoppingCartCommand] = EntityTypeKey[ShoppingCartCommand]("ShoppingCart")
  4. def apply(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
  5. EventSourcedBehavior
  6. .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
  7. persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
  8. emptyState = ShoppingCart.empty,
  9. commandHandler = (cart, cmd) => cart.applyCommand(cmd),
  10. eventHandler = (cart, evt) => cart.applyEvent(evt)
  11. )
  12. .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
  13. .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
  14. }
  15. }

最后,在ClusterSharding中使用typedKeybehavior来完成聚合的初始化。为了方便起见,Lagom通过依赖项注入提供了clusterSharding扩展的一个实例。初始化实体只应执行一次,对于Lagom聚合,通常在LagomApplication中执行:

  1. class ShoppingCartLoader extends LagomApplicationLoader {
  2. override def load(context: LagomApplicationContext): LagomApplication =
  3. new ShoppingCartApplication(context) with AkkaDiscoveryComponents
  4. override def loadDevMode(context: LagomApplicationContext): LagomApplication =
  5. new ShoppingCartApplication(context) with LagomDevModeComponents
  6. override def describeService = Some(readDescriptor[ShoppingCartService])
  7. }
  8. trait ShoppingCartComponents
  9. extends LagomServerComponents
  10. with SlickPersistenceComponents
  11. with HikariCPComponents
  12. with AhcWSComponents {
  13. implicit def executionContext: ExecutionContext
  14. override lazy val lagomServer: LagomServer = serverFor[ShoppingCartService](wire[ShoppingCartServiceImpl])
  15. override lazy val jsonSerializerRegistry: JsonSerializerRegistry = ShoppingCartSerializerRegistry
  16. // Initialize the sharding for the ShoppingCart aggregate.
  17. // See https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html
  18. clusterSharding.init(
  19. Entity(ShoppingCart.typeKey) { entityContext =>
  20. ShoppingCart(entityContext)
  21. }
  22. )
  23. }
  24. abstract class ShoppingCartApplication(context: LagomApplicationContext)
  25. extends LagomApplication(context)
  26. with ShoppingCartComponents
  27. with LagomKafkaComponents {}

获取聚合根实体的实例

要访问聚合实例(可能在集群上本地或远程运行),应在服务上注入ClusterSharding

  1. class ShoppingCartServiceImpl(
  2. clusterSharding: ClusterSharding,
  3. persistentEntityRegistry: PersistentEntityRegistry
  4. )(implicit ec: ExecutionContext)
  5. extends ShoppingCartService // class body follows

然后可以使用EntityRefFor方法实例化EntityRef。在我们的例子中,EntityRef被输入为只接受 ShoppingCart.Command

  1. def entityRef(id: String): EntityRef[ShoppingCartCommand] = {
  2. clusterSharding.entityRefFor(ShoppingCart.typeKey, id)
  3. }

要在集群中找到正确的参与者,需要指定用于初始化实体的EntityTypeKey和所需实例的id。Akka Cluster将在集群上的一个节点中创建所需的actor,或者如果actor已经创建且仍处于活动状态,则重用现有实例。
EntityRef类似于ActorRef,但实体实例是分布在不同的节点。与EntityRef交互意味着交换的消息可能需要路由传输到另一个节点。

使用ask同步模式的注意事项

由于我们希望向聚合发送命令,并且这些命令声明一个应答,因此我们需要使用ask模式
我们下面介绍的代码从ShoppingCartServiceImpl内部创建EntityRef,这意味着我们可以从外部调用EntityRefActorSystem. EntityRef提供了一个现成的ask()重载方法便于外部调用。

  1. implicit val timeout = Timeout(5.seconds)
  2. override def get(id: String): ServiceCall[NotUsed, ShoppingCartView] = ServiceCall { _ =>
  3. entityRef(id)
  4. .ask(reply => Get(reply))
  5. .map(cartSummary => asShoppingCartView(id, cartSummary))
  6. }

因此,我们声明一个隐式timeout,然后调用ask(隐式使用超时)。ask方法接受ActorRef[Res]=>M的函数,其中Res是预期的响应类型,M是发送给参与者的消息。ask方法将创建ActorRef[Res]的实例,该实例可用于构建传出消息(命令)。一旦响应发送到ActorRef[Res],Akka将用响应(在本例中为Future[Summary])完成Future[Res]的返回。
最后,我们对cartSummary进行操作(在本例中,我们将其映射到另一种类型,即ShoppingCartView)。
ShoppingCartViewasShoppingCartView的定义如下:

  1. final case class ShoppingCartItem(itemId: String, quantity: Int)
  2. final case class ShoppingCartView(id: String, items: Seq[ShoppingCartItem], checkedOut: Boolean)
  3. object ShoppingCartItem {
  4. implicit val format: Format[ShoppingCartItem] = Json.format
  5. }
  6. object ShoppingCartView {
  7. implicit val format: Format[ShoppingCartView] = Json.format
  8. }
  9. private def asShoppingCartView(id: String, cartSummary: Summary): ShoppingCartView = {
  10. ShoppingCartView(
  11. id,
  12. cartSummary.items.map((ShoppingCartItem.apply _).tupled).toSeq,
  13. cartSummary.checkedOut
  14. )
  15. }

配置分区数量

根据Akka建议,分区的数量应该比计划的最大集群节点数量高出10倍。不一定要精确。分区数少于节点数将导致某些节点无法承载任何分区。过多的分区将导致分区管理效率降低,例如重新平衡开销,并增加延迟,因为协调器参与了每个分区的第一条消息的路由。
有关如何配置分区数量的详细信息,请参阅Akka Cluster Sharding文档。

配置实体钝化

将所有聚合始终保存在内存中是低效的。取而代之的是,使用实体钝化功能,然后当分区实体(聚合)已被闲置一段时间时,将其从集群中移除。
Akka支持编程钝化自动钝化。自动钝化作为默认方式通常可以满足要求。

数据序列化

消息(命令、回复)和持久类(事件、状态快照)需要可序列化,才能通过网络跨集群发送或存储在数据库中。Akka建议将基于Jackson的序列化程序(最好是JSON,但也支持CBOR)作为大多数情况下的默认值。在Akka序列化程序的基础上,Lagom可以轻松添加Play JSON序列化支持,这对一些Scala开发人员来说可能更熟悉。
特别是在Akka Persistence Typed中,当您采用CQRS/ES实践时,命令将包括replyTo:ActorRef[Reply]字段。此replyTo字段将在代码中用于发送回复,如上面的示例所示。序列化ActorRef[T]需要使用Akka-Jackson序列化程序,这意味着不能使用Play JSON来序列化命令。
Akka Jackson用于命令消息的限制不适用于事件、快照甚至回复等其他消息。Akka需要序列化的每种类型都可能使用不同的序列化程序。
请在序列化部分中阅读有关序列化设置和配置的更多信息。

测试

Testing章节中涵盖了为聚合编写单元测试所需的所有步骤和功能。