容错

Actor系统 中所述,每一个actor是其子actor的监管者 , 而且每一个actor会定义一个处理错误的监管策略。这个策略制定以后不能修改,因为它集成为actor系统结构的一部分。

1 实际中的错误处理

首先我们来看一个例子,演示处理数据存储错误的一种方法,数据存储错误是真实应用中的典型错误类型。当然在实际的应用中这要依赖于当数据存储发生错误时能做些什么,在这个例子中,我们使用尽量重新连接的方法。

阅读以下源码。其中的注释解释了错误处理的各个片段以及为什么要加上它们。我们还强烈建议运行这个例子,因为根据日志输出来理解运行时都发生了什么会比较容易。

容错例子的图解

faulttolerancesample-normal-flow

上图展示了正常的消息流。

正常流程:

步骤 描述
1 Listener启动工作的过程。
2 Worker 通过定期向自己发送 Do 消息来安排工作
3、4、5 接收到DoWorker通知CounterService更新计数器值, 三次。Increment消息被转发给Counter, 它会更新自己的计数器变量并将当前值发给Storage
6、7 WorkerCounterService请求计数器的当前值并将结果回送给Listener

faulttolerancesample-failure-flow

上图展示了当发生数据存储失败时的过程。

失败流程:

步骤 描述
1 Storage 抛出 StorageException异常
2 CounterServiceStorage 的监管者, StorageException被抛出时它将 Storage 重启。
3,4,5,6 Storage 仍旧失败,又被重启
7 在5秒内三次失败和重启后 Storage 被它的监管者CounterService终止。
8 CounterService 同时观察着 Storage 并在Storage被终止时收到 Terminated消息
9,10,11 告诉 Counter 当前没有可用的 Storage
12 CounterService 计划给自己发一个Reconnect 消息
13,14 收到 Reconnect 消息后它创建一个新的 Storage
15,16 通知 Counter 使用新的 Storage

容错例子的完整源代码

  1. import akka.actor._
  2. import akka.actor.SupervisorStrategy._
  3. import scala.concurrent.duration._
  4. import akka.util.Timeout
  5. import akka.event.LoggingReceive
  6. import akka.pattern.{ ask, pipe }
  7. import com.typesafe.config.ConfigFactory
  8. //运行这个例子
  9. object FaultHandlingDocSample extends App {
  10. import Worker._
  11. val config = ConfigFactory.parseString("""
  12. akka.loglevel = DEBUG
  13. akka.actor.debug {
  14. receive = on
  15. lifecycle = on
  16. }
  17. """)
  18. val system = ActorSystem("FaultToleranceSample", config)
  19. val worker = system.actorOf(Props[Worker], name = "worker")
  20. val listener = system.actorOf(Props[Listener], name = "listener")
  21. // 开始工作并监听进展
  22. // 注意监听者被用作tell的sender,
  23. // i.e. 它会接收从worker发来的应答
  24. worker.tell(Start, sender = listener)
  25. }
  26. //监听worker的进展,当完成到足够的程度时关闭整个系统
  27. class Listener extends Actor with ActorLogging {
  28. import Worker._
  29. // 如果15秒没有任何进展说明服务不可用
  30. context.setReceiveTimeout(15 seconds)
  31. def receive = {
  32. case Progress(percent) =>
  33. log.info("Current progress: {} %", percent)
  34. if (percent >= 100.0) {
  35. log.info("That's all, shutting down")
  36. context.system.shutdown()
  37. }
  38. case ReceiveTimeout =>
  39. // 15秒没有进展,服务不可用
  40. log.error("Shutting down due to unavailable service")
  41. context.system.shutdown()
  42. }
  43. }
  44. object Worker {
  45. case object Start
  46. case object Do
  47. case class Progress(percent: Double)
  48. }
  49. // Worker接收到 `Start` 消息时开始处理工作.
  50. // 它持续地将当前``进展``通知 `Start` 消息的发送方
  51. // `Worker` 监管 `CounterService`.
  52. class Worker extends Actor with ActorLogging {
  53. import Worker._
  54. import CounterService._
  55. implicit val askTimeout = Timeout(5 seconds)
  56. // 如果 CounterService 子actor抛出 ServiceUnavailable异常则终止它
  57. override val supervisorStrategy = OneForOneStrategy() {
  58. case _: CounterService.ServiceUnavailable =>Stop
  59. }
  60. // 最初 Start 消息的发送方将持续地收到进展通知
  61. var progressListener: Option[ActorRef] = None
  62. val counterService = context.actorOf(Props[CounterService], name = "counter")
  63. val totalCount = 51
  64. def receive = LoggingReceive {
  65. case Start if progressListener.isEmpty =>
  66. progressListener = Some(sender)
  67. context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)
  68. case Do =>
  69. counterService ! Increment(1)
  70. counterService ! Increment(1)
  71. counterService ! Increment(1)
  72. // 将当前进展发送给最初的发送方
  73. counterService ? GetCurrentCount map {
  74. case CurrentCount(_, count) =>Progress(100.0 * count / totalCount)
  75. } pipeTo progressListener.get
  76. }
  77. }
  78. object CounterService {
  79. case class Increment(n: Int)
  80. case object GetCurrentCount
  81. case class CurrentCount(key: String, count: Long)
  82. class ServiceUnavailable(msg: String) extends RuntimeException(msg)
  83. private case object Reconnect
  84. }
  85. // 将从 `Increment` 消息中获取的值加到持久的计数器上。
  86. // 在被请求 `CurrentCount` 时将 `CurrentCount` 作为应答。
  87. // `CounterService` 监管 `Storage` 和 `Counter`。
  88. class CounterService extends Actor {
  89. import CounterService._
  90. import Counter._
  91. import Storage._
  92. // 在抛出 StorageException 时重启 Storage 子actor.
  93. // 如果5秒内有3次重启,它将被终止.
  94. override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
  95. case _: Storage.StorageException => Restart
  96. }
  97. val key = self.path.name
  98. var storage: Option[ActorRef] = None
  99. var counter: Option[ActorRef] = None
  100. var backlog = IndexedSeq.empty[(ActorRef, Any)]
  101. val MaxBacklog = 10000
  102. override def preStart() {
  103. initStorage()
  104. }
  105. //storage 子actor在失败时将被重启,但是3次重启后如果仍失败,它将被终止。
  106. //这样比一直不断的失败要好
  107. //它被终止后我们将在一段延时后安排重新连接.
  108. //监视子actor,这样在它终止后我们能收到`Terminated` 消息.
  109. def initStorage() {
  110. storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
  111. // 告诉计数器(如果有的话),使用新的storage
  112. counter foreach { _ ! UseStorage(storage) }
  113. // 我们需要初始值来开始操作
  114. storage.get ! Get(key)
  115. }
  116. def receive = LoggingReceive {
  117. case Entry(k, v) if k == key && counter == None =>
  118. // 从Storage应答得到初始值, 现在我们可以创建计数器了
  119. val c = context.actorOf(Props(new Counter(key, v)))
  120. counter = Some(c)
  121. // 告诉计数器使用当前 storage
  122. c ! UseStorage(storage)
  123. // 并将缓存的积压消息发给计数器
  124. for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
  125. backlog = IndexedSeq.empty
  126. case msg @ Increment(n) => forwardOrPlaceInBacklog(msg)
  127. case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)
  128. case Terminated(actorRef) if Some(actorRef) == storage =>
  129. // 3次重启后 storage 子actor被终止了.
  130. // 我们收到 Terminated 是因为我们监视了这个子actor.
  131. storage = None
  132. // 告诉计数器现在没有 storage 可用了
  133. counter foreach { _ ! UseStorage(None) }
  134. // 过一会尝试重新建立 storage
  135. context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)
  136. case Reconnect =>
  137. // 在计划的延时后重新创建 storage
  138. initStorage()
  139. }
  140. def forwardOrPlaceInBacklog(msg: Any) {
  141. // 在开始委托给计数器之前我们需要从storage中获取初始值 .
  142. // 在那之前我们将消息放入积压缓存中,在计数器初始化完成后将这些消息发给它。
  143. counter match {
  144. case Some(c) => c forward msg
  145. case None =>
  146. if (backlog.size >= MaxBacklog)
  147. throw new ServiceUnavailable("CounterService not available, lack of initial value")
  148. backlog = backlog :+ (sender, msg)
  149. }
  150. }
  151. }
  152. object Counter {
  153. case class UseStorage(storage: Option[ActorRef])
  154. }
  155. //如果当前有可用的 storage的话, 计数器变量将当前的值发送给`Storage`
  156. class Counter(key: String, initialValue: Long) extends Actor {
  157. import Counter._
  158. import CounterService._
  159. import Storage._
  160. var count = initialValue
  161. var storage: Option[ActorRef] = None
  162. def receive = LoggingReceive {
  163. case UseStorage(s) =>
  164. storage = s
  165. storeCount()
  166. case Increment(n) =>
  167. count += n
  168. storeCount()
  169. case GetCurrentCount =>
  170. sender() ! CurrentCount(key, count)
  171. }
  172. def storeCount() {
  173. // 委托危险的工作,来保护我们宝贵的状态.
  174. // 没有 storage 我们也能继续工作.
  175. storage foreach { _ ! Store(Entry(key, count)) }
  176. }
  177. }
  178. object Storage {
  179. case class Store(entry: Entry)
  180. case class Get(key: String)
  181. case class Entry(key: String, value: Long)
  182. class StorageException(msg: String) extends RuntimeException(msg)
  183. }
  184. // 收到 `Store` 消息时将键/值对保存到持久storage中.
  185. // 收到 `Get` 消息时以当前值作为应答 .
  186. // 如果背后的数据存储出问题了,会抛出 StorageException.
  187. class Storage extends Actor {
  188. import Storage._
  189. val db = DummyDB
  190. def receive = LoggingReceive {
  191. case Store(Entry(key, count)) => db.save(key, count)
  192. case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L))
  193. } }
  194. object DummyDB {
  195. import Storage.StorageException
  196. private var db = Map[String, Long]()
  197. @throws(classOf[StorageException])
  198. def save(key: String, value: Long): Unit = synchronized {
  199. if (11 <= value && value <= 14)
  200. throw new StorageException("Simulated store failure " + value)
  201. db += (key -> value)
  202. }
  203. @throws(classOf[StorageException])
  204. def load(key: String): Option[Long] = synchronized {
  205. db.get(key)
  206. }
  207. }

2 创建一个监管策略

以下更加深入地讲解错误处理的机制和可选的方法。

为了演示我们假设有这样的策略:

  1. import akka.actor.OneForOneStrategy
  2. import akka.actor.SupervisorStrategy._
  3. import scala.concurrent.duration._
  4. override val supervisorStrategy =
  5. OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
  6. case _: ArithmeticException => Resume
  7. case _: NullPointerException => Restart
  8. case _: IllegalArgumentException => Stop
  9. case _: Exception => Escalate
  10. }

我选择了一种非常著名的异常类型来演示监管和监控 中描述的错误处理方式的使用. 首先,它是一个一对一的策略,意思是每一个子actor会被单独处理(多对一的策略与之相似,唯一的差别在于任何决策都应用于监管者的所有子actor,而不仅仅是出错的那一个)。 这里我们对重启的频率作了限制,最多每分钟能进行 10 次重启; 所有这样的设置都可以被忽略,也就是说,相应的限制并不被采用, 使设置重启频率的绝对上限值或让重启无限进行成为可能。

构成主体的 match 语句的类型是Decider, 它是一个PartialFunction[Throwable, Directive]。这一部分将子actor的失败类型映射到相应的指令上。

注意:如果策略定义在监控actor的内部(相反的是在一个组合对象的内部),它的Decider可以线程安全的访问actor的所有内部状态,包括获取当前失败子actor的一个引用

缺省的监管机制

如果定义的监管机制没有覆盖抛出的异常,将使用上溯(Escalate)机制。

如果某个actor没有定义监管机制,下列异常将被缺省地处理:

  • ActorInitializationException将终止出错的子actor
  • ActorKilledException 将终止出错的子 actor
  • Exception 将重启出错的子 actor
  • 其它的Throwable将被上溯传给父actor

如果异常一直被上溯到根监管者,在那儿也会用上述缺省方式进行处理。

你可以合并你自己的策略到默认策略中去。

  1. import akka.actor.OneForOneStrategy
  2. import akka.actor.SupervisorStrategy._
  3. import scala.concurrent.duration._
  4. override val supervisorStrategy =
  5. OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
  6. case _: ArithmeticException => Resume
  7. case t =>
  8. super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
  9. }

3 测试应用程序

以下部分展示了实际中不同的指令的效果,为此我们需要创建一个测试环境。首先我们需要一个合适的监管者:

  1. import akka.actor.Actor
  2. class Supervisor extends Actor {
  3. import akka.actor.OneForOneStrategy
  4. import akka.actor.SupervisorStrategy._
  5. import scala.concurrent.duration._
  6. override val supervisorStrategy =
  7. OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
  8. case _: ArithmeticException => Resume
  9. case _: NullPointerException => Restart
  10. case _: IllegalArgumentException => Stop
  11. case _: Exception => Escalate
  12. }
  13. def receive = {
  14. case p: Props => sender() ! context.actorOf(p)
  15. } }

该监管者将用来创建一个我们用来做试验的子actor:

  1. import akka.actor.Actor
  2. class Child extends Actor {
  3. var state = 0
  4. def receive = {
  5. case ex: Exception => throw ex
  6. case x: Int => state = x
  7. case "get" => sender() ! state
  8. }
  9. }

这个测试可以用测试Actor系统中的工具来进行简化, 比如AkkaSpecTestKit with WordSpec with MustMatchers的混合。

  1. import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter }
  2. import akka.actor.{ ActorRef, Props, Terminated }
  3. class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
  4. "A supervisor" must {
  5. "apply the chosen strategy for its child" in {
  6. // code here
  7. } }
  8. }

现在我们来创建 actor:

  1. val supervisor = system.actorOf(Props[Supervisor], "supervisor")
  2. supervisor ! Props[Child]
  3. val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor

第一个测试是为了演示 Resume 指令, 我们试着将actor设为非初始状态然后让它出错:

  1. child ! 42 // set state to 42
  2. child ! "get"
  3. expectMsg(42)
  4. child ! new ArithmeticException // crash it
  5. child ! "get"
  6. expectMsg(42)

可以看到错误处理指令完后仍能得到42的值. 现在如果我们将错误换成更严重的 NullPointerException, 情况就不同了:

  1. child ! new NullPointerException // crash it harder
  2. child ! "g
  3. expectMsg(0)

而最后当致命的 IllegalArgumentException 发生时子actor将被其监管者终止:

  1. watch(child) // have testActor watch “child”
  2. child ! new IllegalArgumentException // break it
  3. expectMsgPF() { case Terminated(‘child‘) => () }

到目前为止监管者完全没有被子actor的错误所影响, 因为指令集确实处理了这些错误。而对于 Exception, 就不是这么回事了, 监管者会将失败上溯传递。

  1. supervisor ! Props[Child] // create new child
  2. val child2 = expectMsgType[ActorRef]
  3. watch(child2)
  4. child2 ! "get" // verify it is alive
  5. expectMsg(0)
  6. child2 ! new Exception("CRASH") // escalate failure
  7. expectMsgPF() {
  8. case t @ Terminated(‘child2‘) if t.existenceConfirmed => ()
  9. }

监管者自己是被ActorSystem的顶级actor所监管的。顶级actor的缺省策略是对所有的Exception情况 (注意ActorInitializationExceptionActorKilledException是例外)进行重启. 由于缺省的重启指令会杀死所有的子actor,我们知道我们可怜的子actor最终无法从这个失败中幸免。

如果这不是我们希望的行为 (这取决于实际用例), 我们需要使用一个不同的监管者来覆盖这个行为。

  1. class Supervisor2 extends Actor {
  2. import akka.actor.OneForOneStrategy
  3. import akka.actor.SupervisorStrategy._
  4. import scala.concurrent.duration._
  5. override val supervisorStrategy =
  6. OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
  7. case _: ArithmeticException => Resume
  8. case _: NullPointerException => Restart
  9. case _: IllegalArgumentException => Stop
  10. case _: Exception => Escalate
  11. }
  12. def receive = {
  13. case p: Props => sender() ! context.actorOf(p)
  14. }
  15. // override default to kill all children during restart
  16. override def preRestart(cause: Throwable, msg: Option[Any]) {}
  17. }

在这个父actor之下,子actor在上溯的重启中得以幸免,如以下最后的测试:

  1. val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")
  2. supervisor2 ! Props[Child]
  3. val child3 = expectMsgType[ActorRef]
  4. child3 ! 23
  5. child3 ! "get"
  6. expectMsg(23)
  7. child3 ! new Exception("CRASH")
  8. child3 ! "get"
  9. expectMsg(0)