- Introduction
- 1. 引言
- 2. 概述
- 3. Actors
- 4. Futures与Agents
- 5. 网络
- 6. 实用工具
- 7. 如何使用:常用模式
- 8. 实验模块
- 9. Akka开发者信息
- 10. 工程信息
- 11. 附加信息
- Published using GitBook
AKKA 2.3.6 Scala 文档
容错示例

以上图示了正常的消息流。
正常流程:
| 步骤 | 描述 |
|---|---|
| 1 | 负责监听进度的Listener启动工作。 |
| 2 | Worker通过定期向自己发送Do消息来计划工作 |
| 3, 4, 5 | 接收到Do时,Worker连续三次通知CounterService增加计数器的值。Increment消息被转发给Counter,它会更新自己的计数器变量并将当前值发给Storage。 |
| 6, 7 | Worker向CounterService请求计数器的当前值并将结果回送给Listener。 |

以上图示了当发生数据库失败时的过程
失败流程:
| 步骤 | 描述 |
|---|---|
| 1 | Storage抛出 StorageException。 |
| 2 | CounterService是Storage的监管者,StorageException被抛出时它将重启Storage。 |
| 3, 4, 5, 6 | Storage仍旧失败,又被重启。 |
| 7 | 在5秒内三次失败和重启后Storage被它的监管者,即CounterService终止。 |
| 8 | CounterService同时监视着Storage并在Storage被终止时收到Terminated消息… |
| 9, 10, 11 | 告诉Counter当前没有可用的Storage。 |
| 12 | CounterService计划一个Reconnect消息发给自己。 |
| 13, 14 | 收到Reconnect消息后它创建一个新的Storage… |
| 15, 16 | 并通知Counter使用新的Storage。 |
容错示例完整源代码
import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample.
 *
 * Creates the actor system with debug logging of received messages and
 * lifecycle events, starts a `Worker` and a `Listener`, and kicks off the
 * work by sending `Start` to the worker on behalf of the listener.
 */
object FaultHandlingDocSample extends App {
  import Worker._

  // HOCON settings must be separated by newlines (or commas); keep one per line.
  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker], name = "worker")
  val listener = system.actorOf(Props[Listener], name = "listener")

  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done, or when no progress has been reported for 15 seconds.
 */
class Listener extends Actor with ActorLogging {
  import Worker._

  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15.seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.shutdown()
      }

    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.shutdown()
  }
}

/** Messages understood or emitted by [[Worker]]. */
object Worker {
  case object Start
  case object Do
  case class Progress(percent: Double)
}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._

  // Timeout applied to the `?` (ask) sent to the CounterService below
  implicit val askTimeout: Timeout = Timeout(5.seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService], name = "counter")
  val totalCount = 51

  import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext

  def receive = LoggingReceive {
    // Only the first Start is honoured; later ones do not match the guard
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.schedule(Duration.Zero, 1.second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender. The listener reference is
      // resolved here, inside the actor, before the future completes.
      progressListener foreach { listener =>
        (counterService ? GetCurrentCount)
          .map { case CurrentCount(_, count) => Progress(100.0 * count / totalCount) }
          .pipeTo(listener)
      }
  }
}

/** Messages and exception type of [[CounterService]]. */
object CounterService {
  case class Increment(n: Int)
  case object GetCurrentCount
  case class CurrentCount(key: String, count: Long)
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect
}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `GetCurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 3,
    withinTimeRange = 5.seconds) {
    case _: Storage.StorageException => Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  // Messages (with their original senders) received before the counter exists
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext

  override def preStart(): Unit = initStorage()

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * and still failing it will be stopped. Better to back-off than continuously
   * failing. When it has been stopped we will schedule a Reconnect after a delay.
   * Watch the child so we receive Terminated message when it has been terminated.
   */
  def initStorage(): Unit = {
    val newStorage = context.watch(context.actorOf(Props[Storage], name = "storage"))
    storage = Some(newStorage)
    // Tell the counter, if any, to use the new storage
    counter foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    newStorage ! Get(key)
  }

  def receive = LoggingReceive {
    case Entry(k, v) if k == key && counter.isEmpty =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg @ Increment(n)    => forwardOrPlaceInBacklog(msg)

    case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after while
      context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  /**
   * Forwards `msg` to the counter if it exists, otherwise buffers it together
   * with the current sender.
   *
   * @throws CounterService.ServiceUnavailable if the backlog already holds
   *         `MaxBacklog` messages
   */
  def forwardOrPlaceInBacklog(msg: Any): Unit = {
    // We need the initial value from storage before we can start delegate to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) => c forward msg
      case None =>
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable(
            "CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }
}

/** Messages understood by [[Counter]]. */
object Counter {
  case class UseStorage(storage: Option[ActorRef])
}

/**
 * The in memory count variable that will send current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()

    case Increment(n) =>
      count += n
      storeCount()

    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)
  }

  /** Sends the current count to the storage actor, if one is set. */
  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach { _ ! Store(Entry(key, count)) }
  }
}

/** Messages and exception type of [[Storage]]. */
object Storage {
  case class Store(entry: Entry)
  case class Get(key: String)
  case class Entry(key: String, value: Long)
  class StorageException(msg: String) extends RuntimeException(msg)
}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    // A key that has never been saved is reported with value 0
    case Get(key)                 => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}

/**
 * In-memory stand-in for a database. `save` deliberately fails for values
 * 11 through 14 to simulate a storage outage.
 */
object DummyDB {
  import Storage.StorageException

  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    if (11 <= value && value <= 14)
      throw new StorageException(s"Simulated store failure $value")
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
