作为一个例子,我们有一个web 应用程序与远程的第三方web服务进行交互。假如第三方已用完了他们的容量,他们的数据库也在高荷载作用下熔化。假设数据库在这种情况下失败,第三方 web 服务用了很长的时间来返回一个错误。这进一步使调用在长一段时间后失败。回到我们的 web 应用程序,用户已注意到其表单提交似乎比需要的使用了更长的时间。当然用户知道要做的是点击刷新按钮,将更多的请求添加到其已在运行的请求中。这最终导致 web 应用程序由于资源枯竭而失败。这将会影响所有用户,甚至是那些没有使用依赖于此第三方 web 服务的功能的。

为 web 服务的调用引入断路器会导致请求开始快速失败,让用户知道有什么地方不对劲,并且他们不需要刷新他们的请求。这还局限失效行为只影响到那些正在使用此第三方功能依赖的用户,因为没有资源枯竭,其他用户不再受影响。电路断路器还可以允许聪明的开发者来标记使用不可用功能的网站部分,或也许在断路器处于打开状态时,显示出一些适当的缓存内容。

Akka库提供名为 akka.pattern.CircuitBreaker的断路器实现,具有如下所述的行为。


  • 在正常操作期间,断路器处于Closed状态:
    • 异常或超过配置的 callTimeout 的调用增加一个失败计数
    • 成功重置失败计数为零
    • 当失败计数达到 maxFailures 时,断路器跳入Open状态
  • Open状态:
    • 所有调用通过 CircuitBreakerOpenException 快速失败
    • 经过配置的 resetTimeout,断路器进入Half-Open状态
  • Half-Open状态:
    • 第一个调用被允许尝试,而不通过快速失败
    • 其他调用就像在Open状态一样快速失败
    • 如果第一次调用成功,断路器是重置回Closed状态
    • 如果第一次调用失败,断路器再次跳入Open状态并经历另一个完整的resetTimeout
  • 状态转换监听器:
    • 可以为每个状态条目通过 onOpenonCloseonHalfOpen 提供回调
    • 这些都在提供的 ExecutionContext 中执行。

  • 最多失败5次
  • 调用超时时间为 10 秒
  • 重置超时时间为 1 分钟
  1. import scala.concurrent.duration._
  2. import akka.pattern.CircuitBreaker
  3. import akka.pattern.pipe
  4. import akka.actor.Actor
  5. import akka.actor.ActorLogging
  6. import scala.concurrent.Future
  7. import akka.event.Logging
  8. class DangerousActor extends Actor with ActorLogging {
  9. import context.dispatcher
  10. val breaker =
  11. new CircuitBreaker(context.system.scheduler,
  12. maxFailures = 5,
  13. callTimeout = 10.seconds,
  14. resetTimeout = 1.minute).onOpen(notifyMeOnOpen())
  15. def notifyMeOnOpen(): Unit =
  16. log.warning("My CircuitBreaker is now open, and will not close for one minute")
  1. import akka.actor.UntypedActor;
  2. import scala.concurrent.Future;
  3. import akka.event.LoggingAdapter;
  4. import scala.concurrent.duration.Duration;
  5. import akka.pattern.CircuitBreaker;
  6. import akka.event.Logging;
  7. import static akka.pattern.Patterns.pipe;
  8. import static akka.dispatch.Futures.future;
  9. import java.util.concurrent.Callable;
  10. public class DangerousJavaActor extends UntypedActor {
  11. private final CircuitBreaker breaker;
  12. private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
  13. public DangerousJavaActor() {
  14. this.breaker = new CircuitBreaker(
  15. getContext().dispatcher(), getContext().system().scheduler(),
  16. 5, Duration.create(10, "s"), Duration.create(1, "m"))
  17. .onOpen(new Runnable() {
  18. public void run() {
  19. notifyMeOnOpen();
  20. }
  21. });
  22. }
  23. public void notifyMeOnOpen() {
  24. log.warning("My CircuitBreaker is now open, and will not close for one minute");
  25. }


  1. def dangerousCall: String = "This really isn't that dangerous of a call after all"
  2. def receive = {
  3. case "is my middle name" =>
  4. breaker.withCircuitBreaker(Future(dangerousCall)) pipeTo sender()
  5. case "block for me" =>
  6. sender() ! breaker.withSyncCircuitBreaker(dangerousCall)
  7. }
  1. public String dangerousCall() {
  2. return "This really isn't that dangerous of a call after all";
  3. }
  4. @Override
  5. public void onReceive(Object message) {
  6. if (message instanceof String) {
  7. String m = (String) message;
  8. if ("is my middle name".equals(m)) {
  9. pipe(breaker.callWithCircuitBreaker(
  10. new Callable<Future<String>>() {
  11. public Future<String> call() throws Exception {
  12. return future(
  13. new Callable<String>() {
  14. public String call() {
  15. return dangerousCall();
  16. }
  17. }, getContext().dispatcher());
  18. }
  19. }), getContext().dispatcher()).to(getSender());
  20. }
  21. if ("block for me".equals(m)) {
  22. getSender().tell(breaker
  23. .callWithSyncCircuitBreaker(
  24. new Callable<String>() {
  25. @Override
  26. public String call() throws Exception {
  27. return dangerousCall();
  28. }
  29. }), getSelf());
  30. }
  31. }
  32. }

