本页专门介绍Lagom对使用Slick的关系数据库读取端的支持。在阅读本文之前,您应该熟悉Lagom的一般读取端支持和关系数据库读取端支持概述。
配置
对Slick支持建立在Lagom对在关系数据库中存储持久实体的支持之上。有关如何配置Lagom以使用正确的JDBC驱动程序和数据库URL的说明,请参阅该指南。
接下来,我们需要为读取端模型配置灵活的映射。注意,这个例子使用的是slick.jdbc.H2Profile。请确保导入使用的依赖。
import slick.jdbc.H2Profile.api._class PostSummaryRepository {class PostSummaryTable(tag: Tag) extends Table[PostSummary](tag, "post_summary") {def * = (postId, title) <> (PostSummary.tupled, PostSummary.unapply)def postId = column[String]("post_id", O.PrimaryKey)def title = column[String]("title")}val postSummaries = TableQuery[PostSummaryTable]def selectPostSummaries() = postSummaries.result}
查询读测数据库
让我们首先看看服务如何使用Slick从关系数据库检索数据。
import com.lightbend.lagom.scaladsl.api.Serviceimport com.lightbend.lagom.scaladsl.api.ServiceCallimport slick.jdbc.JdbcBackend.Databaseclass BlogServiceImpl(db: Database, val postSummaryRepo: PostSummaryRepository) extends BlogService {override def getPostSummaries() = ServiceCall { request =>db.run(postSummaryRepo.selectPostSummaries())}
注意,一个Slick数据库与之前定义的PostSummaryRepository一起注入到构造函数中。Slick的数据库允许执行selectPostSummaries()返回的Slick DBIOAciton。重要的是,它还在一个线程池中管理阻塞JDBC调用的执行,该线程池设计用于处理JDBC调用,这就是它返回Future的原因。
修改读测数据
我们需要将持久实体生成的事件转换为可以查询的数据库表,如前一节所示。为此,我们将在SlickReadSide支持组件的帮助下实现ReadSideProcessor。它将使用持久实体生成的事件,并更新为查询而优化的一个或多个数据库表。
以下是ReadSideProcessor类在填写实现细节之前的样子:
import akka.Doneimport com.lightbend.lagom.scaladsl.persistence.AggregateEventTagimport com.lightbend.lagom.scaladsl.persistence.ReadSideProcessorimport com.lightbend.lagom.scaladsl.persistence.slick.SlickReadSideimport com.lightbend.lagom.scaladsl.persistence.EventStreamElementimport docs.home.scaladsl.persistence.SlickRepos.Full.PostSummaryRepositoryimport slick.dbio.DBIOimport scala.concurrent.ExecutionContextclass BlogEventProcessor(readSide: SlickReadSide,postSummaryRepo: PostSummaryRepository) extends ReadSideProcessor[BlogEvent] {override def buildHandler(): ReadSideProcessor.ReadSideHandler[BlogEvent] = {// TODO build read side handler???}override def aggregateTags: Set[AggregateEventTag[BlogEvent]] = {// TODO return the tag for the events???}}
可以看到,我们已经注入了Slick的读取端支持,这将在以后需要。
就像在文档中描述的那样,您应该已经实现了事件标记,因此首先我们将在读取端处理器存根中实现aggregateTags方法,如下所示:
override def aggregateTags: Set[AggregateEventTag[BlogEvent]] =BlogEvent.Tag.allTags
构建读侧处理程序
ReadSideProcessor上的另一个方法是buildHandler。这负责创建将处理事件的ReadSideHandler。它还提供了运行两个回调的机会,一个是全局准备回调,另一个是常规准备回调。
SlickReadSide有一个为这些读测处理器创建建造器的builder方法,该处理程序将自动管理事务并为您处理读取端偏移。可以这样创建:
val builder = readSide.builder[BlogEvent]("blogsummaryoffset")
传递给此方法的参数是读取端处理器的标识符,Lagom在保持偏移量时应该使用该标识符。Lagom将偏移量存储在一个表中,如果不存在,它将自动创建该表。如果您希望Lagom不会自动为您创建此表,可以在application.conf文件中设置lagom.persistence.jdbc.create-tables.auto=false来关闭此功能。此表模式的DDL如下所示:
CREATE TABLE read_side_offsets (read_side_id VARCHAR(255), tag VARCHAR(255),sequence_offset bigint, time_uuid_offset char(36),PRIMARY KEY (read_side_id, tag))
全局准备回调
全局准备回调在整个集群中至少运行一次。它用于创建表和准备任何需要在读端处理开始之前可用的数据。读端处理器可能会在多个节点上分片,因此创建表之类的任务通常只能在一个节点上完成。
全局准备回调从Akka集群单例运行。它可能会运行多次——每次新节点成为新的单例时,都会运行回调。因此,任务必须是幂等的。如果失败,它将使用指数退避再次运行,整个集群的读取端处理将在成功运行之前不会启动。
当然,设置全局准备回调是完全可选的,您可能更喜欢手动管理数据库表,但开发和测试环境使用此回调为您创建数据库表非常方便。
下面是一个示例方法,我们已经实现了使用Slick DDL生成来创建表。这里,对DDL语句的灵活支持仅用于在表不存在时创建表,这样操作就可以像前面解释的那样是幂等的。
import scala.concurrent.ExecutionContext.Implicits.globalimport slick.jdbc.H2Profile.api._class PostSummaryRepository {// table mapping omitted for concisenessval postSummaries = TableQuery[PostSummaryTable]def createTable = postSummaries.schema.createIfNotExists}
定义这种方法的最佳位置是在Model Repository中,我们通常在其中添加与数据库操作相关的所有代码。
然后可以在buildHandler方法中将其注册为全局准备回调:
builder.setGlobalPrepare(postSummaryRepo.createTable)
准备回调
除了全局准备回调之外,还可以通过调用 builder.setPrepare来设置。当读取端处理器启动时,每个分片将执行一次。
如果您阅读了Cassandra read side support guide,您可能已经看到它被用于准备数据库语句以备将来使用。然而,JDBC PreparedStatement实例不能保证线程安全,因此prepare回调不应用于关系数据库。
同样,这个回调是可选的,在我们的示例中,我们不需要准备回调,因此没有指定任何回调。
注册读测处理器
一旦创建读端处理器后,需要向Lagom注册。这是使用ReadSide组件完成的:
class BlogServiceImpl(persistentEntityRegistry: PersistentEntityRegistry, readSide: ReadSide, myDatabase: MyDatabase)extends BlogService {readSide.register[BlogEvent](new BlogEventProcessor(myDatabase))
请注意,如果使用Macwire进行依赖项注入,只需将以下内容添加到应用程序加载器:
readSide.register(wire[BlogEventProcessor])
事件处理器
事件处理程序接收一个事件并返回一个DBIOAction,下面是一个处理PostAdded事件的回调示例:
/* added to PostSummaryRepository to insert or update Post Summaries */def save(postSummary: PostSummary) = {postSummaries.insertOrUpdate(postSummary).map(_ => Done)}private def processPostAdded(eventElement: EventStreamElement[PostAdded]): DBIO[Done] = {postSummaryRepo.save(PostSummary(eventElement.event.postId,eventElement.event.content.title))}
然后可以使用setEventHandler向构建器注册:
builder.setEventHandler[PostAdded](processPostAdded)
注册完所有事件处理程序后,可以调用build方法并返回生成的处理程序:
builder.build()
应用程序加载器
Lagom应用程序加载器需要配置为Slick 持久化。这可以通过混入SlickPersistentComponents特质来实现,如下所示:
abstract class SlickBlogApplication(context: LagomApplicationContext)extends LagomApplication(context)with JdbcPersistenceComponentswith SlickPersistenceComponentswith HikariCPComponentswith AhcWSComponents {
