本页专门介绍Lagom对使用Slick的关系数据库读取端的支持。在阅读本文之前,您应该熟悉Lagom的一般读取端支持和关系数据库读取端支持概述。
配置
对Slick
支持建立在Lagom对在关系数据库中存储持久实体的支持之上。有关如何配置Lagom以使用正确的JDBC驱动程序和数据库URL的说明,请参阅该指南。
接下来,我们需要为读取端模型配置灵活的映射。注意,这个例子使用的是slick.jdbc.H2Profile
。请确保导入使用的依赖。
import slick.jdbc.H2Profile.api._
class PostSummaryRepository {
class PostSummaryTable(tag: Tag) extends Table[PostSummary](tag, "post_summary") {
def * = (postId, title) <> (PostSummary.tupled, PostSummary.unapply)
def postId = column[String]("post_id", O.PrimaryKey)
def title = column[String]("title")
}
val postSummaries = TableQuery[PostSummaryTable]
def selectPostSummaries() = postSummaries.result
}
查询读测数据库
让我们首先看看服务如何使用Slick从关系数据库检索数据。
import com.lightbend.lagom.scaladsl.api.Service
import com.lightbend.lagom.scaladsl.api.ServiceCall
import slick.jdbc.JdbcBackend.Database
class BlogServiceImpl(db: Database, val postSummaryRepo: PostSummaryRepository) extends BlogService {
override def getPostSummaries() = ServiceCall { request =>
db.run(postSummaryRepo.selectPostSummaries())
}
注意,一个Slick数据库与之前定义的PostSummaryRepository
一起注入到构造函数中。Slick的数据库允许执行selectPostSummaries()
返回的Slick DBIOAciton
。重要的是,它还在一个线程池中管理阻塞JDBC调用的执行,该线程池设计用于处理JDBC调用,这就是它返回Future
的原因。
修改读测数据
我们需要将持久实体生成的事件转换为可以查询的数据库表,如前一节所示。为此,我们将在SlickReadSide支持组件的帮助下实现ReadSideProcessor。它将使用持久实体生成的事件,并更新为查询而优化的一个或多个数据库表。
以下是ReadSideProcessor类在填写实现细节之前的样子:
import akka.Done
import com.lightbend.lagom.scaladsl.persistence.AggregateEventTag
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor
import com.lightbend.lagom.scaladsl.persistence.slick.SlickReadSide
import com.lightbend.lagom.scaladsl.persistence.EventStreamElement
import docs.home.scaladsl.persistence.SlickRepos.Full.PostSummaryRepository
import slick.dbio.DBIO
import scala.concurrent.ExecutionContext
class BlogEventProcessor(
readSide: SlickReadSide,
postSummaryRepo: PostSummaryRepository
) extends ReadSideProcessor[BlogEvent] {
override def buildHandler(): ReadSideProcessor.ReadSideHandler[BlogEvent] = {
// TODO build read side handler
???
}
override def aggregateTags: Set[AggregateEventTag[BlogEvent]] = {
// TODO return the tag for the events
???
}
}
可以看到,我们已经注入了Slick的读取端支持,这将在以后需要。
就像在文档中描述的那样,您应该已经实现了事件标记,因此首先我们将在读取端处理器存根中实现aggregateTags
方法,如下所示:
override def aggregateTags: Set[AggregateEventTag[BlogEvent]] =
BlogEvent.Tag.allTags
构建读侧处理程序
ReadSideProcessor
上的另一个方法是buildHandler
。这负责创建将处理事件的ReadSideHandler。它还提供了运行两个回调的机会,一个是全局准备回调,另一个是常规准备回调。
SlickReadSide有一个为这些读测处理器创建建造器的builder
方法,该处理程序将自动管理事务并为您处理读取端偏移。可以这样创建:
val builder = readSide.builder[BlogEvent]("blogsummaryoffset")
传递给此方法的参数是读取端处理器的标识符,Lagom在保持偏移量时应该使用该标识符。Lagom将偏移量存储在一个表中,如果不存在,它将自动创建该表。如果您希望Lagom不会自动为您创建此表,可以在application.conf
文件中设置lagom.persistence.jdbc.create-tables.auto=false
来关闭此功能。此表模式的DDL如下所示:
CREATE TABLE read_side_offsets (
read_side_id VARCHAR(255), tag VARCHAR(255),
sequence_offset bigint, time_uuid_offset char(36),
PRIMARY KEY (read_side_id, tag)
)
全局准备回调
全局准备回调在整个集群中至少运行一次。它用于创建表和准备任何需要在读端处理开始之前可用的数据。读端处理器可能会在多个节点上分片,因此创建表之类的任务通常只能在一个节点上完成。
全局准备回调从Akka集群单例运行。它可能会运行多次——每次新节点成为新的单例时,都会运行回调。因此,任务必须是幂等的。如果失败,它将使用指数退避再次运行,整个集群的读取端处理将在成功运行之前不会启动。
当然,设置全局准备回调是完全可选的,您可能更喜欢手动管理数据库表,但开发和测试环境使用此回调为您创建数据库表非常方便。
下面是一个示例方法,我们已经实现了使用Slick DDL生成来创建表。这里,对DDL语句的灵活支持仅用于在表不存在时创建表,这样操作就可以像前面解释的那样是幂等的。
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.H2Profile.api._
class PostSummaryRepository {
// table mapping omitted for conciseness
val postSummaries = TableQuery[PostSummaryTable]
def createTable = postSummaries.schema.createIfNotExists
}
定义这种方法的最佳位置是在Model Repository
中,我们通常在其中添加与数据库操作相关的所有代码。
然后可以在buildHandler
方法中将其注册为全局准备回调:
builder.setGlobalPrepare(postSummaryRepo.createTable)
准备回调
除了全局准备回调之外,还可以通过调用 builder.setPrepare来设置。当读取端处理器启动时,每个分片将执行一次。
如果您阅读了Cassandra read side support guide,您可能已经看到它被用于准备数据库语句以备将来使用。然而,JDBC PreparedStatement
实例不能保证线程安全,因此prepare回调不应用于关系数据库。
同样,这个回调是可选的,在我们的示例中,我们不需要准备回调,因此没有指定任何回调。
注册读测处理器
一旦创建读端处理器后,需要向Lagom注册。这是使用ReadSide
组件完成的:
class BlogServiceImpl(persistentEntityRegistry: PersistentEntityRegistry, readSide: ReadSide, myDatabase: MyDatabase)
extends BlogService {
readSide.register[BlogEvent](new BlogEventProcessor(myDatabase))
请注意,如果使用Macwire进行依赖项注入,只需将以下内容添加到应用程序加载器:
readSide.register(wire[BlogEventProcessor])
事件处理器
事件处理程序接收一个事件并返回一个DBIOAction
,下面是一个处理PostAdded
事件的回调示例:
/* added to PostSummaryRepository to insert or update Post Summaries */
def save(postSummary: PostSummary) = {
postSummaries.insertOrUpdate(postSummary).map(_ => Done)
}
private def processPostAdded(eventElement: EventStreamElement[PostAdded]): DBIO[Done] = {
postSummaryRepo.save(
PostSummary(
eventElement.event.postId,
eventElement.event.content.title
)
)
}
然后可以使用setEventHandler
向构建器注册:
builder.setEventHandler[PostAdded](processPostAdded)
注册完所有事件处理程序后,可以调用build
方法并返回生成的处理程序:
builder.build()
应用程序加载器
Lagom应用程序加载器需要配置为Slick 持久化。这可以通过混入SlickPersistentComponents
特质来实现,如下所示:
abstract class SlickBlogApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with JdbcPersistenceComponents
with SlickPersistenceComponents
with HikariCPComponents
with AhcWSComponents {