• 容错示例
    • 容错示例完整源代码

    容错示例

    容错示例 - 图1

    以上图示了正常的消息流。

    正常流程:

    步骤 描述
    1 负责监听进度的Listener启动工作。
    2 Worker通过定期向自己发送Do消息来安排工作。
    3, 4, 5 每次接收到Do时,Worker通知CounterService更新计数器值,共三次。Increment消息被转发给Counter,它会更新自己的计数器变量并将当前值发给Storage。
    6, 7 Worker向CounterService请求计数器的当前值,并将结果回送给Listener。

    容错示例 - 图2

    以上图示了当发生数据库失败时的过程。

    失败流程:

    步骤 描述
    1 Storage抛出StorageException。
    2 CounterService是Storage的监管者,StorageException被抛出时它将重启Storage。
    3, 4, 5, 6 Storage仍旧失败,又被重启。
    7 在5秒内三次失败和重启后Storage被它的监管者,即CounterService终止。
    8 CounterService同时监视着Storage并在Storage被终止时收到Terminated消息…
    9, 10, 11 CounterService告诉Counter当前没有可用的Storage。
    12 CounterService计划一个Reconnect消息发给自己。
    13, 14 收到Reconnect消息后它创建一个新的Storage,
    15, 16 并通知Counter使用新的Storage。
    容错示例完整源代码
    1. import akka.actor._
    2. import akka.actor.SupervisorStrategy._
    3. import scala.concurrent.duration._
    4. import akka.util.Timeout
    5. import akka.event.LoggingReceive
    6. import akka.pattern.{ ask, pipe }
    7. import com.typesafe.config.ConfigFactory
    /**
     * Entry point of the fault-handling sample: boots an actor system with
     * DEBUG-level receive/lifecycle logging, creates the `Worker` and the
     * `Listener`, and kicks off the work.
     */
    object FaultHandlingDocSample extends App {
      import Worker._

      // Log every received message and every actor lifecycle event.
      val config = ConfigFactory.parseString("""
        akka.loglevel = "DEBUG"
        akka.actor.debug {
          receive = on
          lifecycle = on
        }
        """)

      val system: ActorSystem = ActorSystem("FaultToleranceSample", config)
      val worker: ActorRef = system.actorOf(Props[Worker], name = "worker")
      val listener: ActorRef = system.actorOf(Props[Listener], name = "listener")

      // Kick off the work. The listener is passed as the explicit sender of the
      // tell, so the worker's progress replies are delivered to the listener.
      worker.tell(Start, listener)
    }
    /**
     * Listens on progress from the worker and shuts down the actor system
     * either when the work is complete (progress reaches 100 %) or when no
     * progress has been received within 15 seconds.
     */
    class Listener extends Actor with ActorLogging {
      import Worker._

      // If we don't get any progress within 15 seconds, a ReceiveTimeout is
      // delivered and the service is considered unavailable.
      // Dot notation (`15.seconds`) avoids the postfix-operator language feature.
      context.setReceiveTimeout(15.seconds)

      def receive: Receive = {
        case Progress(percent) =>
          log.info("Current progress: {} %", percent)
          if (percent >= 100.0) {
            log.info("That's all, shutting down")
            context.system.shutdown()
          }
        case ReceiveTimeout =>
          // No progress within 15 seconds: ServiceUnavailable
          log.error("Shutting down due to unavailable service")
          context.system.shutdown()
      }
    }
    /** Messages understood and emitted by the [[Worker]]. */
    object Worker {
      /** Starts the work; its sender becomes the progress listener. */
      case object Start

      /** Periodic self-message that triggers one round of work. */
      case object Do

      /** Progress report sent to the listener, as a percentage. */
      case class Progress(percent: Double)
    }
    /**
     * Worker performs some work when it receives the `Start` message.
     * It continuously notifies the sender of the `Start` message about the
     * current `Progress`. The `Worker` supervises the `CounterService`.
     */
    class Worker extends Actor with ActorLogging {
      import Worker._
      import CounterService._

      implicit val askTimeout: Timeout = Timeout(5.seconds)

      // Stop the CounterService child if it throws ServiceUnavailable.
      override val supervisorStrategy = OneForOneStrategy() {
        case _: CounterService.ServiceUnavailable => Stop
      }

      // The sender of the initial Start message will continuously be notified
      // about progress.
      var progressListener: Option[ActorRef] = None
      val counterService = context.actorOf(Props[CounterService], name = "counter")
      val totalCount = 51

      // Handle of the periodic Do tick. It is cancelled in postStop so the
      // scheduler does not keep sending Do after this instance has stopped.
      private var tick: Option[Cancellable] = None

      import context.dispatcher // Use this Actor's dispatcher as ExecutionContext

      override def postStop(): Unit = tick.foreach(_.cancel())

      def receive = LoggingReceive {
        case Start if progressListener.isEmpty =>
          progressListener = Some(sender())
          tick = Some(context.system.scheduler.schedule(Duration.Zero, 1.second, self, Do))
        case Do =>
          counterService ! Increment(1)
          counterService ! Increment(1)
          counterService ! Increment(1)
          // Send current progress to the initial sender. Do is only scheduled
          // after Start has set progressListener, so this is normally defined.
          progressListener.foreach { listener =>
            (counterService ? GetCurrentCount).map {
              case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
            }.pipeTo(listener)
          }
      }
    }
    /** Protocol and failure type of the [[CounterService]]. */
    object CounterService {
      /** Adds `n` to the counter. */
      case class Increment(n: Int)

      /** Asks for the current value; answered with [[CurrentCount]]. */
      case object GetCurrentCount

      /** Current value `count` of the counter identified by `key`. */
      case class CurrentCount(key: String, count: Long)

      /** Thrown when the service cannot buffer more messages before it is initialized. */
      class ServiceUnavailable(msg: String) extends RuntimeException(msg)

      // Internal self-message that triggers re-creation of the storage child.
      private case object Reconnect
    }
    /**
     * Adds the value received in `Increment` messages to a persistent
     * counter. Replies with `CurrentCount` when asked with `GetCurrentCount`.
     * `CounterService` supervises `Storage` and `Counter`.
     */
    class CounterService extends Actor {
      import CounterService._
      import Counter._
      import Storage._

      // Restart the storage child when StorageException is thrown.
      // After 3 restarts within 5 seconds it will be stopped.
      override val supervisorStrategy = OneForOneStrategy(
        maxNrOfRetries = 3,
        withinTimeRange = 5.seconds) {
        case _: Storage.StorageException => Restart
      }

      val key = self.path.name
      var storage: Option[ActorRef] = None
      var counter: Option[ActorRef] = None
      // Messages (with their original senders) received before the Counter exists.
      var backlog = IndexedSeq.empty[(ActorRef, Any)]
      val MaxBacklog = 10000

      import context.dispatcher // Use this Actor's dispatcher as ExecutionContext

      override def preStart(): Unit = {
        initStorage()
      }

      /**
       * The child storage is restarted in case of failure, but after 3 restarts,
       * and still failing, it will be stopped. Better to back off than to keep
       * failing. When it has been stopped we schedule a Reconnect after a delay.
       * The child is watched so we receive a Terminated message when it stops.
       */
      def initStorage(): Unit = {
        val newStorage = context.watch(context.actorOf(Props[Storage], name = "storage"))
        storage = Some(newStorage)
        // Tell the counter, if any, to use the new storage
        counter foreach { _ ! UseStorage(storage) }
        // We need the initial value to be able to operate
        newStorage ! Get(key)
      }

      def receive = LoggingReceive {
        case Entry(k, v) if k == key && counter.isEmpty =>
          // Reply from Storage with the initial value; now we can create the Counter
          val c = context.actorOf(Props(classOf[Counter], key, v))
          counter = Some(c)
          // Tell the counter to use the current storage
          c ! UseStorage(storage)
          // and send the buffered backlog to the counter, preserving senders
          for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
          backlog = IndexedSeq.empty
        case msg @ Increment(_) => forwardOrPlaceInBacklog(msg)
        case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)
        case Terminated(actorRef) if Some(actorRef) == storage =>
          // After 3 restarts the storage child is stopped.
          // We receive Terminated because we watch the child, see initStorage.
          storage = None
          // Tell the counter that there is no storage for the moment
          counter foreach { _ ! UseStorage(None) }
          // Try to re-establish storage after a while
          context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)
        case Reconnect =>
          // Re-establish storage after the scheduled delay
          initStorage()
      }

      /**
       * Forwards `msg` to the counter, or, while the counter does not exist yet
       * (no initial value from storage), buffers it together with its sender so
       * it can be replayed once the counter is created.
       *
       * @throws ServiceUnavailable if the backlog already holds `MaxBacklog` messages
       */
      def forwardOrPlaceInBacklog(msg: Any): Unit = {
        counter match {
          case Some(c) => c forward msg
          case None =>
            if (backlog.size >= MaxBacklog)
              throw new ServiceUnavailable(
                "CounterService not available, lack of initial value")
            backlog :+= (sender() -> msg)
        }
      }
    }
    /** Messages understood by the [[Counter]]. */
    object Counter {
      /** Tells the counter which storage to use; `None` means none is available right now. */
      case class UseStorage(storage: Option[ActorRef])
    }
    /**
     * The in-memory count variable that sends its current value to the
     * `Storage`, if there is any storage available at the moment.
     *
     * @param key          key under which the count is stored and reported
     * @param initialValue starting value of the count
     */
    class Counter(key: String, initialValue: Long) extends Actor {
      import Counter._
      import CounterService._
      import Storage._

      var count = initialValue
      var storage: Option[ActorRef] = None

      def receive = LoggingReceive {
        case UseStorage(s) =>
          storage = s
          storeCount()
        case Increment(n) =>
          count += n
          storeCount()
        case GetCurrentCount =>
          sender() ! CurrentCount(key, count)
      }

      // Explicit `: Unit =` instead of the deprecated procedure syntax.
      def storeCount(): Unit = {
        // Delegate dangerous work, to protect our valuable state.
        // We can continue without storage.
        storage foreach { _ ! Store(Entry(key, count)) }
      }
    }
    /** Protocol and failure type of the [[Storage]] actor. */
    object Storage {
      /** Requests that `entry` be persisted. */
      case class Store(entry: Entry)

      /** Requests the stored value for `key`; answered with an [[Entry]]. */
      case class Get(key: String)

      /** A key together with its stored value. */
      case class Entry(key: String, value: Long)

      /** Thrown when the underlying data store fails. */
      class StorageException(msg: String) extends RuntimeException(msg)
    }
    /**
     * Saves key/value pairs through `DummyDB` on a `Store` message and replies
     * with an `Entry` holding the current value (0 for an unknown key) on `Get`.
     * A StorageException thrown by `DummyDB.save` propagates out of receive.
     */
    class Storage extends Actor {
      import Storage._

      val db = DummyDB

      def receive = LoggingReceive {
        case Store(Entry(entryKey, entryValue)) =>
          db.save(entryKey, entryValue)
        case Get(requestedKey) =>
          val current = db.load(requestedKey).getOrElse(0L)
          sender() ! Entry(requestedKey, current)
      }
    }
    /**
     * In-memory stand-in for a real database. `save` simulates an outage by
     * throwing StorageException for any value from 11 to 14 inclusive.
     */
    object DummyDB {
      import Storage.StorageException

      // Only read or written inside `synchronized` blocks on this object.
      private var data = Map[String, Long]()

      @throws(classOf[StorageException])
      def save(key: String, value: Long): Unit = synchronized {
        val simulatedOutage = value >= 11 && value <= 14
        if (simulatedOutage)
          throw new StorageException("Simulated store failure " + value)
        data = data.updated(key, value)
      }

      @throws(classOf[StorageException])
      def load(key: String): Option[Long] = synchronized {
        data.get(key)
      }
    }