测试actor系统

1 Testkit实例

这是Ray Roestenburg 在他的博客中的示例代码, 作了改动以兼容 Akka 2.x。

  1. import scala.util.Random
  2. import org.scalatest.BeforeAndAfterAll
  3. import org.scalatest.WordSpecLike
  4. import org.scalatest.Matchers
  5. import com.typesafe.config.ConfigFactory
  6. import akka.actor.Actor
  7. import akka.actor.ActorRef
  8. import akka.actor.ActorSystem
  9. import akka.actor.Props
  10. import akka.testkit.{ TestActors, DefaultTimeout, ImplicitSender, TestKit }
  11. import scala.concurrent.duration._
  12. import scala.collection.immutable
  13. //演示TestKit的示例测试
  14. class TestKitUsageSpec
  15. extends TestKit(ActorSystem("TestKitUsageSpec",
  16. ConfigFactory.parseString(TestKitUsageSpec.config)))
  17. with DefaultTimeout with ImplicitSender
  18. with WordSpecLike with Matchers with BeforeAndAfterAll {
  19. import TestKitUsageSpec._
  20. val echoRef = system.actorOf(TestActors.echoActorProps)
  21. val forwardRef = system.actorOf(Props(classOf[ForwardingActor], testActor))
  22. val filterRef = system.actorOf(Props(classOf[FilteringActor], testActor))
  23. val randomHead = Random.nextInt(6)
  24. val randomTail = Random.nextInt(10)
  25. val headList = immutable.Seq().padTo(randomHead, "0")
  26. val tailList = immutable.Seq().padTo(randomTail, "1")
  27. val seqRef =system.actorOf(Props(classOf[SequencingActor], testActor, headList, tailList))
  28. override def afterAll {
  29. shutdown()
  30. }
  31. "An EchoActor" should {
  32. "Respond with the same message it receives" in {
  33. within(500 millis) {
  34. echoRef ! "test"
  35. expectMsg("test")
  36. }
  37. }
  38. }
  39. "A ForwardingActor" should {
  40. "Forward a message it receives" in {
  41. within(500 millis) {
  42. forwardRef ! "test"
  43. expectMsg("test")
  44. }
  45. }
  46. }
  47. "A FilteringActor" should {
  48. "Filter all messages, except expected messagetypes it receives" in {
  49. var messages = Seq[String]()
  50. within(500 millis) {
  51. filterRef ! "test"
  52. expectMsg("test")
  53. filterRef ! 1
  54. expectNoMsg
  55. filterRef ! "some"
  56. filterRef ! "more"
  57. filterRef ! 1
  58. filterRef ! "text"
  59. filterRef ! 1
  60. receiveWhile(500 millis) {
  61. case msg: String => messages = msg +: messages
  62. } }
  63. messages.length should be(3)
  64. messages.reverse should be(Seq("some", "more", "text"))
  65. }
  66. }
  67. "A SequencingActor" should {
  68. "receive an interesting message at some point " in {
  69. within(500 millis) {
  70. ignoreMsg {
  71. case msg: String => msg != "something"
  72. }
  73. seqRef ! "something"
  74. expectMsg("something")
  75. ignoreMsg {
  76. case msg: String => msg == "1"
  77. }
  78. expectNoMsg
  79. ignoreNoMsg
  80. }
  81. }
  82. }
  83. }
  84. object TestKitUsageSpec {
  85. // Define your test specific configuration here
  86. val config = """
  87. akka {
  88. loglevel = "WARNING"
  89. } """
  90. //将所有消息转发给另一个Actor的actor
  91. class ForwardingActor(next: ActorRef) extends Actor {
  92. def receive = {
  93. case msg => next ! msg
  94. }
  95. }
  96. //仅转发一部分消息给另一个Actor的Actor
  97. class FilteringActor(next: ActorRef) extends Actor {
  98. def receive = {
  99. case msg: String => next ! msg
  100. case _ => None
  101. }
  102. }
  103. //此actor发送一个消息序列,此序列具有随机列表作为头,一个关心的值及一个随机列表作为尾
  104. //想法是你想测试接收到的关心的值,而不用处理其它部分
  105. class SequencingActor(next: ActorRef, head: immutable.Seq[String],
  106. tail: immutable.Seq[String]) extends Actor {
  107. def receive = {
  108. case msg => {
  109. head foreach { next ! _ }
  110. next ! msg
  111. tail foreach { next ! _ }
  112. }
  113. }
  114. }
  115. }

对于任何软件开发,自动化测试都是开发过程的一个重要组成部分。actor 模型对于代码单元如何划分,它们之间如何交互提供了一种新的视角, 这对如何编写测试也造成了影响。

Akka 有一个专门的akka-testkit模块来支持不同层次上的测试, 很明显共有两个类别:

  • 测试独立的不包括actor模型的代码,即没有多线程的内容;在事件发生的次序方面有完全确定性的行为,没有任何并发考虑, 这在下文中称为单元测试(Unit Testing)。
  • 测试(多个)包装过的actor,包括多线程调度; 事件的次序没有确定性但由于使用了actor模型,不需要考虑并发,这在下文中被称为集成测试(Integration Testing)。

当然这两个类型有着不同的粒度, 单元测试通常是白盒测试而集成测试是对完整的actor网络进行的功能测试。 其中重要的区别是并发的考虑是否是测试是一部分。我们提供的工具将在下面的章节中详细介绍。

2 用TestActorRef做同步单元测试

测试Actor 类中的业务逻辑分为两部分: 首先,每个原子操作必须独立动作,然后输入的事件序列必须被正确处理, 即使事件的次序存在一些可能的变化。 前者是单线程单元测试的主要使用场景,而后者可以在集成测试中进行确认。

通常,ActorRef将实际的Actor实例与外界隔离开, 唯一的通信通道是actor的邮箱。 这个限制是单元测试的障碍,所以我们引进了TestActorRef。这个特殊类型的引用是专门为测试设计的,它允许以两种方式访问actor: 通过获取实际actor实例的引用,通过调用或查询actor的行为(receive)。 以下每一种方式都有专门的部分介绍。

获取 Actor 的引用

能够访问到实际的 Actor 对象使得所有传统的单元测试方法可以用于测试其中的方法。 获取引用的方法:

  1. import akka.testkit.TestActorRef
  2. val actorRef = TestActorRef[MyActor]
  3. val actor = actorRef.underlyingActor

由于TestActorRef是actor类型的高阶类型,它返回实际actor及其正确的静态类型。这之后你就可以像平常一样将你的任何单元测试工具用于你的actor。

测试有限状态机

如果你要测试的actor是一个FSM, 你可以使用专门的TestFSMRef,它拥有普通TestActorRef的所有功能,并且能够访问其内部状态:

  1. import akka.testkit.TestFSMRef
  2. import akka.actor.FSM
  3. import akka.util.duration._
  4. val fsm = TestFSMRef(new Actor with FSM[Int, String] {
  5. startWith(1, "")
  6. when(1) {
  7. case Event("go", _) => goto(2) using "go"
  8. }
  9. when(2) {
  10. case Event("back", _) => goto(1) using "back"
  11. }
  12. })
  13. assert(fsm.stateName == 1)
  14. assert(fsm.stateData == "")
  15. fsm ! "go" // being a TestActorRef, this runs also on the CallingThreadDispatcher
  16. assert(fsm.stateName == 2)
  17. assert(fsm.stateData == "go")
  18. fsm.setState(stateName = 1)
  19. assert(fsm.stateName == 1)
  20. assert(fsm.timerActive_?("test") == false)
  21. fsm.setTimer("test", 12, 10 millis, true)
  22. assert(fsm.timerActive_?("test") == true)
  23. fsm.cancelTimer("test")
  24. assert(fsm.timerActive_?("test") == false)

由于Scala类型推测的限制,只有一个如上所示的工厂方法,所以你可能需要写象TestFSMRef(new MyFSM)这样的代码,而不是想象中的类似ActorRefTestFSMRef[MyFSM]。上例所示的所有方法都直接访问FSM的状态,不作任何同步;这在使用CallingThreadDispatcher(TestFSMRef缺省使用它) 并且没有其它线程参与的情况下是合适的, 但如果你实际上需要处理定时器事件可能会导致意外的情形,因为它们是在Scheduler线程中执行的。

测试Actor的行为

当消息派发器调用actor中的逻辑来处理消息时,它实际上是对当前注册到actor的行为进行了应用。行为的初始值是代码中声明的receive方法的返回值, 但可以通过对外部消息的响应调用becomeunbecome来改变这个行为。所有这些特性使得actor的行为测试起来不太容易。因此TestActorRef提供了一种不同的操作方式来对Actor的测试进行补充: 它支持所有正常的ActorRef中的操作。发往actor的消息在当前线程中同步处理,应答像正常一样回送。这个技巧来自下面所介绍的CallingThreadDispatcher; 这个派发器被隐式地用于所有实例化为TestActorRef的actor。

  1. import akka.testkit.TestActorRef
  2. import scala.concurrent.duration._
  3. import scala.concurrent.Await
  4. import akka.pattern.ask
  5. val actorRef = TestActorRef(new MyActor)
  6. // hypothetical message stimulating a ’42’ answer
  7. val future = actorRef ? Say42
  8. val Success(result: Int) = future.value.get
  9. result should be(42)

由于TestActorRefLocalActorRef 的子类,只不过多加了一些特殊功能,所以像监管和重启也能正常工作,但是要知道只要所有的相关的actors都使用CallingThreadDispatcher那么所有的执行过程都是严格同步的。 一旦你增加了一些元素,其中包括比较复杂的定时任务,你就离开了单元测试的范畴,因为你必须要重新将异步性纳入考虑范围(在大多数情况下问题在于要等待希望的结果有机会发生)。

另一个在单线程测试中被覆盖的特殊点是receiveTimeout, 由于包含了它会产生异步的ReceiveTimeout消息队列, 因此与同步约定矛盾。

综上所述: TestActorRef 重写了两个成员: 它设置派发器为CallingThreadDispatcher.global,设置receiveTimeout为None

介于两者之间方法

如果你希望测试actor的行为,包括热替换,但是不包括消息派发器,也不希望TestActorRef吞掉所有抛出的异常, 那么为你准备了另一种模式: 只要使用TestActorRefreceive方法 , 这将会把消息转发给内部的actor:

  1. import akka.testkit.TestActorRef
  2. val actorRef = TestActorRef(new Actor {
  3. def receive = {
  4. case "hello" => throw new IllegalArgumentException("boom")
  5. }
  6. })
  7. intercept[IllegalArgumentException] { actorRef.receive("hello") }

使用场景

当然你也可以根据自己的测试需求来混合使用TestActorRef的不同用法:

  • 一个常见的使用场景是在发送测试消息之前设置actor进入某个特定的内部状态
  • 另一个场景是发送了测试消息之后确认正确的内部状态转换

放心大胆地对各种可能性进行实验,如果你发现了有用的模式,快让Akka论坛知道它!常用操作也许能放进优美的DSL中。

3 用TestKit进行并发的集成测试

当你基本确定你的actor的业务逻辑是正确的, 下一个步骤就是确认它在目标环境中也能正确工作 (如果actor分别都比较简单,可能是因为他们使用了 FSM 模块, 这也可能是第一步)。关于环境的定义当然很大程度上由手上的问题和你打算测试的程度决定, 从功能/集成测试到完整的系统测试。 最简单的步骤包括测试流程(说明测试条件)、要测试的actor和接收应答的actor。大一些的系统将被测的actor替换成一组actor网络,将测试条件应用于不同的切入点,并整理将会从不同的输出位置发送的结果,但基本的原则仍然是测试由一个单独的流程来驱动。

TestKit类包含一组工具来简化这些常用的工作。

  1. import akka.actor.ActorSystem
  2. import akka.actor.Actor
  3. import akka.actor.Props
  4. import akka.testkit.{ TestActors, TestKit, ImplicitSender }
  5. import org.scalatest.WordSpecLike
  6. import org.scalatest.Matchers
  7. import org.scalatest.BeforeAndAfterAll
  8. class MySpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  9. with WordSpecLike with Matchers with BeforeAndAfterAll {
  10. def this() = this(ActorSystem("MySpec"))
  11. override def afterAll {
  12. TestKit.shutdownActorSystem(system)
  13. }
  14. "An Echo actor" must {
  15. "send back messages unchanged" in {
  16. val echo = system.actorOf(TestActors.echoActorProps)
  17. echo ! "hello world"
  18. expectMsg("hello world")
  19. }
  20. } }

TestKit 中有一个名为testActor的actor作为将要被不同的 expectMsg...断言检查的消息的入口,下面会详细介绍这些断言。 当混入了ImplicitSender trait后 这个actor在从测试过程中派发消息时将被隐式地用作发送引用。testActor也可以被像平常一样发送给其它的actor,通常是订阅成为通知监听器。 有一堆检查方法, 如接收所有匹配某些条件的消息,接收固定的消息序列或类,在某段时间内收不到消息等。

记得在测试完成后关闭actor系统 (即使是在测试失败的情况下) 以保证所有的actor-包括测试actor-被停止。

内置断言

上面提到的expectMsg并不是唯一的对收到的消息进行断言的方法。 以下是完整的列表:

  • expectMsg[T](d: Duration, msg: T): T

给定的消息必须在指定的时间内到达;返回此消息

  • expectMsgPF[T](d: Duration)(pf: PartialFunction[Any, T]): T

在给定的时间内,必须有消息到达,必须为这类消息定义偏函数;返回应用收到消息的偏函数的结果。可以不指定时间段(这时需要一对空的括号),这时使用最深层的within块中的期限。

  • expectMsgClass[T](d: Duration, c: Class[T]): T

在指定的时间内必须接收到Class类型的对象;返回收到的对象。注意它的类型匹配是子类兼容的;如果需要类型是相等的,参考使用单个class参数的expectMsgAllClassOf

  • expectMsgType[T: Manifest](d: Duration)

在指定的时间内必须收到指定类型 (擦除后)的对象; 返回收到的对象。这个方法基本上与expectMsgClass(manifest[T].erasure)等价。

  • expectMsgAnyOf[T](d: Duration, obj: T*): T

在指定的时间内必须收到一个对象,而且此对象必须与传入的对象引用中的一个相等( 用 == 进行比较); 返回收到的对象。

  • expectMsgAnyClassOf[T](d: Duration, obj: Class[_ <: T]*): T

在指定的时间内必须收到一个对象,它必须至少是指定的某Class对象的实例; 返回收到的对象。

  • expectMsgAllOf[T](d: Duration, obj: T*): Seq[T]

在指定时间内必须收到与指定的数组中相等数量的对象, 对每个收到的对象,必须至少有一个数组中的对象与它相等(用==进行比较)。 返回收到的整个对象集合。

  • expectMsgAllClassOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]

在指定时间内必须收到与指定的Class数组中相等数量的对象,对数组中的每一个Class, 必须至少有一个对象的Class与它相等(用 ==进行比较) (这不是子类兼容的类型检查)。 返回收到的整个对象集合。

  • expectMsgAllConformingOf[T](d: Duration, c: Class[_ <: T]*): Seq[T]

在指定时间内必须收到与指定的 Class 数组中相等数量的对象,对数组中的每个Class 必须至少有一个对象是这个Class的实例。返回收到的整个对象集合。

  • expectNoMsg(d: Duration)

在指定时间内不能收到消息。如果在这个方法被调用之前已经收到了消息,并且没有用其它的方法将这些消息从队列中删除,这个断言也会失败。

  • receiveN(n: Int, d: Duration): Seq[AnyRef]

指定的时间内必须收到n 条消息; 返回收到的消息。

  • fishForMessage(max: Duration, hint: String)(pf: PartialFunction[Any, Boolean]): Any

只要时间没有用完,并且偏函数匹配消息并返回false就一直接收消息. 返回使偏函数返回true 的消息或抛出异常, 异常中会提供一些提示以供debug使用。

除了接收消息的断言,还有一些方法来对消息流提供帮助:

  • receiveOne(d: Duration): AnyRef

尝试等待给定的时间以等待收到一个消息,如果失败则返回null 。 如果给定的 Duration 是0,这一调用是非阻塞的(轮询模式)。

  • receiveWhile[T](max: Duration, idle: Duration, messages: Int)(pf: PartialFunction[Any, T]): Seq[T]

只要满足

  1. - 消息与偏函数匹配
  2. - 指定的时间还没用完
  3. - 在空闲的时间内收到了下一条消息
  4. - 消息数量还没有到上限

就收集消息并返回收集到的所有消息。 时间上限缺省值是最深层的within块中剩余的时间,空闲时间缺省为无限 (也就是禁止空闲超时功能)。 期望的消息数量缺省值为Int.MaxValue, 也就是不作这个限制。

  • awaitCond(p: => Boolean, max: Duration, interval: Duration)

每经过interval时间就检查一下给定的条件,直到它返回true或者max时间用完了. 时间间隔缺省为100 ms而最大值缺省为最深层的within块中的剩余时间。

  • ignoreMsg(pf: PartialFunction[AnyRef, Boolean]) ignoreNoMsg

内部的testActor 包含一个偏函数用来忽略消息: 它只会将与偏函数不匹配或使函数返回false的消息放进队列。 这个函数可以用上面的方法进行设置和重设; 每一次调用都会覆盖之前的函数,而不会迭加。

这个功能在你想到忽略正常的消息而只对你指定的一些消息感兴趣时(如测试日志系统时)比较有用。

预料的异常

由于集成测试无法进入参与测试的actor的内部处理流程, 无法直接确认预料中的异常。为了做这件事,只能使用日志系统:将普通的事件处理器替换成TestEventListener然后使用EventFilter可以对日志信息,包括由于异常产生的日志,做断言:

  1. import akka.testkit.EventFilter
  2. import com.typesafe.config.ConfigFactory
  3. implicit val system = ActorSystem("testsystem", ConfigFactory.parseString("""
  4. akka.loggers = ["akka.testkit.TestEventListener"]
  5. """))
  6. try {
  7. val actor = system.actorOf(Props.empty)
  8. EventFilter[ActorKilledException](occurrences = 1) intercept {
  9. actor ! Kill }
  10. } finally {
  11. shutdown(system)
  12. }

如果occurrences指定了大小,那么intercept将会阻塞直到接收到匹配数量的消息或者超过akka.test.filter-leeway中配置的时间。超时时,测试将会失败。

对定时进行断言

功能测试的另一个重要部分与定时器有关:有些事件不能立即发生(如定时器), 另外一些需要在时间期限内发生。 因此所有的进行检查的方法都接受一个时间上限,不论是正面还是负面的结果都应该在这个时间之前获得。时间下限需要在这个检测方法之外进行检查,我们有一个新的工具来管理时间期限:

  1. within([min, ]max) {
  2. ...
  3. }

within所带的代码块必须在一个介于minmax之间的Duration之前完成, 其中min缺省值为0。将max参数与块的启动时间相加得到的时间期限在所有检查方法块内部都可以隐式得获得,如果你没有指定max值,它会从最深层的within块继承这个值。

应注意如果代码块的最后一条接收消息断言是expectNoMsgreceiveWhile, 对within的最终检查将被跳过,以避免由于唤醒延迟导致的假正值(false positives)。这意味着虽然其中每一个独立的断言仍然使用时间上限,整个代码块在这种情况下会有长度随机的延迟。

  1. import akka.actor.Props
  2. import akka.util.duration._
  3. val worker = system.actorOf(Props[Worker])
  4. within(200 millis) {
  5. worker ! "some work"
  6. expectMsg("some result")
  7. expectNoMsg // 在剩下的200ms中会阻塞
  8. Thread.sleep(300) // 不会使当前代码块失败
  9. }

所有的时间都以 System.nanoTime为单位, 它们描述的是墙上时间,而非CPU时间。

Ray Roestenburg 写了一篇关于使用 TestKit 的好文: http://roestenburg.agilesquad.com/2011/02/unit-testing-akka-actors-with-testkit_12.html. 完整的示例也可以在第一章找到。

考虑很慢的测试系统

你在跑得飞快的笔记本上使用的超时设置在高负载的Jenkins(或类似的)服务器上通常都会导致错误的测试失败。为了考虑这种情况,所有的时间上限都在内部乘以一个系数,这个系数来自配置文件中的akka.test.timefactor, 缺省值为 1。

你也可以用akka.testkit包对象中的隐式转换来将同样的系数来作用于其它的时限,为Duration添加dilated函数。

  1. import scala.concurrent.duration._
  2. import akka.testkit._
  3. 10.milliseconds.dilated

用隐式的ActorRef解决冲突

如果你希望在基于TestKit的测试的消息发送者为testActor, 只需要为你的测试代码混入ImplicitSender

  1. class MySpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
  2. with WordSpecLike with Matchers with BeforeAndAfterAll {

使用多个探针 Actors

如果待测的actor会发送多个消息到不同的目标,在使用TestKit时可能会难以分辨到达testActor的消息流。 另一种方法是用它来创建简单的探针actor,将它们插入到消息流中。 为了让这种方法更加强大和方便,我们提供了一个具体实现,称为TestProbe。 它的功能可以用下面的小例子说明:

  1. import scala.concurrent.duration._
  2. import akka.actor._
  3. import scala.concurrent.Future
  4. class MyDoubleEcho extends Actor {
  5. var dest1: ActorRef = _
  6. var dest2: ActorRef = _
  7. def receive = {
  8. case (d1: ActorRef, d2: ActorRef) =>
  9. dest1 = d1
  10. dest2 = d2
  11. case x =>
  12. dest1 ! x
  13. dest2 ! x
  14. } }
  15. val probe1 = TestProbe()
  16. val probe2 = TestProbe()
  17. val actor = system.actorOf(Props[MyDoubleEcho])
  18. actor ! ((probe1.ref, probe2.ref))
  19. actor ! "hello"
  20. probe1.expectMsg(500 millis, "hello")
  21. probe2.expectMsg(500 millis, "hello")

这里我们用MyDoubleEcho来仿真一个待测系统, 它会将输入镜像为两个输出。关联两个测试探针来进行(最简单)行为的确认。 还有一个例子是两个actor A,B, A 发送消息给 B。 为了确认这个消息流,可以插入TestProbe作为A的目标, 使用转发功能或下文中的自动导向功能在测试上下文中包含真实的B.

还可以为探针配备自定义的断言来使测试代码更简洁清晰:

  1. case class Update(id: Int, value: String)
  2. val probe = new TestProbe(system) {
  3. def expectUpdate(x: Int) = {
  4. expectMsgPF() {
  5. case Update(id, _) if id == x => true
  6. }
  7. sender() ! "ACK"
  8. }
  9. }

这里你拥有完全的灵活性,可以将TestKit 提供的工具与你自己的检测代码混合和匹配,并为它取一个有意义的名字。在实际中你的代码可能比上面的示例要复杂;要充分利用工具!

对探针收到的消息进行应答

探针在可能的条件下,会记录通讯通道以便进行应答:

  1. val probe = TestProbe()
  2. val future = probe.ref ? "hello"
  3. probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
  4. probe.reply("world")
  5. assert(future.isCompleted && future.value == Some(Success("world")))

对探针收到的消息进行转发

假定一个象征性的actor网络中某目标 actor dest 从 actor source收到一条消息。 如果你使消息先发往TestProbe probe, 你可以在保持网络功能的同时对消息流的容量和时限进行断言:

  1. class Source(target: ActorRef) extends Actor {
  2. def receive = {
  3. case "start" => target ! "work"
  4. }
  5. }
  6. class Destination extends Actor {
  7. def receive = {
  8. case x => // Do something..
  9. }
  10. }
  11. val probe = TestProbe()
  12. val source = system.actorOf(Props(classOf[Source], probe.ref))
  13. val dest = system.actorOf(Props[Destination])
  14. source ! "start"
  15. probe.expectMsg("work")
  16. probe.forward(dest)

dest actor 将收到同样的消息,就象没有插入探针一样。

自动导向

将收到的消息放进队列以便以后处理,这种方法不错,但要保持测试运行并对其运行过程进行跟踪,你也可以为参与测试的探针(事实上是任何 TestKit)安装一个AutoPilot(自动导向)。 自动导向在消息进入检查队列之前启动。 以下代码可以用来转发消息, 例如A --> Probe --> B, 只要满足一定的协约。

  1. val probe = TestProbe()
  2. probe.setAutoPilot(new TestActor.AutoPilot {
  3. def run(sender: ActorRef, msg: Any): TestActor.AutoPilot =
  4. msg match {
  5. case "stop" => TestActor.NoAutoPilot
  6. case x => testActor.tell(x, sender); TestActor.KeepRunning }
  7. })

run 方法必须返回包含在Option中的auto-pilot供下一条消息使用, 设置成None表示终止自动导向。

小心定时器断言

在使用测试探针时,within 块的行为可能会不那么直观:你需要记住上文所描述的期限仅对每一个探针的局部作用域有效。因此,探针 不会响应别的探针的期限,也不响应包含它的TestKit实例的期限:

  1. val probe = TestProbe()
  2. within(1 second) {
  3. probe.expectMsg("hello")
  4. }

这里,expectMsg调用将会使用缺省的timeout。

测试父子关系

一个actor的父actor创建该actor。这造成了两者直接的耦合,使其不可以直接测试。明显地,有三种方法可以提高父子关系的可测性。

  • 当创建一个子actor时,传递一个直接的引用到它的父actor。
  • 当创建一个父actor时,告诉父actor如何创建它的子actor。
  • 当测试时,创建一个虚构(fabricated)的父actor。

例如,你想测试的代码结构如下所示:

  1. class Parent extends Actor {
  2. val child = context.actorOf(Props[Child], "child")
  3. var ponged = false
  4. def receive = {
  5. case "pingit" => child ! "ping"
  6. case "pong" => ponged = true
  7. } }
  8. class Child extends Actor {
  9. def receive = {
  10. case "ping" => context.parent ! "pong"
  11. }
  12. }

使用依赖注入

第一个选项是避免使用context.parent函数。用一个自定义的父actor创建子actor,这个自定义的父actor通过传递一个直接的引用到它的父actor。

  1. class DependentChild(parent: ActorRef) extends Actor {
  2. def receive = {
  3. case "ping" => parent ! "pong"
  4. }
  5. }

另一种方法是,你可以告诉父节点怎样创建它的子节点。有两种方式可以做到这件事:给它一个Props对象或者给它一个关注创建子actor的函数。

  1. class DependentParent(childProps: Props) extends Actor {
  2. val child = context.actorOf(childProps, "child")
  3. var ponged = false
  4. def receive = {
  5. case "pingit" => child ! "ping"
  6. case "pong" => ponged = true
  7. } }
  8. class GenericDependentParent(childMaker: ActorRefFactory => ActorRef) extends Actor {
  9. val child = childMaker(context)
  10. var ponged = false
  11. def receive = {
  12. case "pingit" => child ! "ping"
  13. case "pong" => ponged = true
  14. } }

创建Props是直接的,但是创建函数需要像如下的测试代码:

  1. val maker = (_: ActorRefFactory) => probe.ref
  2. val parent = system.actorOf(Props(classOf[GenericDependentParent], maker))

在你的应用程序中,可能如下面这样:

  1. val maker = (f: ActorRefFactory) => f.actorOf(Props[Child])
  2. val parent = system.actorOf(Props(classOf[GenericDependentParent], maker))

使用一个虚拟的父actor

如果你不愿改变一个父actor或者子actor的构造器,你可以在你的测试中创建一个虚拟父actor。但是,这不允许你独立测试父actor。

  1. "A fabricated parent" should {
  2. "test its child responses" in {
  3. val proxy = TestProbe()
  4. val parent = system.actorOf(Props(new Actor {
  5. val child = context.actorOf(Props[Child], "child")
  6. def receive = {
  7. case x if sender == child => proxy.ref forward x
  8. case x =>child forward x
  9. }
  10. }))
  11. proxy.send(parent, "ping")
  12. proxy.expectMsg("pong")
  13. }
  14. }

4 CallingThreadDispatcher

如上文所述,CallingThreadDispatcher在单元测试中非常重要, 但最初它出现是为了在出错的时候能够生成连续的stacktrace。 由于这个特殊的派发器将任何消息直接运行在当前线程中,所以消息处理的完整历史信息在调用堆栈上有记录,只要所有的actor都是在这个派发器上运行。

如何使用它

只要象平常一样设置派发器:

  1. import akka.testkit.CallingThreadDispatcher
  2. val ref = system.actorOf(Props[MyActor].withDispatcher(CallingThreadDispatcher.Id))

它是如何运作的

在被调用时,CallingThreadDispatcher会检查接收消息的actor是否已经在当前线程中了。 这种情况的最简单的例子是actor向自己发送消息。 这时,不能马上对它进行处理,因为这违背了actor模型, 于是这个消息被放进队列,直到actor的当前消息被处理完毕;这样,新消息会被在调用的线程上处理,只是在actor完成其先前的工作之后。 在别的情况下,消息会在当前线程中立即得到处理。 通过这个派发器规划的Future也会立即执行。

这种工作方式使CallingThreadDispatcher像一个为永远不会因为外部事件而阻塞的actor设计的通用派发器。

在有多个线程的情况下,有可能同时有两个使用这个派发器的actor在不同线程中收到消息,它们会竞争actor锁,竞争失败的那个必须等待。 这样我们保持了actor模型,但由于使用了受限的调度我们损失了一些并发性。从这个意义上说,它等同于使用传统的基于互斥的并发。

另一个困难是正确地处理挂起和继续: 当actor被挂起时,后续的消息将被放进一个thread-local的队列中(和正常情况下使用的队列是同一个)。 但是对resume的调用, 是由一个特定的线程执行的,系统中所有其它的线程可能并没有运行这个特定的actor,这会导致thread-local队列无法被它们的本地线程清空。于是,调用resume的线程会从所有线程收集所有当前在队列中的消息到自己的队列中,然后进行处理。

限制

如果一个actor发送完消息后由于某种原因(通常是被调用actor所影响)阻塞了, 如果使用这个派发器时显然将导致死锁。这在使用基于CountDownLatch 同步的actor测试中很常见:

  1. val latch = new CountDownLatch(1)
  2. actor ! startWorkAfter(latch) // actor will call latch.await() before proceeding
  3. doSomeSetupStuff()
  4. latch.countDown()

这个例子将无限挂起,消息处理到达第二行永远到不了第四行,而只有在第四行才能在一个普通的派发器上取消它的阻塞。

因此,记住CallingThreadDispatcher不是普通派发器的通用用途的替换。另一方面,在它上面允许你的actor用于测试,它是很有用的。因为如果它在机率特别高的条件下都能不死锁,那么在生产环境中也不会。

上面这句话很遗憾并不是一个有力的保证,因为你的代码运行在不同的派发器上时可能直接或间接地改变它的行为。 如果你想要寻找帮助你debug死锁的工具, CallingThreadDispatcher在有些错误场合下可能会有用,但要记住它既可能给出错误的正面结果也可能给出错误的负面结果。

好处

总结下来,CallingThreadDispatcher提供了如下特征。

  • 确定地执行单线程测试,同时保持几乎所有的actor语义
  • 在异常stacktrace中记录从失败点开始的完整的消息处理历史
  • 排除某些类的死锁情景

5 跟踪Actor调用

到目前为止所有的测试工具都针对系统的行为构造断言。 当测试失败时,通常是由你来查找原因,进行修改并进行下一轮测试。这个过程既有debugger支持,又有日志支持,又Akka工具箱提供以下日志选项:

  • 对Actor实例中抛出的异常记录日志

相比其它的日志机制,这一条是永远打开的;它的日志级别是ERROR

  • 对某些actor的消息记录日志

这是通过在配置文件里添加设置项来打开设置项akka.actor.debug.receive ,它使得loggable语句作用于actor的receive函数:

  1. import akka.event.LoggingReceive
  2. def receive = LoggingReceive {
  3. case msg => // Do something ...
  4. }
  5. def otherState: Receive = LoggingReceive.withLabel("other") {
  6. case msg => // Do something else ...
  7. }
  • 如果在配置文件中没有给出上面的配置, 这个方法将直接移交给给定的Receive函数, 也就是说如果不打开,就没有运行时开销。

这个日志功能是与指定的局部标记绑定的,因为将其应用于所有的actor可能不是你所需要的, 如果被用于EventHandler监听器它还可能导致无限循环。

  1. - 对特殊的消息记录日志
  2. Actor会自动处理某些特殊消息,如 Kill, PoisonPill等等。打开对这些消息的跟踪只需要设置` akka.actor.debug.autoreceive`, 这对所有actor都有效。
  3. - actor生命周期记录日志
  4. Actor的创建、启动、重启、开始监控、停止监控和终止可以通过打开` akka.actor.debug.lifecycle `来跟踪; 这也是对所有 actor都有效的。

所有这些日志消息都记录在 DEBUG 级别. 总结一下, 你可以用以下配置打开对actor活动的完整日志:

  1. akka {
  2. loglevel = "DEBUG"
  3. actor {
  4. debug {
  5. receive = on
  6. autoreceive = on
  7. lifecycle = on
  8. } }
  9. }