服务是通过实现提供服务描述符特质接口来实现的,实现描述符指定的每个调用。例如,下面是HelloService描述符的一个实现:

  1. import com.lightbend.lagom.scaladsl.api.ServiceCall
  2. import scala.concurrent.Future
  3. class HelloServiceImpl extends HelloService {
  4. override def sayHello = ServiceCall { name =>
  5. Future.successful(s"Hello $name!")
  6. }
  7. }

正如你所看到的,sayHello方法是通过ServiceCall上的apply工厂方法实现的。它接受一个Request => Future[Response]的函数,并返回一个ServiceCall,其invoke方法简单地委托给该函数。这里需要认识到重要的一点是,对sayHello的调用本身并不执行调用,它只返回要执行的服务调用。这里的优点是,当将调用与其他横切关注点(如身份验证)组合时,可以使用普通的基于函数的组合轻松完成。
让我们再看一下ServiceCall接口定义:

  1. trait ServiceCall[Request, Response] {
  2. def invoke(request: Request): Future[Response]
  3. }

它将接受请求,并将响应作为[Future](https://www.scala-lang.org/api/2.12.x/scala/concurrent/Future.html)返回。如果您以前从未见过Future,那么这个值可能要到以后才可用。当一个API返回一个Future时,这个值可能还没有被计算,但API承诺在未来的某个时候,它将会被计算。因为这个值还没有计算出来,所以您不能立即与它交互。不过,您可以使用mapflatMap方法附加回调,将promise转换为一个新值的promise。FuturemapflatMap方法是Scala中进行响应式编程的基本构建单元,它们允许你的代码是异步的,而不是等待事情发生,而是附加回调来响应已经完成的计算。
当然,一个简单的hello world计算并不是异步的,它所需要做的就是构建一个String,并立即返回。在本例中,我们需要用一个Future中包装该结果。这可以通过调用Future.successful()来实现,该方法返回一个具有立即可用值的future

流处理

如果请求体和响应体是严格消息,那么使用它们就很简单了。但是,如果它们是流的,则需要使用Akka流来处理它们。让我们看看service descriptors示例中的一些流服务调用是如何实现的。
tick服务调用将返回一个按指定间隔发送消息的Source。对于这样的流,Akka流有一个可用的构造函数:

  1. import scala.concurrent.duration._
  2. import akka.stream.scaladsl.Source
  3. override def tick(intervalMs: Int) = ServiceCall { tickMessage =>
  4. Future.successful(
  5. Source
  6. .tick(
  7. intervalMs.milliseconds,
  8. intervalMs.milliseconds,
  9. tickMessage
  10. )
  11. .mapMaterializedValue(_ => NotUsed)
  12. )
  13. }

前两个参数是发送消息之前的延迟和发送消息的间隔。第三个参数是应该在每个刻度处发送的消息。如果使用间隔1000作为请求参数来触发tick服务调用,那么将有一个每秒发送一次tick消息的Stream作为结果被返回。
一个流式的sayHello服务调用可以通过映射传入的名字Source来实现:

  1. override def sayHello = ServiceCall { names =>
  2. Future.successful(names.map(name => s"Hello $name!"))
  3. }

当您map一个Source时,您将获得一个新Source,新Source将传入Source的每个消息执行映射转换。这些处理流的例子显然非常简单。“发布-订阅”和“持久读侧”这两个部分展示了在Lagom中使用流的真实例子

处理头信息

有时您可能需要处理请求头,或向响应头添加信息。ServiceCall提供handleRequestHeaderhandleResponseHeader方法来允许您这样做,但是不建议您直接实现这些,相反,您应该使用ServerServiceCall
ServerServiceCall是一个扩展自ServiceCall的接口,并提供了一个额外的方法invokeWithHeaders。这与常规调用方法不同,因为除了Request参数外,它还接受一个requesttheader参数。它返回一个Future[(ResponseHeader, Response)],而不是返回一个Future[Response]。因此,它允许您处理请求头,并发送自定义响应头。ServerServiceCall实现了handleRequestHeaderhandleResponseHeader方法,因此当Lagom调用invoke方法时,它被委托给invokeWithHeaders方法。
ServerServiceCall伴生对象提供了一个用于创建ServerServiceCall的工厂方法,该方法可以同时处理请求头和响应头。能够创建一个不与头文件一起工作的ServerServiceCall似乎是违反直觉的,但这样做的原因是为了协助服务调用组合,其中组合服务调用可能希望组合两种类型的服务调用。
下面是一个使用头文件的例子:

  1. import com.lightbend.lagom.scaladsl.server.ServerServiceCall
  2. import com.lightbend.lagom.scaladsl.api.transport.ResponseHeader
  3. override def sayHello = ServerServiceCall { (requestHeader, name) =>
  4. val user = requestHeader.principal
  5. .map(_.getName)
  6. .getOrElse("No one")
  7. val response = s"$user wants to say hello to $name"
  8. val responseHeader = ResponseHeader.Ok
  9. .withHeader("Server", "Hello service")
  10. Future.successful((responseHeader, response))
  11. }

服务调用组合

在某些情况下,您可能希望将服务调用与安全或日志记录等横切关注点组合在一起。在Lagom中,这是通过显式组合服务调用来实现的。下面是一个简单的日志服务调用:

  1. def logged[Request, Response](serviceCall: ServerServiceCall[Request, Response]) =
  2. ServerServiceCall.compose { requestHeader =>
  3. println(s"Received ${requestHeader.method} ${requestHeader.uri}")
  4. serviceCall
  5. }

使用ServerServiceCallcompose方法,该方法接受一个接受请求头的回调,并返回一个服务调用。
如果我们要实现被记录的HelloService,我们将像这样使用它:

  1. override def sayHello =
  2. logged(ServerServiceCall { name =>
  3. Future.successful(s"Hello $name!")
  4. })

另一个常见的横切关注点是身份验证。假设你有一个用户存储接口:

  1. trait UserStorage {
  2. def lookupUser(username: String): Future[Option[User]]
  3. }

你可以像这样使用它来实现一个经过验证的服务调用:

  1. import com.lightbend.lagom.scaladsl.api.transport.Forbidden
  2. def authenticated[Request, Response](
  3. serviceCall: User => ServerServiceCall[Request, Response]
  4. ) = ServerServiceCall.composeAsync { requestHeader =>
  5. // First lookup user
  6. val userLookup = requestHeader.principal
  7. .map(principal => userStorage.lookupUser(principal.getName))
  8. .getOrElse(Future.successful(None))
  9. // Then, if it exists, apply it to the service call
  10. userLookup.map {
  11. case Some(user) => serviceCall(user)
  12. case None => throw Forbidden("User must be authenticated to access this service call")
  13. }
  14. }

这一次,由于用户的查找是异步的,所以我们使用了composeAsync,它允许我们异步返回服务调用来处理服务。此外,我们接受用户对服务调用的函数,而不是简单地接受服务调用。这意味着服务调用可以访问用户:

  1. override def sayHello = authenticated { user =>
  2. ServerServiceCall { name =>
  3. Future.successful(s"$user is saying hello to $name")
  4. }
  5. }

注意,在其他框架中,用户对象可以通过线程局部变量或过滤器在非类型化映射中传递,而在这里,用户对象是显式传递的。如果您的代码需要访问用户对象,就不可能出现忘记放置过滤器的配置错误,代码将无法编译。
您通常希望将多个服务调用组合在一起。与注释相比,这正是基于函数的组合的强大之处。因为服务调用只是常规方法,你可以简单地定义一个新方法来组合它们,就像这样:

  1. def filter[Request, Response](
  2. serviceCall: User => ServerServiceCall[Request, Response]
  3. ) = logged(authenticated(serviceCall))

在hello服务中使用这个新方法:

  1. override def sayHello = filter { user =>
  2. ServerServiceCall { name =>
  3. Future.successful(s"$user is saying hello to $name")
  4. }
  5. }