当通过消息代理解耦服务间通信时,您可以从Topic
的两端进行测试。当您的Service
将事件发布到Topic
中时(如声明主题中所述),您的测试应该验证是否将正确的数据推送到主题中。同时,当您的服务订阅了一个上游Topic
时,您可能需要测试在有传入事件时您的Service
的行为。
无论是在编写发布测试还是消费测试时,都不会启动代理。相反,Lagom提供了代理API的内存实现,以加快测试速度。应在以后实施与完整代理的集成测试,但这超出了本文档的范围。提供的代理API的内存实现在本地运行,并且只提供一次交付。如果您想在消息丢失(at-most-once
)或消息重复(at-least-once
)的情况下测试代码,您将负责通过注入重复消息或跳过消息来编写此类行为。
Lagom 内存版消息代理实现还将帮助测试消息序列化和反序列化。但这只在测试发布的工具中可用,因为发布端负责描述通过网络发送的消息。当您测试主题的消费端时,不会在后台运行反序列化。
下面的代码示例使用了HelloService和前面几节中已经介绍过的AnotherService
。HelloService发布关于"greetings"
主题的GreetingsMessage
,AnotherService
使用atLeastOnce
语义订阅这些消息。
测试发布
当服务将数据发布到Topic
中时,描述符会列出公共API上的TopicCall
。测试事件发布与测试服务API中的ServiceCall
非常相似。
"The PublishService" should {
"publish events on the topic" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
new PublishApplication(ctx) with LocalServiceLocator with TestTopicComponents
} { server =>
implicit val system = server.actorSystem
implicit val mat = server.materializer
val client: PublishService = server.serviceClient.implement[PublishService]
val source = client.events().subscribe.atMostOnceSource
source
.runWith(TestSink.probe[PubMessage])
.request(1)
.expectNext should ===(PubMessage("msg 1"))
}
}
为了使用存根代理启动应用程序,必须将TestTopicComponents
混入到测试应用程序中。
使用ServiceTest
为您的服务创建一个客户端,并使用该客户端订阅已发布的主题。最后,与服务交互有某些事件的发出后,您可以断言该Topic
上发布了事件。
生产者端负责描述公共API,并为所有交换的消息(ServiceCalls
和TopicCalls
中的消息)提供可序列化的映射。授予发布操作正确行为的测试,还应该测试消息的序列化和反序列化能力。
测试订阅
测试消息的使用需要启动被测服务,使用上游服务的存根将数据生成到主题中。下面的代码片段演示了如何实现它。
- 内存中的
Topic
是必需的,它意味着向内存中的主题发送消息。使用ProducerStubFactory
可以获得给定主题名称的ProducerStub
。 - 通过
producerStub
实例,可以构建一个服务存根来替换生产就绪的上游服务。这必须使用绑定到上一步中创建的ProducerStub
的主题。 在测试中使用
ProducerStub
向主题发送消息,并与测试中的服务正常交互,以验证服务代码。 ```scala class AnotherServiceSpec extends WordSpec with Matchers with Eventually with ScalaFutures { var producerStub: ProducerStub[GreetingMessage] = _“The AnotherService” should { “publish updates on greetings message” in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
new AnotherApplication(ctx) with LocalServiceLocator {
// (1) creates an in-memory topic and binds it to a producer stub
val stubFactory = new ProducerStubFactory(actorSystem, materializer)
producerStub = stubFactory.producer[GreetingMessage](HelloService.TOPIC_NAME)
// (2) Override the default Hello service with our service stub
// which gets the producer stub injected
override lazy val helloService = new HelloServiceStub(producerStub)
}
} { server =>
// (3) produce a message in the stubbed topic via it's producer
producerStub.send(GreetingMessage("Hi there!"))
// create a service client to assert the message was consumed
eventually(timeout(Span(5, Seconds))) {
// cannot use async specs here because eventually only detects raised exceptions to retry.
// if a future fail at the first time, eventually won't retry though future will succeed later.
// see https://github.com/lagom/lagom/issues/876 for detail info.
val futureResp = server.serviceClient.implement[AnotherService].foo.invoke()
whenReady(futureResp) { resp =>
resp should ===("Hi there!")
}
}
} } }
// (2) a Service stub that will use the in-memoru topic bound to // our producer stub class HelloServiceStub(stub: ProducerStub[GreetingMessage]) extends HelloService { override def greetingsTopic(): Topic[GreetingMessage] = stub.topic
override def hello(id: String): ServiceCall[NotUsed, String] = ???
override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ???
}
``
在测试订阅时,被测试的代码可能包含一个本身就是生产者的服务。在这些情况下,用于单元测试的
Application必须与用于生产环境的
Application不同。单元测试中使用的
Application不能混入
LagomKafkaComponents,而只能使用
TestTopicComponents`。