当通过消息代理解耦服务间通信时,您可以从Topic的两端进行测试。当您的Service将事件发布到Topic中时(如声明主题中所述),您的测试应该验证是否将正确的数据推送到主题中。同时,当您的服务订阅了一个上游Topic时,您可能需要测试在有传入事件时您的Service的行为。
无论是在编写发布测试还是消费测试时,都不会启动代理。相反,Lagom提供了代理API的内存实现,以加快测试速度。应在以后实施与完整代理的集成测试,但这超出了本文档的范围。提供的代理API的内存实现在本地运行,并且只提供一次交付。如果您想在消息丢失(at-most-once)或消息重复(at-least-once)的情况下测试代码,您将负责通过注入重复消息或跳过消息来编写此类行为。
Lagom 内存版消息代理实现还将帮助测试消息序列化和反序列化。但这只在测试发布的工具中可用,因为发布端负责描述通过网络发送的消息。当您测试主题的消费端时,不会在后台运行反序列化。
下面的代码示例使用了HelloService和前面几节中已经介绍过的AnotherService。HelloService发布关于"greetings"主题的GreetingsMessageAnotherService使用atLeastOnce语义订阅这些消息。

测试发布

当服务将数据发布到Topic中时,描述符会列出公共API上的TopicCall 。测试事件发布与测试服务API中的ServiceCall非常相似。

  1. "The PublishService" should {
  2. "publish events on the topic" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
  3. new PublishApplication(ctx) with LocalServiceLocator with TestTopicComponents
  4. } { server =>
  5. implicit val system = server.actorSystem
  6. implicit val mat = server.materializer
  7. val client: PublishService = server.serviceClient.implement[PublishService]
  8. val source = client.events().subscribe.atMostOnceSource
  9. source
  10. .runWith(TestSink.probe[PubMessage])
  11. .request(1)
  12. .expectNext should ===(PubMessage("msg 1"))
  13. }
  14. }

为了使用存根代理启动应用程序,必须将TestTopicComponents混入到测试应用程序中。
使用ServiceTest为您的服务创建一个客户端,并使用该客户端订阅已发布的主题。最后,与服务交互有某些事件的发出后,您可以断言该Topic上发布了事件。
生产者端负责描述公共API,并为所有交换的消息(ServiceCallsTopicCalls中的消息)提供可序列化的映射。授予发布操作正确行为的测试,还应该测试消息的序列化和反序列化能力。

测试订阅

测试消息的使用需要启动被测服务,使用上游服务的存根将数据生成到主题中。下面的代码片段演示了如何实现它。

  1. 内存中的Topic是必需的,它意味着向内存中的主题发送消息。使用ProducerStubFactory可以获得给定主题名称的ProducerStub
  2. 通过producerStub实例,可以构建一个服务存根来替换生产就绪的上游服务。这必须使用绑定到上一步中创建的ProducerStub的主题。
  3. 在测试中使用ProducerStub向主题发送消息,并与测试中的服务正常交互,以验证服务代码。 ```scala class AnotherServiceSpec extends WordSpec with Matchers with Eventually with ScalaFutures { var producerStub: ProducerStub[GreetingMessage] = _

    “The AnotherService” should { “publish updates on greetings message” in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>

    1. new AnotherApplication(ctx) with LocalServiceLocator {
    2. // (1) creates an in-memory topic and binds it to a producer stub
    3. val stubFactory = new ProducerStubFactory(actorSystem, materializer)
    4. producerStub = stubFactory.producer[GreetingMessage](HelloService.TOPIC_NAME)
    5. // (2) Override the default Hello service with our service stub
    6. // which gets the producer stub injected
    7. override lazy val helloService = new HelloServiceStub(producerStub)
    8. }

    } { server =>

    1. // (3) produce a message in the stubbed topic via it's producer
    2. producerStub.send(GreetingMessage("Hi there!"))
    3. // create a service client to assert the message was consumed
    4. eventually(timeout(Span(5, Seconds))) {
    5. // cannot use async specs here because eventually only detects raised exceptions to retry.
    6. // if a future fail at the first time, eventually won't retry though future will succeed later.
    7. // see https://github.com/lagom/lagom/issues/876 for detail info.
    8. val futureResp = server.serviceClient.implement[AnotherService].foo.invoke()
    9. whenReady(futureResp) { resp =>
    10. resp should ===("Hi there!")
    11. }
    12. }

    } } }

// (2) a Service stub that will use the in-memoru topic bound to // our producer stub class HelloServiceStub(stub: ProducerStub[GreetingMessage]) extends HelloService { override def greetingsTopic(): Topic[GreetingMessage] = stub.topic

override def hello(id: String): ServiceCall[NotUsed, String] = ???

override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ??? } `` 在测试订阅时,被测试的代码可能包含一个本身就是生产者的服务。在这些情况下,用于单元测试的Application必须与用于生产环境的Application不同。单元测试中使用的Application不能混入LagomKafkaComponents,而只能使用TestTopicComponents`。