Lagom 内存版消息代理实现还将帮助测试消息序列化和反序列化。但这只在测试发布的工具中可用,因为发布端负责描述通过网络发送的消息。当您测试主题的消费端时,不会在后台运行反序列化。


当服务将数据发布到Topic中时,描述符会列出公共API上的TopicCall 。测试事件发布与测试服务API中的ServiceCall非常相似。

  1. "The PublishService" should {
  2. "publish events on the topic" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
  3. new PublishApplication(ctx) with LocalServiceLocator with TestTopicComponents
  4. } { server =>
  5. implicit val system = server.actorSystem
  6. implicit val mat = server.materializer
  7. val client: PublishService = server.serviceClient.implement[PublishService]
  8. val source = client.events().subscribe.atMostOnceSource
  9. source
  10. .runWith(TestSink.probe[PubMessage])
  11. .request(1)
  12. .expectNext should ===(PubMessage("msg 1"))
  13. }
  14. }




  1. 内存中的Topic是必需的,它意味着向内存中的主题发送消息。使用ProducerStubFactory可以获得给定主题名称的ProducerStub
  2. 通过producerStub实例,可以构建一个服务存根来替换生产就绪的上游服务。这必须使用绑定到上一步中创建的ProducerStub的主题。
  3. 在测试中使用ProducerStub向主题发送消息,并与测试中的服务正常交互,以验证服务代码。 ```scala class AnotherServiceSpec extends WordSpec with Matchers with Eventually with ScalaFutures { var producerStub: ProducerStub[GreetingMessage] = _

    “The AnotherService” should { “publish updates on greetings message” in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>

    1. new AnotherApplication(ctx) with LocalServiceLocator {
    2. // (1) creates an in-memory topic and binds it to a producer stub
    3. val stubFactory = new ProducerStubFactory(actorSystem, materializer)
    4. producerStub = stubFactory.producer[GreetingMessage](HelloService.TOPIC_NAME)
    5. // (2) Override the default Hello service with our service stub
    6. // which gets the producer stub injected
    7. override lazy val helloService = new HelloServiceStub(producerStub)
    8. }

    } { server =>

    1. // (3) produce a message in the stubbed topic via it's producer
    2. producerStub.send(GreetingMessage("Hi there!"))
    3. // create a service client to assert the message was consumed
    4. eventually(timeout(Span(5, Seconds))) {
    5. // cannot use async specs here because eventually only detects raised exceptions to retry.
    6. // if a future fail at the first time, eventually won't retry though future will succeed later.
    7. // see https://github.com/lagom/lagom/issues/876 for detail info.
    8. val futureResp = server.serviceClient.implement[AnotherService].foo.invoke()
    9. whenReady(futureResp) { resp =>
    10. resp should ===("Hi there!")
    11. }
    12. }

    } } }

// (2) a Service stub that will use the in-memoru topic bound to // our producer stub class HelloServiceStub(stub: ProducerStub[GreetingMessage]) extends HelloService { override def greetingsTopic(): Topic[GreetingMessage] = stub.topic

override def hello(id: String): ServiceCall[NotUsed, String] = ???

override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ??? } `` 在测试订阅时,被测试的代码可能包含一个本身就是生产者的服务。在这些情况下,用于单元测试的Application必须与用于生产环境的Application不同。单元测试中使用的Application不能混入LagomKafkaComponents,而只能使用TestTopicComponents`。