本节介绍了使用Akka Persistence Typed并遵循Lagom采用的CQRS原则,按照域驱动设计中的定义,对聚合进行建模的所有步骤。虽然Akka Persistence Typed
为构建事件源参与者提供了API,但这并不一定适用于CQRS聚合。为了构建CQRS应用程序,我们需要在设计中使用一些规则。
我们使用一个简化的购物车示例来指导您完成这个过程。您可以在我们的样例代码库中找到一个完整的购物车样例。
定义模型
State状态模型
购物车的状态模型定义如下:
final case class ShoppingCart(
items: Map[String, Int],
// checkedOutTime defines if cart was checked-out or not:
// case None, cart is open
// case Some, cart is checked-out
checkedOutTime: Option[Instant] = None
)
请注意,我们使用案例类ShoppingCart
对其进行建模,当从一个状态(打开的购物车)转换到另一个状态(签出的购物车)时,可以设置checkedOutTime
。正如我们将在后面看到的,每个状态对它可以处理的命令、它可以持久化的事件以及它可以转换到的其他状态进行编码。
注:上面显示的示例是一个简化案例。每当你的模型经历不同的状态转换时,更好的方法是为每个状态都有一个特性和它的扩展。有关信息请参见样式指南中的示例。
命令和响应模型
接下来,我们定义可以发送的命令。
每个命令都通过replyTo: ActorRef[R]
字段定义一个应答,其中R
是将发送回调用方的应答类型。回复用于在命令被接受或拒绝时进行回复,或读取聚合服务的数据时进行回复(即:只读命令),也有可能两者兼而有之。例如,如果命令成功,它将返回一些更新的数据;如果失败,它将返回一条被拒绝的消息。或者你可以使用没有任何回复的命令(如:即发即弃)。不过,这是CQRS聚合建模中不太常见的模式,本文中没有介绍。
trait CommandSerializable
sealed trait ShoppingCartCommand extends CommandSerializable
final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand
final case class Checkout(replyTo: ActorRef[Confirmation]) extends ShoppingCartCommand
final case class Get(replyTo: ActorRef[Summary]) extends ShoppingCartCommand
在Akka类型中,与Akka classic和Lagom持久化不同,不可能向调用方返回异常。参与者和调用者之间的所有通信都必须通过命令中传递的replyTo:ActorRef[R]
完成。因此,如果你想发出拒绝信号,你必须在回复协议中对其进行编码。
上述命令使用的回复定义如下:
sealed trait Confirmation
final case class Accepted(summary: Summary) extends Confirmation
final case class Rejected(reason: String) extends Confirmation
final case class Summary(items: Map[String, Int], checkedOut: Boolean)
这里有两种不同的回复:Confirmation
和 Summary
。当我们想要修改状态时,使用Confirmation
。修改请求可以被Accepted
或 Rejected
。然后,使用Summary
来查询我们想看的购物车的状态。
注意:请记住,
Summary
不是购物车本身的数据结构,而是我们想要向外部世界展示的表现。将聚合的内部状态保持为私有是一种很好的做法,因为它允许内部状态和公开的API独立发展。
事件模型
接下来,我们定义需要持久化的事件。这些事件必须扩展自Lagom的AggregateEvent。这对于标记事件很重要。我们将在下文中标记事件小节介绍它。
sealed trait ShoppingCartEvent extends AggregateEvent[ShoppingCartEvent] {
override def aggregateTag: AggregateEventTagger[ShoppingCartEvent] = ShoppingCartEvent.Tag
}
final case class ItemAdded(itemId: String, quantity: Int) extends ShoppingCartEvent
final case class CartCheckedOut(eventTime: Instant) extends ShoppingCartEvent
定义命令处理器
一旦根据命令、回复、事件和状态定义了模型,就需要指定业务规则。命令处理程序定义如何处理每个传入的命令,必须应用哪些验证,以及哪些事件将被持久化。
你可以用不同的方式编码。建议的样式是在状态类中添加命令处理程序。对于ShoppingCart
,我们可以根据两种可能的状态定义命令处理程序:
def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
if (isOpen) {
cmd match {
case AddItem(itemId, quantity, replyTo) => onAddItem(itemId, quantity, replyTo)
case Checkout(replyTo) => onCheckout(replyTo)
case Get(replyTo) => onGet(replyTo)
}
} else {
cmd match {
case AddItem(_, _, replyTo) => Effect.reply(replyTo)(Rejected("Cannot add an item to a checked-out cart"))
case Checkout(replyTo) => Effect.reply(replyTo)(Rejected("Cannot checkout a checked-out cart"))
case Get(replyTo) => onGet(replyTo)
}
}
private def onAddItem(
itemId: String,
quantity: Int,
replyTo: ActorRef[Confirmation]
): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
if (items.contains(itemId))
Effect.reply(replyTo)(Rejected(s"Item '$itemId' was already added to this shopping cart"))
else if (quantity <= 0)
Effect.reply(replyTo)(Rejected("Quantity must be greater than zero"))
else
Effect
.persist(ItemAdded(itemId, quantity))
.thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
}
private def onCheckout(replyTo: ActorRef[Confirmation]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
if (items.isEmpty)
Effect.reply(replyTo)(Rejected("Cannot checkout an empty shopping cart"))
else
Effect
.persist(CartCheckedOut(Instant.now()))
.thenReply(replyTo)(updatedCart => Accepted(toSummary(updatedCart)))
}
private def onGet(replyTo: ActorRef[Summary]): ReplyEffect[ShoppingCartEvent, ShoppingCart] = {
Effect.reply(replyTo)(toSummary(shoppingCart = this))
}
private def toSummary(shoppingCart: ShoppingCart): Summary = {
Summary(shoppingCart.items, shoppingCart.checkedOut)
}
注意:当然可以按您认为更方便的方式组织命令处理程序,但我们建议使用
onCommand
模式,因为它可以帮助保持每个命令的处理逻辑相互隔离。
命令处理程序是模型的核心。它们对模型的业务规则进行编码,并充当模型一致性的守护者。命令处理程序必须首先验证传入命令是否可以应用于当前模型状态。在验证成功的情况下,一个或多个事件变化会被持久化。一旦事件被持久化,它们就会从当前状态迁移到下一个有效的状态。
由于聚合服务旨在为一致性边界建模,因此不建议使用范围内不可用的数据验证命令。任何决策都应该完全基于命令中传递的数据和当前聚合的状态。任何外部调用都应该被视为对聚合服务的一种破坏,因为这意味着聚合服务不能完全控制它应该保护的不变量。
回复有两种方式:Effect.reply
and Effect.persist(...).thenReply
。第一个选项在生效时直接可用,在回复时应在不保留任何事件的情况下使用。在这种情况下,您可以使用作用域中的可用状态,因为它保证不会更改。第二种变体应该在持久化一个或多个事件时使用。然后,您可以在用于定义回复的函数上改变到更新的状态。
您可以在命令处理程序中运行副作用函数。请参考 Akka文档 有关详细信息。
定义事件处理器
事件处理程序通过响应聚合事件来改变聚合状态。事件处理程序必须是纯函数,因为它们将在实例化聚合和重放事件日志时使用。与命令处理程序类似,建议将它们添加到状态类中。
def applyEvent(evt: ShoppingCartEvent): ShoppingCart =
evt match {
case ItemAdded(itemId, quantity) => onItemAdded(itemId, quantity)
case CartCheckedOut(checkedOutTime) => onCartCheckedOut(checkedOutTime)
}
private def onItemAdded(itemId: String, quantity: Int): ShoppingCart =
copy(items = items + (itemId -> quantity))
private def onCartCheckedOut(checkedOutTime: Instant): ShoppingCart = {
copy(checkedOutTime = Option(checkedOutTime))
}
EventSourcingBehaviour-模型组合
对所有模型进行编码后,下一步是将所有片段组合在一起,这样我们就可以让它运行。为此,请定义EventSourcedBehavior
。在建模CQRS聚合时,建议使用withEnforcedReplies
定义EventSourcedBehavior
。使用强制回复要求命令处理程序返回ReplyEffect
,通过这样来迫使开发人员给出明确回复。
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
EventSourcedBehavior.withEnforcedReplies 有四个字段需要定义: persistenceId, emptyState, commandHandler 和eventHandler。persistenceId
定义将在事件日志中使用的id。id由名称(例如entityContext.entityTypeKey.name)和业务id(例如entityContext.entityId
)组成。默认情况下,这两个值将使用“|”连接(例如,“ShoppingCart | 123456”
)。有关更多详细信息,请参阅Akka文档。emptyState
是日志为空时使用的状态。这是初始状态:
val empty: ShoppingCart = ShoppingCart(items = Map.empty)
commandHandler
是一个(State,Command)=>ReplyEffect[Event,State]
的函数。在本例中,它是在passed状态下使用applyCommand定义的。同样,eventHandler
是一个(State,Event)=>Event
的函数,并在passed状态中定义。
改变行为-有限状态机
如果你熟悉Akka Actors,你可能知道在处理一条消息后,你应该返回下一个要使用的行为。有了Akka Persistence Typed 情况就不同了。命令处理程序和事件处理程序都依赖于当前状态,因此可以通过在事件处理程序中返回新状态来更改行为。有关此主题的更多信息,请参阅Akka文档。
标记事件–Akka持久化查询注意事项
事件被持久保存在事件日志中,主要用于在每次需要实例化时重播聚合的状态。然而,在CQRS中,我们还希望使用这些相同的事件并生成读取端视图,或者将它们发布到消息代理(例如:Kafka)中以供外部使用。
为了能够在读取端使用事件,必须对事件进行标记。这是使用AggregateEventTag
来完成的。建议对标签进行分片,以便Lagom的读侧处理器和主题生产者能够以分布式方式使用它们。虽然不建议这样做,但也可以不对事件进行分片,参照这里。
本例将标记拆分为10个片段,并在ShoppingCart.Event
的伴生对象中定义事件标记器。请注意标记名以及分片的数量必须稳定。如果不迁移日志,这两个值以后无法更改。
object ShoppingCartEvent {
// will produce tags with shard numbers from 0 to 9
val Tag: AggregateEventShards[ShoppingCartEvent] =
AggregateEventTag.sharded[ShoppingCartEvent](numShards = 10)
}
注意:如果使用JDBC数据库存储日志,则分片标记(numshard)的数量不应大于10。这是由于插件中存在缺陷造成的。如果不遵守此指令,将导致某些事件在读侧处理器和主题生产者上被多次传递。
AggregateEventTag
是Lagom的读侧处理器和主题生产者使用的Lagom类,但是Akka Persistence Type需要一个函数Event=>Set[String]
。因此,我们需要使用适配器将Lagom的AggregateEventTag
转换为所需的Akka tagger函数。
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
.withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
配置快照
快照是一种常见的优化,可以避免从一开始就重播所有事件。
可以通过两种方式定义快照规则:通过断言行为(snapshotWhen
)和通过计数器。两者可以结合。下面的示例使用计数器来说明API。你可以在Akka文档中找到更多细节。
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
// snapshot every 100 events and keep at most 2 snapshots on db
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
通过断言行为来定义快照:
import akka.persistence.typed.scaladsl.Effect
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId.ofUniqueId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new NotImplementedError("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => state) // do something based on a particular state
.snapshotWhen {
case (state, BookingCompleted(_), sequenceNumber) => true
case (state, event, sequenceNumber) => false
}
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
Akka集群分片(Cluster Sharding)
Lagom使用Akka群集分片将聚合分布到所有节点上,并保证在任何时候,在整个群集的内存中只加载一个给定聚合的实例。
创建聚合实例
聚合需要在ClusterSharding
上初始化,然后才能使用。该过程不会创建任何特定的聚合实例,它只会创建分片区域并准备使用(请阅读Akka Cluster Sharding文档中有关分片区域的更多信息)。
注:在Akka集群中,指分片参与者的术语是实体,因此分片聚合也可以称为聚合实体。
您必须定义EntityTypeKey
和EntityContext[Command]=>Behavior[Command]
函数来初始化购物车聚合服务的EventSourcedBehavior
。EntityTypeKey
具有唯一标识集群中此模型的名称。它应该在ShoppingCartCommand
上输入,这是购物车可以接收的消息类型。
在ShoppingCart
的伴生对象中,定义EntityTypeKey
和工厂方法来初始化购物车聚合的EventSourcedBehavior
。
object ShoppingCart {
val empty = ShoppingCart(items = Map.empty)
val typeKey: EntityTypeKey[ShoppingCartCommand] = EntityTypeKey[ShoppingCartCommand]("ShoppingCart")
def apply(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
EventSourcedBehavior
.withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
emptyState = ShoppingCart.empty,
commandHandler = (cart, cmd) => cart.applyCommand(cmd),
eventHandler = (cart, evt) => cart.applyEvent(evt)
)
.withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
}
}
最后,在ClusterSharding
中使用typedKey
和behavior
来完成聚合的初始化。为了方便起见,Lagom通过依赖项注入提供了clusterSharding
扩展的一个实例。初始化实体只应执行一次,对于Lagom聚合,通常在LagomApplication
中执行:
class ShoppingCartLoader extends LagomApplicationLoader {
override def load(context: LagomApplicationContext): LagomApplication =
new ShoppingCartApplication(context) with AkkaDiscoveryComponents
override def loadDevMode(context: LagomApplicationContext): LagomApplication =
new ShoppingCartApplication(context) with LagomDevModeComponents
override def describeService = Some(readDescriptor[ShoppingCartService])
}
trait ShoppingCartComponents
extends LagomServerComponents
with SlickPersistenceComponents
with HikariCPComponents
with AhcWSComponents {
implicit def executionContext: ExecutionContext
override lazy val lagomServer: LagomServer = serverFor[ShoppingCartService](wire[ShoppingCartServiceImpl])
override lazy val jsonSerializerRegistry: JsonSerializerRegistry = ShoppingCartSerializerRegistry
// Initialize the sharding for the ShoppingCart aggregate.
// See https://doc.akka.io/docs/akka/2.6/typed/cluster-sharding.html
clusterSharding.init(
Entity(ShoppingCart.typeKey) { entityContext =>
ShoppingCart(entityContext)
}
)
}
abstract class ShoppingCartApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with ShoppingCartComponents
with LagomKafkaComponents {}
获取聚合根实体的实例
要访问聚合实例(可能在集群上本地或远程运行),应在服务上注入ClusterSharding
:
class ShoppingCartServiceImpl(
clusterSharding: ClusterSharding,
persistentEntityRegistry: PersistentEntityRegistry
)(implicit ec: ExecutionContext)
extends ShoppingCartService // class body follows
然后可以使用EntityRefFor
方法实例化EntityRef
。在我们的例子中,EntityRef
被输入为只接受 ShoppingCart.Command
。
def entityRef(id: String): EntityRef[ShoppingCartCommand] = {
clusterSharding.entityRefFor(ShoppingCart.typeKey, id)
}
要在集群中找到正确的参与者,需要指定用于初始化实体的EntityTypeKey
和所需实例的id
。Akka Cluster将在集群上的一个节点中创建所需的actor,或者如果actor已经创建且仍处于活动状态,则重用现有实例。EntityRef
类似于ActorRef
,但实体实例是分布在不同的节点。与EntityRef
交互意味着交换的消息可能需要路由传输到另一个节点。
使用ask同步模式的注意事项
由于我们希望向聚合发送命令,并且这些命令声明一个应答,因此我们需要使用ask模式。
我们下面介绍的代码从ShoppingCartServiceImpl
内部创建EntityRef
,这意味着我们可以从外部调用EntityRef
,ActorSystem. EntityRef
提供了一个现成的ask()
重载方法便于外部调用。
implicit val timeout = Timeout(5.seconds)
override def get(id: String): ServiceCall[NotUsed, ShoppingCartView] = ServiceCall { _ =>
entityRef(id)
.ask(reply => Get(reply))
.map(cartSummary => asShoppingCartView(id, cartSummary))
}
因此,我们声明一个隐式timeout,然后调用ask
(隐式使用超时)。ask
方法接受ActorRef[Res]=>M
的函数,其中Res
是预期的响应类型,M
是发送给参与者的消息。ask
方法将创建ActorRef[Res]
的实例,该实例可用于构建传出消息(命令)。一旦响应发送到ActorRef[Res]
,Akka将用响应(在本例中为Future[Summary]
)完成Future[Res]
的返回。
最后,我们对cartSummary
进行操作(在本例中,我们将其映射到另一种类型,即ShoppingCartView
)。ShoppingCartView
和asShoppingCartView
的定义如下:
final case class ShoppingCartItem(itemId: String, quantity: Int)
final case class ShoppingCartView(id: String, items: Seq[ShoppingCartItem], checkedOut: Boolean)
object ShoppingCartItem {
implicit val format: Format[ShoppingCartItem] = Json.format
}
object ShoppingCartView {
implicit val format: Format[ShoppingCartView] = Json.format
}
private def asShoppingCartView(id: String, cartSummary: Summary): ShoppingCartView = {
ShoppingCartView(
id,
cartSummary.items.map((ShoppingCartItem.apply _).tupled).toSeq,
cartSummary.checkedOut
)
}
配置分区数量
根据Akka建议,分区的数量应该比计划的最大集群节点数量高出10倍。不一定要精确。分区数少于节点数将导致某些节点无法承载任何分区。过多的分区将导致分区管理效率降低,例如重新平衡开销,并增加延迟,因为协调器参与了每个分区的第一条消息的路由。
有关如何配置分区数量的详细信息,请参阅Akka Cluster Sharding文档。
配置实体钝化
将所有聚合始终保存在内存中是低效的。取而代之的是,使用实体钝化功能,然后当分区实体(聚合)已被闲置一段时间时,将其从集群中移除。
Akka支持编程钝化和自动钝化。自动钝化作为默认方式通常可以满足要求。
数据序列化
消息(命令、回复)和持久类(事件、状态快照)需要可序列化,才能通过网络跨集群发送或存储在数据库中。Akka建议将基于Jackson的序列化程序(最好是JSON,但也支持CBOR)作为大多数情况下的默认值。在Akka序列化程序的基础上,Lagom可以轻松添加Play JSON序列化支持,这对一些Scala开发人员来说可能更熟悉。
特别是在Akka Persistence Typed中,当您采用CQRS/ES实践时,命令将包括replyTo:ActorRef[Reply]
字段。此replyTo
字段将在代码中用于发送回复,如上面的示例所示。序列化ActorRef[T]
需要使用Akka-Jackson序列化程序,这意味着不能使用Play JSON来序列化命令。
Akka Jackson用于命令消息的限制不适用于事件、快照甚至回复等其他消息。Akka需要序列化的每种类型都可能使用不同的序列化程序。
请在序列化部分中阅读有关序列化设置和配置的更多信息。
测试
Testing章节中涵盖了为聚合编写单元测试所需的所有步骤和功能。