本页专门介绍Lagom对使用Slick的关系数据库读取端的支持。在阅读本文之前,您应该熟悉Lagom的一般读取端支持和关系数据库读取端支持概述。

配置

Slick支持建立在Lagom对在关系数据库中存储持久实体的支持之上。有关如何配置Lagom以使用正确的JDBC驱动程序和数据库URL的说明,请参阅该指南。
接下来,我们需要为读取端模型配置灵活的映射。注意,这个例子使用的是slick.jdbc.H2Profile。请确保导入使用的依赖。

  1. import slick.jdbc.H2Profile.api._
  2. class PostSummaryRepository {
  3. class PostSummaryTable(tag: Tag) extends Table[PostSummary](tag, "post_summary") {
  4. def * = (postId, title) <> (PostSummary.tupled, PostSummary.unapply)
  5. def postId = column[String]("post_id", O.PrimaryKey)
  6. def title = column[String]("title")
  7. }
  8. val postSummaries = TableQuery[PostSummaryTable]
  9. def selectPostSummaries() = postSummaries.result
  10. }

查询读测数据库

让我们首先看看服务如何使用Slick从关系数据库检索数据。

  1. import com.lightbend.lagom.scaladsl.api.Service
  2. import com.lightbend.lagom.scaladsl.api.ServiceCall
  3. import slick.jdbc.JdbcBackend.Database
  4. class BlogServiceImpl(db: Database, val postSummaryRepo: PostSummaryRepository) extends BlogService {
  5. override def getPostSummaries() = ServiceCall { request =>
  6. db.run(postSummaryRepo.selectPostSummaries())
  7. }

注意,一个Slick数据库与之前定义的PostSummaryRepository一起注入到构造函数中。Slick的数据库允许执行selectPostSummaries()返回的Slick DBIOAciton。重要的是,它还在一个线程池中管理阻塞JDBC调用的执行,该线程池设计用于处理JDBC调用,这就是它返回Future的原因。

修改读测数据

我们需要将持久实体生成的事件转换为可以查询的数据库表,如前一节所示。为此,我们将在SlickReadSide支持组件的帮助下实现ReadSideProcessor。它将使用持久实体生成的事件,并更新为查询而优化的一个或多个数据库表。
以下是ReadSideProcessor类在填写实现细节之前的样子:

  1. import akka.Done
  2. import com.lightbend.lagom.scaladsl.persistence.AggregateEventTag
  3. import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor
  4. import com.lightbend.lagom.scaladsl.persistence.slick.SlickReadSide
  5. import com.lightbend.lagom.scaladsl.persistence.EventStreamElement
  6. import docs.home.scaladsl.persistence.SlickRepos.Full.PostSummaryRepository
  7. import slick.dbio.DBIO
  8. import scala.concurrent.ExecutionContext
  9. class BlogEventProcessor(
  10. readSide: SlickReadSide,
  11. postSummaryRepo: PostSummaryRepository
  12. ) extends ReadSideProcessor[BlogEvent] {
  13. override def buildHandler(): ReadSideProcessor.ReadSideHandler[BlogEvent] = {
  14. // TODO build read side handler
  15. ???
  16. }
  17. override def aggregateTags: Set[AggregateEventTag[BlogEvent]] = {
  18. // TODO return the tag for the events
  19. ???
  20. }
  21. }

可以看到,我们已经注入了Slick的读取端支持,这将在以后需要。
就像在文档中描述的那样,您应该已经实现了事件标记,因此首先我们将在读取端处理器存根中实现aggregateTags方法,如下所示:

  1. override def aggregateTags: Set[AggregateEventTag[BlogEvent]] =
  2. BlogEvent.Tag.allTags

构建读侧处理程序

ReadSideProcessor上的另一个方法是buildHandler。这负责创建将处理事件的ReadSideHandler。它还提供了运行两个回调的机会,一个是全局准备回调,另一个是常规准备回调。
SlickReadSide有一个为这些读测处理器创建建造器的builder方法,该处理程序将自动管理事务并为您处理读取端偏移。可以这样创建:

  1. val builder = readSide.builder[BlogEvent]("blogsummaryoffset")

传递给此方法的参数是读取端处理器的标识符,Lagom在保持偏移量时应该使用该标识符。Lagom将偏移量存储在一个表中,如果不存在,它将自动创建该表。如果您希望Lagom不会自动为您创建此表,可以在application.conf文件中设置lagom.persistence.jdbc.create-tables.auto=false来关闭此功能。此表模式的DDL如下所示:

  1. CREATE TABLE read_side_offsets (
  2. read_side_id VARCHAR(255), tag VARCHAR(255),
  3. sequence_offset bigint, time_uuid_offset char(36),
  4. PRIMARY KEY (read_side_id, tag)
  5. )

全局准备回调

全局准备回调在整个集群中至少运行一次。它用于创建表和准备任何需要在读端处理开始之前可用的数据。读端处理器可能会在多个节点上分片,因此创建表之类的任务通常只能在一个节点上完成。
全局准备回调从Akka集群单例运行。它可能会运行多次——每次新节点成为新的单例时,都会运行回调。因此,任务必须是幂等的。如果失败,它将使用指数退避再次运行,整个集群的读取端处理将在成功运行之前不会启动。
当然,设置全局准备回调是完全可选的,您可能更喜欢手动管理数据库表,但开发和测试环境使用此回调为您创建数据库表非常方便。
下面是一个示例方法,我们已经实现了使用Slick DDL生成来创建表。这里,对DDL语句的灵活支持仅用于在表不存在时创建表,这样操作就可以像前面解释的那样是幂等的。

  1. import scala.concurrent.ExecutionContext.Implicits.global
  2. import slick.jdbc.H2Profile.api._
  3. class PostSummaryRepository {
  4. // table mapping omitted for conciseness
  5. val postSummaries = TableQuery[PostSummaryTable]
  6. def createTable = postSummaries.schema.createIfNotExists
  7. }

定义这种方法的最佳位置是在Model Repository中,我们通常在其中添加与数据库操作相关的所有代码。
然后可以在buildHandler方法中将其注册为全局准备回调:

  1. builder.setGlobalPrepare(postSummaryRepo.createTable)

准备回调

除了全局准备回调之外,还可以通过调用 builder.setPrepare来设置。当读取端处理器启动时,每个分片将执行一次。
如果您阅读了Cassandra read side support guide,您可能已经看到它被用于准备数据库语句以备将来使用。然而,JDBC PreparedStatement实例不能保证线程安全,因此prepare回调不应用于关系数据库。
同样,这个回调是可选的,在我们的示例中,我们不需要准备回调,因此没有指定任何回调。

注册读测处理器

一旦创建读端处理器后,需要向Lagom注册。这是使用ReadSide组件完成的:

  1. class BlogServiceImpl(persistentEntityRegistry: PersistentEntityRegistry, readSide: ReadSide, myDatabase: MyDatabase)
  2. extends BlogService {
  3. readSide.register[BlogEvent](new BlogEventProcessor(myDatabase))

请注意,如果使用Macwire进行依赖项注入,只需将以下内容添加到应用程序加载器:

  1. readSide.register(wire[BlogEventProcessor])

事件处理器

事件处理程序接收一个事件并返回一个DBIOAction,下面是一个处理PostAdded事件的回调示例:

  1. /* added to PostSummaryRepository to insert or update Post Summaries */
  2. def save(postSummary: PostSummary) = {
  3. postSummaries.insertOrUpdate(postSummary).map(_ => Done)
  4. }
  5. private def processPostAdded(eventElement: EventStreamElement[PostAdded]): DBIO[Done] = {
  6. postSummaryRepo.save(
  7. PostSummary(
  8. eventElement.event.postId,
  9. eventElement.event.content.title
  10. )
  11. )
  12. }

然后可以使用setEventHandler向构建器注册:

  1. builder.setEventHandler[PostAdded](processPostAdded)

注册完所有事件处理程序后,可以调用build方法并返回生成的处理程序:

  1. builder.build()

应用程序加载器

Lagom应用程序加载器需要配置为Slick 持久化。这可以通过混入SlickPersistentComponents特质来实现,如下所示:

  1. abstract class SlickBlogApplication(context: LagomApplicationContext)
  2. extends LagomApplication(context)
  3. with JdbcPersistenceComponents
  4. with SlickPersistenceComponents
  5. with HikariCPComponents
  6. with AhcWSComponents {