本节介绍了使用Akka Persistence Typed并遵循Lagom采用的CQRS原则,按照域驱动设计中的定义,对聚合进行建模的所有步骤。虽然Akka Persistence Typed为构建事件源参与者提供了API,但这并不一定适用于CQRS聚合。为了构建CQRS应用程序,我们需要在设计中使用一些规则。
我们使用一个简化的购物车示例来指导您完成这个过程。您可以在我们的样例代码库中找到一个完整的购物车样例。
定义模型
State状态模型
购物车的状态模型定义如下:
final case class ShoppingCart(items: Map[String, Int],// checkedOutTime defines if cart was checked-out or not:// case None, cart is open// case Some, cart is checked-outcheckedOutTime: Option[Instant] = None)
请注意,我们使用案例类ShoppingCart对其进行建模,当从一个状态(打开的购物车)转换到另一个状态(签出的购物车)时,可以设置checkedOutTime。正如我们将在后面看到的,每个状态对它可以处理的命令、它可以持久化的事件以及它可以转换到的其他状态进行编码。
注:上面显示的示例是一个简化案例。每当你的模型经历不同的状态转换时,更好的方法是为每个状态都有一个特性和它的扩展。有关信息请参见样式指南中的示例。
命令和响应模型
接下来,我们定义可以发送的命令。
每个命令都通过replyTo: ActorRef[R]字段定义一个应答,其中R是将发送回调用方的应答类型。回复用于在命令被接受或拒绝时进行回复,或读取聚合服务的数据时进行回复(即:只读命令),也有可能两者兼而有之。例如,如果命令成功,它将返回一些更新的数据;如果失败,它将返回一条被拒绝的消息。或者你可以使用没有任何回复的命令(如:即发即弃)。不过,这是CQRS聚合建模中不太常见的模式,本文中没有介绍。
trait CommandSerializablesealed trait ShoppingCartCommand extends CommandSerializablefinal case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]) extends ShoppingCartCommandfinal case class Checkout(replyTo: ActorRef[Confirmation]) extends ShoppingCartCommandfinal case class Get(replyTo: ActorRef[Summary]) extends ShoppingCartCommand
在Akka类型中,与Akka classic和Lagom持久化不同,不可能向调用方返回异常。参与者和调用者之间的所有通信都必须通过命令中传递的replyTo:ActorRef[R]完成。因此,如果你想发出拒绝信号,你必须在回复协议中对其进行编码。
上述命令使用的回复定义如下:
sealed trait Confirmationfinal case class Accepted(summary: Summary) extends Confirmationfinal case class Rejected(reason: String) extends Confirmationfinal case class Summary(items: Map[String, Int], checkedOut: Boolean)
这里有两种不同的回复:Confirmation和 Summary。当我们想要修改状态时,使用Confirmation。修改请求可以被Accepted或 Rejected。然后,使用Summary来查询我们想看的购物车的状态。
注意:请记住,
Summary不是购物车本身的数据结构,而是我们想要向外部世界展示的表现。将聚合的内部状态保持为私有是一种很好的做法,因为它允许内部状态和公开的API独立发展。
事件模型
接下来,我们定义需要持久化的事件。这些事件必须扩展自Lagom的AggregateEvent。这对于标记事件很重要。我们将在下文中标记事件小节介绍它。
sealed trait ShoppingCartEvent extends AggregateEvent[ShoppingCartEvent] {override def aggregateTag: AggregateEventTagger[ShoppingCartEvent] = ShoppingCartEvent.Tag}final case class ItemAdded(itemId: String, quantity: Int) extends ShoppingCartEventfinal case class CartCheckedOut(eventTime: Instant) extends ShoppingCartEvent
定义命令处理器
一旦根据命令、回复、事件和状态定义了模型,就需要指定业务规则。命令处理程序定义如何处理每个传入的命令,必须应用哪些验证,以及哪些事件将被持久化。
你可以用不同的方式编码。建议的样式是在状态类中添加命令处理程序。对于ShoppingCart,我们可以根据两种可能的状态定义命令处理程序:
def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =if (isOpen) {cmd match {case AddItem(itemId, quantity, replyTo) => onAddItem(itemId, quantity, replyTo)case Checkout(replyTo) => onCheckout(replyTo)case Get(replyTo) => onGet(replyTo)}} else {cmd match {case AddItem(_, _, replyTo) => Effect.reply(replyTo)(Rejected("Cannot add an item to a checked-out cart"))case Checkout(replyTo) => Effect.reply(replyTo)(Rejected("Cannot checkout a checked-out cart"))case Get(replyTo) => onGet(replyTo)}}private def onAddItem(itemId: String,quantity: Int,replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {if (items.contains(itemId))Effect.reply(replyTo)(Rejected(s"Item '$itemId' was already added to this shopping cart"))else if (quantity <= 0)Effect.reply(replyTo)(Rejected("Quantity must be greater than zero"))elseEffect.persist(ItemAdded(itemId, quantity)).thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))}private def onCheckout(replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {if (items.isEmpty)Effect.reply(replyTo)(Rejected("Cannot checkout an empty shopping cart"))elseEffect.persist(CartCheckedOut(Instant.now())).thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))}private def onGet(replyTo: ActorRef[Summary]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {Effect.reply(replyTo)(toSummary(shoppingCart = this))}private def toSummary(shoppingCart: ShoppingCart): Summary = {Summary(shoppingCart.items, shoppingCart.checkedOut)}
注意:当然可以按您认为更方便的方式组织命令处理程序,但我们建议使用
onCommand模式,因为它可以帮助保持每个命令的处理逻辑相互隔离。
命令处理程序是模型的核心。它们对模型的业务规则进行编码,并充当模型一致性的守护者。命令处理程序必须首先验证传入命令是否可以应用于当前模型状态。在验证成功的情况下,一个或多个事件变化会被持久化。一旦事件被持久化,它们就会从当前状态迁移到下一个有效的状态。
由于聚合服务旨在为一致性边界建模,因此不建议使用范围内不可用的数据验证命令。任何决策都应该完全基于命令中传递的数据和当前聚合的状态。任何外部调用都应该被视为对聚合服务的一种破坏,因为这意味着聚合服务不能完全控制它应该保护的不变量。
回复有两种方式:Effect.reply and Effect.persist(...).thenReply。第一个选项在生效时直接可用,在回复时应在不保留任何事件的情况下使用。在这种情况下,您可以使用作用域中的可用状态,因为它保证不会更改。第二种变体应该在持久化一个或多个事件时使用。然后,您可以在用于定义回复的函数上改变到更新的状态。
您可以在命令处理程序中运行副作用函数。请参考 Akka文档 有关详细信息。
定义事件处理器
事件处理程序通过响应聚合事件来改变聚合状态。事件处理程序必须是纯函数,因为它们将在实例化聚合和重放事件日志时使用。与命令处理程序类似,建议将它们添加到状态类中。
def applyEvent(evt: ShoppingCartEvent): ShoppingCart =evt match {case ItemAdded(itemId, quantity) => onItemAdded(itemId, quantity)case CartCheckedOut(checkedOutTime) => onCartCheckedOut(checkedOutTime)}private def onItemAdded(itemId: String, quantity: Int): ShoppingCart =copy(items = items + (itemId -> quantity))private def onCartCheckedOut(checkedOutTime: Instant): ShoppingCart = {copy(checkedOutTime = Option(checkedOutTime))}
EventSourcingBehaviour-模型组合
对所有模型进行编码后,下一步是将所有片段组合在一起,这样我们就可以让它运行。为此,请定义EventSourcedBehavior。在建模CQRS聚合时,建议使用withEnforcedReplies定义EventSourcedBehavior。使用强制回复要求命令处理程序返回ReplyEffect,通过这样来迫使开发人员给出明确回复。
EventSourcedBehavior.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),emptyState = ShoppingCart.empty,commandHandler = (cart, cmd) => cart.applyCommand(cmd),eventHandler = (cart, evt) => cart.applyEvent(evt))
EventSourcedBehavior.withEnforcedReplies 有四个字段需要定义: persistenceId, emptyState, commandHandler 和eventHandler。persistenceId定义将在事件日志中使用的id。id由名称(例如entityContext.entityTypeKey.name)和业务id(例如entityContext.entityId)组成。默认情况下,这两个值将使用“|”连接(例如,“ShoppingCart | 123456”)。有关更多详细信息,请参阅Akka文档。emptyState是日志为空时使用的状态。这是初始状态:
val empty: ShoppingCart = ShoppingCart(items = Map.empty)
commandHandler是一个(State,Command)=>ReplyEffect[Event,State]的函数。在本例中,它是在passed状态下使用applyCommand定义的。同样,eventHandler是一个(State,Event)=>Event的函数,并在passed状态中定义。
改变行为-有限状态机
如果你熟悉Akka Actors,你可能知道在处理一条消息后,你应该返回下一个要使用的行为。有了Akka Persistence Typed 情况就不同了。命令处理程序和事件处理程序都依赖于当前状态,因此可以通过在事件处理程序中返回新状态来更改行为。有关此主题的更多信息,请参阅Akka文档。
标记事件–Akka持久化查询注意事项
事件被持久保存在事件日志中,主要用于在每次需要实例化时重播聚合的状态。然而,在CQRS中,我们还希望使用这些相同的事件并生成读取端视图,或者将它们发布到消息代理(例如:Kafka)中以供外部使用。
为了能够在读取端使用事件,必须对事件进行标记。这是使用AggregateEventTag来完成的。建议对标签进行分片,以便Lagom的读侧处理器和主题生产者能够以分布式方式使用它们。虽然不建议这样做,但也可以不对事件进行分片,参照这里。
本例将标记拆分为10个片段,并在ShoppingCart.Event的伴生对象中定义事件标记器。请注意标记名以及分片的数量必须稳定。如果不迁移日志,这两个值以后无法更改。
object ShoppingCartEvent {// will produce tags with shard numbers from 0 to 9val Tag: AggregateEventShards[ShoppingCartEvent] =AggregateEventTag.sharded[ShoppingCartEvent](numShards = 10)}
注意:如果使用JDBC数据库存储日志,则分片标记(numshard)的数量不应大于10。这是由于插件中存在缺陷造成的。如果不遵守此指令,将导致某些事件在读侧处理器和主题生产者上被多次传递。
AggregateEventTag是Lagom的读侧处理器和主题生产者使用的Lagom类,但是Akka Persistence Type需要一个函数Event=>Set[String]。因此,我们需要使用适配器将Lagom的AggregateEventTag转换为所需的Akka tagger函数。
EventSourcedBehavior.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),emptyState = ShoppingCart.empty,commandHandler = (cart, cmd) => cart.applyCommand(cmd),eventHandler = (cart, evt) => cart.applyEvent(evt)).withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
配置快照
快照是一种常见的优化,可以避免从一开始就重播所有事件。
可以通过两种方式定义快照规则:通过断言行为(snapshotWhen)和通过计数器。两者可以结合。下面的示例使用计数器来说明API。你可以在Akka文档中找到更多细节。
EventSourcedBehavior.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),emptyState = ShoppingCart.empty,commandHandler = (cart, cmd) => cart.applyCommand(cmd),eventHandler = (cart, evt) => cart.applyEvent(evt))// snapshot every 100 events and keep at most 2 snapshots on db.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
通过断言行为来定义快照:
import akka.persistence.typed.scaladsl.EffectEventSourcedBehavior[Command, Event, State](persistenceId = PersistenceId.ofUniqueId("abc"),emptyState = State(),commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),eventHandler = (state, evt) => state) // do something based on a particular state.snapshotWhen {case (state, BookingCompleted(_), sequenceNumber) => truecase (state, event, sequenceNumber) => false}.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
Akka集群分片(Cluster Sharding)
Lagom使用Akka群集分片将聚合分布到所有节点上,并保证在任何时候,在整个群集的内存中只加载一个给定聚合的实例。
创建聚合实例
聚合需要在ClusterSharding上初始化,然后才能使用。该过程不会创建任何特定的聚合实例,它只会创建分片区域并准备使用(请阅读Akka Cluster Sharding文档中有关分片区域的更多信息)。
注:在Akka集群中,指分片参与者的术语是实体,因此分片聚合也可以称为聚合实体。
您必须定义EntityTypeKey和EntityContext[Command]=>Behavior[Command]函数来初始化购物车聚合服务的EventSourcedBehavior。EntityTypeKey具有唯一标识集群中此模型的名称。它应该在ShoppingCartCommand上输入,这是购物车可以接收的消息类型。
在ShoppingCart的伴生对象中,定义EntityTypeKey和工厂方法来初始化购物车聚合的EventSourcedBehavior。
object ShoppingCart {val empty = ShoppingCart(items = Map.empty)val typeKey: EntityTypeKey[ShoppingCartCommand] = EntityTypeKey[ShoppingCartCommand]("ShoppingCart")def apply(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {EventSourcedBehavior.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),emptyState = ShoppingCart.empty,commandHandler = (cart, cmd) => cart.applyCommand(cmd),eventHandler = (cart, evt) => cart.applyEvent(evt)).withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag)).withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))}}
最后,在ClusterSharding中使用typedKey和behavior来完成聚合的初始化。为了方便起见,Lagom通过依赖项注入提供了clusterSharding扩展的一个实例。初始化实体只应执行一次,对于Lagom聚合,通常在LagomApplication中执行:
class ShoppingCartLoader extends LagomApplicationLoader {override def load(context: LagomApplicationContext): LagomApplication =new ShoppingCartApplication(context) with AkkaDiscoveryComponentsoverride def loadDevMode(context: LagomApplicationContext): LagomApplication =new ShoppingCartApplication(context) with LagomDevModeComponentsoverride def describeService = Some(readDescriptor[ShoppingCartService])}trait ShoppingCartComponentsextends LagomServerComponentswith SlickPersistenceComponentswith HikariCPComponentswith AhcWSComponents {implicit def executionContext: ExecutionContextoverride lazy val lagomServer: LagomServer = serverFor[ShoppingCartService](wire[ShoppingCartServiceImpl])override lazy val jsonSerializerRegistry: JsonSerializerRegistry = ShoppingCartSerializerRegistry// Initialize the sharding for the ShoppingCart aggregate.// See https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.htmlclusterSharding.init(Entity(ShoppingCart.typeKey) { entityContext =>ShoppingCart(entityContext)})}abstract class ShoppingCartApplication(context: LagomApplicationContext)extends LagomApplication(context)with ShoppingCartComponentswith LagomKafkaComponents {}
获取聚合根实体的实例
要访问聚合实例(可能在集群上本地或远程运行),应在服务上注入ClusterSharding:
class ShoppingCartServiceImpl(clusterSharding: ClusterSharding,persistentEntityRegistry: PersistentEntityRegistry)(implicit ec: ExecutionContext)extends ShoppingCartService // class body follows
然后可以使用EntityRefFor方法实例化EntityRef。在我们的例子中,EntityRef被输入为只接受 ShoppingCart.Command。
def entityRef(id: String): EntityRef[ShoppingCartCommand] = {clusterSharding.entityRefFor(ShoppingCart.typeKey, id)}
要在集群中找到正确的参与者,需要指定用于初始化实体的EntityTypeKey和所需实例的id。Akka Cluster将在集群上的一个节点中创建所需的actor,或者如果actor已经创建且仍处于活动状态,则重用现有实例。EntityRef类似于ActorRef,但实体实例是分布在不同的节点。与EntityRef交互意味着交换的消息可能需要路由传输到另一个节点。
使用ask同步模式的注意事项
由于我们希望向聚合发送命令,并且这些命令声明一个应答,因此我们需要使用ask模式。
我们下面介绍的代码从ShoppingCartServiceImpl内部创建EntityRef,这意味着我们可以从外部调用EntityRef,ActorSystem. EntityRef提供了一个现成的ask()重载方法便于外部调用。
implicit val timeout = Timeout(5.seconds)override def get(id: String): ServiceCall[NotUsed, ShoppingCartView] = ServiceCall { _ =>entityRef(id).ask(reply => Get(reply)).map(cartSummary => asShoppingCartView(id, cartSummary))}
因此,我们声明一个隐式timeout,然后调用ask(隐式使用超时)。ask方法接受ActorRef[Res]=>M的函数,其中Res是预期的响应类型,M是发送给参与者的消息。ask方法将创建ActorRef[Res]的实例,该实例可用于构建传出消息(命令)。一旦响应发送到ActorRef[Res],Akka将用响应(在本例中为Future[Summary])完成Future[Res]的返回。
最后,我们对cartSummary进行操作(在本例中,我们将其映射到另一种类型,即ShoppingCartView)。ShoppingCartView和asShoppingCartView的定义如下:
final case class ShoppingCartItem(itemId: String, quantity: Int)final case class ShoppingCartView(id: String, items: Seq[ShoppingCartItem], checkedOut: Boolean)object ShoppingCartItem {implicit val format: Format[ShoppingCartItem] = Json.format}object ShoppingCartView {implicit val format: Format[ShoppingCartView] = Json.format}private def asShoppingCartView(id: String, cartSummary: Summary): ShoppingCartView = {ShoppingCartView(id,cartSummary.items.map((ShoppingCartItem.apply _).tupled).toSeq,cartSummary.checkedOut)}
配置分区数量
根据Akka建议,分区的数量应该比计划的最大集群节点数量高出10倍。不一定要精确。分区数少于节点数将导致某些节点无法承载任何分区。过多的分区将导致分区管理效率降低,例如重新平衡开销,并增加延迟,因为协调器参与了每个分区的第一条消息的路由。
有关如何配置分区数量的详细信息,请参阅Akka Cluster Sharding文档。
配置实体钝化
将所有聚合始终保存在内存中是低效的。取而代之的是,使用实体钝化功能,然后当分区实体(聚合)已被闲置一段时间时,将其从集群中移除。
Akka支持编程钝化和自动钝化。自动钝化作为默认方式通常可以满足要求。
数据序列化
消息(命令、回复)和持久类(事件、状态快照)需要可序列化,才能通过网络跨集群发送或存储在数据库中。Akka建议将基于Jackson的序列化程序(最好是JSON,但也支持CBOR)作为大多数情况下的默认值。在Akka序列化程序的基础上,Lagom可以轻松添加Play JSON序列化支持,这对一些Scala开发人员来说可能更熟悉。
特别是在Akka Persistence Typed中,当您采用CQRS/ES实践时,命令将包括replyTo:ActorRef[Reply]字段。此replyTo字段将在代码中用于发送回复,如上面的示例所示。序列化ActorRef[T]需要使用Akka-Jackson序列化程序,这意味着不能使用Play JSON来序列化命令。
Akka Jackson用于命令消息的限制不适用于事件、快照甚至回复等其他消息。Akka需要序列化的每种类型都可能使用不同的序列化程序。
请在序列化部分中阅读有关序列化设置和配置的更多信息。
测试
Testing章节中涵盖了为聚合编写单元测试所需的所有步骤和功能。
