Akka 2.3.6 Scala Documentation

Fault Tolerance Sample

[Figure 1: normal message flow]

The figure above shows the normal message flow.

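Normal flow:

Step      Description
1         The progress Listener starts the work.
2         The Worker schedules work by periodically sending Do messages to itself.
3, 4, 5   On receiving Do, the Worker tells the CounterService to increment the counter, three times. The Increment message is forwarded to the Counter, which updates its counter variable and sends the current value to the Storage.
6, 7      The Worker asks the CounterService for the current value of the counter and pipes the result back to the Listener.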
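To make the arithmetic behind steps 2 to 7 concrete, here is a minimal plain-Scala model (no Akka dependency) of the progress values the Listener receives. It reuses the Worker's formula `100.0 * count / totalCount` with `totalCount = 51` and three `Increment(1)` messages per `Do` tick, and assumes, as in the normal flow, that all three increments are applied before the count is read. `ProgressModel` and `progressPerTick` are hypothetical names used only for this sketch.

```scala
// Plain-Scala model of the normal flow. ProgressModel and progressPerTick are
// hypothetical names; they are not part of the sample or of Akka.
object ProgressModel {
  // Same formula the Worker uses: Progress(100.0 * count / totalCount)
  def progress(count: Long, totalCount: Int): Double =
    100.0 * count / totalCount

  // Progress reported after each Do tick, up to and including the first report
  // at or above 100 %, which is when the Listener shuts the system down.
  // Assumes incrementsPerTick > 0.
  def progressPerTick(totalCount: Int, incrementsPerTick: Int): Vector[Double] = {
    var reports = Vector.empty[Double]
    var count = 0L
    while (reports.isEmpty || reports.last < 100.0) {
      count += incrementsPerTick              // steps 3, 4, 5: Increment(1) three times
      reports :+= progress(count, totalCount) // steps 6, 7: ask, then pipeTo the Listener
    }
    reports
  }
}

val reports = ProgressModel.progressPerTick(totalCount = 51, incrementsPerTick = 3)
println(s"${reports.size} ticks, last report ${reports.last} %")
// prints "17 ticks, last report 100.0 %"
```

Under these assumptions the Listener receives 17 progress reports and shuts the system down on the last one; because the schedule starts at `Duration.Zero` and repeats every second, that is roughly 16 seconds after `Start`.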

[Figure 2: message flow when the database fails]

The figure above shows what happens when the database fails.

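Failure flow:

Step         Description
1            The Storage throws StorageException.
2            The CounterService is the supervisor of the Storage and restarts the Storage when StorageException is thrown.
3, 4, 5, 6   The Storage continues to fail and is restarted again.
7            After three failures and restarts within 5 seconds, the Storage is stopped by its supervisor, i.e. the CounterService.
8            The CounterService also watches the Storage for termination and receives the Terminated message when the Storage has been stopped…
9, 10, 11    …and tells the Counter that no Storage is currently available.
12           The CounterService schedules a Reconnect message to itself.
13, 14       When it receives the Reconnect message, it creates a new Storage…
15, 16       …and tells the Counter to use the new Storage.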
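The restart budget in steps 1 to 7 can be checked with a small simulation. The sketch below is a simplified plain-Scala model of the decision made by `OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)`, not Akka's internal implementation: it assumes a fixed window that opens at the first failure and starts over once more than 5 seconds have passed. It reuses the failure condition of the sample's `DummyDB.save` (values 11 to 14 fail) and assumes values are stored three per second. `RestartWindowModel`, `Window`, and `onFailure` are hypothetical names.

```scala
// Simplified model of a "maxNrOfRetries within withinTimeRange" restart budget.
// This is NOT Akka's internal algorithm; it assumes a fixed window that opens at
// the first failure and starts over once more than withinTimeRangeMillis have passed.
object RestartWindowModel {
  sealed trait Decision
  case object Restart extends Decision
  case object Stop extends Decision

  final class Window(maxNrOfRetries: Int, withinTimeRangeMillis: Long) {
    private var windowStart = -1L // -1 means no window is open yet
    private var failures = 0

    // Called once per StorageException thrown by the child at time `nowMillis`.
    def onFailure(nowMillis: Long): Decision = {
      if (windowStart < 0 || nowMillis - windowStart > withinTimeRangeMillis) {
        windowStart = nowMillis // open a fresh window
        failures = 0
      }
      failures += 1
      if (failures <= maxNrOfRetries) Restart else Stop
    }
  }

  // Same failure condition as DummyDB.save in the sample
  def storeFails(value: Long): Boolean = 11 <= value && value <= 14
}

import RestartWindowModel._

val window = new Window(maxNrOfRetries = 3, withinTimeRangeMillis = 5000L)

// Assumption: the Counter stores values 1, 2, 3, ... three per second, so value v
// is stored (v - 1) / 3 seconds after Start.
val decisions = for {
  value <- (1L to 15L).toVector
  if storeFails(value)
} yield value -> window.onFailure(nowMillis = (value - 1) / 3 * 1000)

decisions.foreach { case (v, d) => println(s"store $v failed -> $d") }
// store 11 failed -> Restart
// store 12 failed -> Restart
// store 13 failed -> Restart
// store 14 failed -> Stop
```

In this model the four failing values produce three restarts followed by a stop, matching steps 2 to 7; from then on the CounterService relies on Terminated and Reconnect (steps 8 to 16).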

Full Source Code of the Fault Tolerance Sample
import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import scala.language.postfixOps // enables `15 seconds`, `1 second` without feature warnings
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {
  import Worker._

  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker], name = "worker")
  val listener = system.actorOf(Props[Listener], name = "listener")
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {
  import Worker._
  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15 seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.shutdown()
      }

    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.shutdown()
  }
}

object Worker {
  case object Start
  case object Do
  case class Progress(percent: Double)
}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._
  implicit val askTimeout = Timeout(5 seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService], name = "counter")
  val totalCount = 51
  import context.dispatcher // Use this actor's dispatcher as ExecutionContext

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender
      counterService ? GetCurrentCount map {
        case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
      } pipeTo progressListener.get
  }
}

object CounterService {
  case class Increment(n: Int)
  case object GetCurrentCount
  case class CurrentCount(key: String, count: Long)
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect
}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
    withinTimeRange = 5 seconds) {
    case _: Storage.StorageException => Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  import context.dispatcher // Use this actor's dispatcher as ExecutionContext

  override def preStart() {
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * and still failing it will be stopped. Better to back off than to keep
   * failing continuously. When it has been stopped we will schedule a Reconnect
   * after a delay. Watch the child so we receive a Terminated message when it
   * has been terminated.
   */
  def initStorage() {
    storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {
    case Entry(k, v) if k == key && counter == None =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg @ Increment(n) => forwardOrPlaceInBacklog(msg)

    case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after a while
      context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any) {
    // We need the initial value from storage before we can start delegating to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) => c forward msg
      case None =>
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable(
            "CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }
}

object Counter {
  case class UseStorage(storage: Option[ActorRef])
}

/**
 * The in-memory count variable that will send the current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()

    case Increment(n) =>
      count += n
      storeCount()

    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)
  }

  def storeCount() {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach { _ ! Store(Entry(key, count)) }
  }
}

object Storage {
  case class Store(entry: Entry)
  case class Get(key: String)
  case class Entry(key: String, value: Long)
  class StorageException(msg: String) extends RuntimeException(msg)
}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    case Get(key)                 => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}

object DummyDB {
  import Storage.StorageException
  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    // Simulated outage: storing any value from 11 to 14 fails
    if (11 <= value && value <= 14)
      throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
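The backlog handling in `CounterService.forwardOrPlaceInBacklog` can be seen in isolation with the following plain-Scala model, which has no Akka dependency. Messages are buffered together with their original sender until the counter exists, replayed in arrival order once it does, and rejected with `ServiceUnavailable` when the bounded buffer is full. The class `BacklogModel`, its `initialize` method, and the `String` stand-ins for `ActorRef` and the message type are hypothetical; the `deliver` function plays the role of `forward`/`tell` with the original sender.

```scala
// Hypothetical stand-in for CounterService.ServiceUnavailable.
final class ServiceUnavailable(msg: String) extends RuntimeException(msg)

// Hypothetical model: `Sender` stands in for ActorRef, `Msg` for the message type,
// and `deliver` for forwarding a message to the Counter with its original sender.
final class BacklogModel[Sender, Msg](maxBacklog: Int) {
  private var deliverTo: Option[(Sender, Msg) => Unit] = None
  private var backlog = Vector.empty[(Sender, Msg)]

  def forwardOrPlaceInBacklog(sender: Sender, msg: Msg): Unit = deliverTo match {
    case Some(deliver) => deliver(sender, msg) // counter exists: forward directly
    case None =>
      if (backlog.size >= maxBacklog)
        throw new ServiceUnavailable("CounterService not available, lack of initial value")
      backlog :+= (sender -> msg) // keep the original sender for the reply
  }

  // Corresponds to receiving the initial Entry: the counter now exists, so the
  // buffered messages are replayed in arrival order and the backlog is cleared.
  def initialize(deliver: (Sender, Msg) => Unit): Unit = {
    deliverTo = Some(deliver)
    for ((s, m) <- backlog) deliver(s, m)
    backlog = Vector.empty
  }

  def backlogSize: Int = backlog.size
}

val delivered = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
val service = new BacklogModel[String, String](maxBacklog = 2)

service.forwardOrPlaceInBacklog("worker", "Increment(1)")
service.forwardOrPlaceInBacklog("worker", "GetCurrentCount")
// The backlog is full, so this message is rejected instead of buffered
val overflow = scala.util.Try(service.forwardOrPlaceInBacklog("worker", "Increment(1)"))

service.initialize((s, m) => delivered += (s -> m))
service.forwardOrPlaceInBacklog("worker", "Increment(1)") // delivered directly now
```

In the sample itself, the `ServiceUnavailable` thrown by the `CounterService` goes to its supervisor, the `Worker`, whose strategy stops the `CounterService`.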