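1. Introduction

Overview

  • Hystrix is an open-source fault-tolerance framework. It isolates remote service calls from one another, so that a failed call to one node does not bring down the entire call chain.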

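    Key concepts

  • Service degradation (fallback): when a called service throws an exception or times out, a backup result is returned instead of the exception being thrown straight to the caller.

  • Circuit breaking: similar to degradation, but easy to tell apart in Hystrix. Once the exceptions/timeouts of a service reach a configured threshold, the circuit opens; any further call returns the backup result (the fallback) immediately.
  • Circuit breaking vs. degradation: with degradation the real service is actually called, and the fallback runs only because that call failed; with an open circuit the real service is not called at all and the fallback is invoked directly.
  • Service isolation: Hystrix offers thread isolation and semaphore isolation. Without isolation, if one service fails under heavy traffic, every thread ends up stuck calling that service, and no thread (resource) is left for requests to the services that are still healthy. With thread isolation, for example, each service has its own thread pool and calls to the service are handed to that pool. Even if the service goes down and requests pile up, they are confined to that service's pool, while the pools (resources) of the other services are unaffected and remain available.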

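    Prerequisites

  • The Hystrix source makes heavy use of RxJava, so it reads much more easily with some RxJava background.

  • RxJava follows something like the publish-subscribe pattern: once a source emits events, the event stream can be processed with chained operators. It is widely used on Android.
  • This article does not cover RxJava in depth (the author is not very proficient in it either...). If you feel the need, get familiar with the basics first; the source will be easier to follow.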

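2. Hystrix at a glance

Execution flow

  • First, the execution flow chart from the official documentation:

[Figure: hystrix1.png, the official Hystrix execution flow chart]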

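  • The overall steps are:
    • Build a HystrixCommand for the service to be called; it represents this particular invocation. toObservable() wraps the call in an Observable so that processing can be attached to it; this part relies on RxJava.
    • Execute the HystrixCommand. If a cached result exists, return it directly; otherwise go to the next step.
    • Check whether the circuit breaker (CircuitBreaker) is open, that is, whether the service is short-circuited. If it is open, call the fallback method and return; otherwise continue.
    • Check whether a resource (thread or semaphore permit) can be acquired. If not, degrade immediately; otherwise continue.
    • Run the service call. If it fails or times out, call the fallback and return; if it succeeds, return the result.
    • Whether the call succeeds or fails, the outcome is reported to "calculate circuit health" in the chart, which monitors execution and decides, based on the failures, whether to open or close the breaker.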
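
The order of these checks can be sketched in plain Java. This is a simplified, hypothetical model (the class `FlowSketch`, its fields and its `execute` signature are made up for illustration and use only the JDK); it is not how Hystrix is implemented, which builds the same sequence out of RxJava operators.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical model of the check order: cache, breaker, resource, run, fallback. */
final class FlowSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Semaphore permits;
    // Set by hand here; in Hystrix the breaker is driven by metrics.
    volatile boolean circuitOpen = false;

    FlowSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        // 1. Return the cached result if there is one.
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;
        }
        // 2. Circuit open: skip the real call and degrade immediately.
        if (circuitOpen) {
            return fallback.get();
        }
        // 3. No resource available: degrade immediately.
        if (!permits.tryAcquire()) {
            return fallback.get();
        }
        try {
            // 4. Run the real call and cache a successful result.
            String result = run.get();
            if (result != null) {
                cache.put(cacheKey, result);
            }
            return result;
        } catch (RuntimeException e) {
            // 5. The call failed: degrade to the fallback.
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```

Note how steps 2 and 3 never touch the real service, while step 5 reaches the fallback only after the real call has been attempted; that is the difference between circuit breaking and plain degradation described in the introduction.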

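Source code analysis

  • As the flow above shows, the entry point of Hystrix is HystrixCommand. For testing you can construct a HystrixCommand yourself to run the call, or use the annotation approach.
  • To create one by hand, subclass HystrixCommand and implement the run method.
  • With the @HystrixCommand annotation, a HystrixCommandAspect aspect is registered in the IoC container and proxies the methods annotated with @HystrixCommand. Roughly, it wraps the method call in a HystrixCommand object and executes it; see the source below.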

    //HystrixCommandAspect#methodsAnnotatedWithHystrixCommand
    //Around advice; the pointcut matches methods annotated with @HystrixCommand (or @HystrixCollapser)
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        ...
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //Create a MetaHolder that wraps the joinPoint, so the joinPoint is later executed inside the HystrixCommand
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        //Create the HystrixCommand for this call; a HystrixCommand is also a HystrixInvokable
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ...
        try {
            if (!metaHolder.isObservable()) {
                //Execute; for a synchronous method this eventually reaches HystrixCommand.execute()
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        }...
        return result;
    }
  • HystrixCommand is abstract; the implementation commonly created here is GenericCommand. The key logic, however, lives in HystrixCommand and its parent class AbstractCommand. Next, the AbstractCommand constructor:

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        //Initialise the command key and group key; the key identifies the service, and many components are later looked up by it
        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        //Initialise the properties
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        //metrics collects the call statistics of this service
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        //The circuit breaker of this service
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        //The thread pool of this service
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);
        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;
        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }
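  • Note that every service call creates a new HystrixCommand, but many of the objects it depends on are cached. They are really created only the first time a HystrixCommand for that service is built; afterwards they are fetched from the cache.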
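
The "create once per key, then reuse" idea can be illustrated with a small sketch. The names `PerKeyCacheSketch`, `Dependency` and `getInstance` are hypothetical stand-ins, and `ConcurrentHashMap.computeIfAbsent` is simply one way to get this behaviour in plain Java; it is not meant to reproduce the factories inside Hystrix.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-key cache: one shared instance per command key. */
final class PerKeyCacheSketch {
    /** Stand-in for a heavyweight per-command dependency (breaker, metrics, thread pool). */
    static final class Dependency {
        static final AtomicInteger CREATED = new AtomicInteger();
        final String key;

        Dependency(String key) {
            this.key = key;
            CREATED.incrementAndGet(); // counts real constructions
        }
    }

    private static final ConcurrentHashMap<String, Dependency> BY_KEY = new ConcurrentHashMap<>();

    /** Returns the shared instance for a key, constructing it only on first use. */
    static Dependency getInstance(String commandKey) {
        return BY_KEY.computeIfAbsent(commandKey, Dependency::new);
    }
}
```

Every new command object for the same key would then call `getInstance` and receive the same `Dependency`, which is why constructing a command per call stays cheap.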

  • Next, the execute method of HystrixCommand, the entry point for executing a Hystrix command:

```java
public R execute() {
    try {
        //queue() returns a Future; get() blocks until the result is available
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

public Future<R> queue() {
    //toObservable() wraps the call in an Observable
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    ...
}
```

  • Next comes the key method, toObservable(). It wraps the call and attaches all kinds of processing, with heavy use of RxJava. The method is long, so large parts are omitted here.

```java
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    //Callback that runs when the command terminates
    final Action0 terminateCommandCleanup = new Action0() {
        ...
    };
    //Runs when the subscriber unsubscribes
    final Action0 unsubscribeCommandCleanup = new Action0() {
        ...
    };
    //The function invoked to actually execute the command; this is the key part
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        ...
    };
    final Action0 fireOnCompletedHook = new Action0() {
        ...
    };
    //Create the Observable and attach the handlers
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            //Mark the command as started
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }
            commandStartTimestamp = System.currentTimeMillis();
            ...
            //Request cache handling
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
            //Create the Observable; note that applyHystrixSemantics is what produces it
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);
            Observable<R> afterCache;
            // put in cache, depending on whether caching is enabled
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
            //afterCache is either the hystrixObservable created above or a wrapper around it;
            //attach the listeners here
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
```
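
The "a command instance can only be executed once" guard at the top of the deferred call relies on a single compare-and-set. A minimal standalone sketch of that idiom follows; `RunOnceSketch` and its `State` enum are hypothetical names, and only the CAS pattern is taken from the listing above.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of the run-once guard. */
final class RunOnceSketch {
    enum State { NOT_STARTED, CHAIN_CREATED }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);

    String start() {
        // Only the first caller wins the transition NOT_STARTED -> CHAIN_CREATED.
        if (!state.compareAndSet(State.NOT_STARTED, State.CHAIN_CREATED)) {
            throw new IllegalStateException("This instance can only be executed once.");
        }
        return "started";
    }
}
```

Because the check and the state change happen in one atomic step, two threads racing to start the same instance cannot both succeed, which is presumably why a fresh command object is created for every call.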
  • The Observable is ultimately produced by the anonymous inner class applyHystrixSemantics; here it is again, followed by the AbstractCommand method it delegates to:

```java
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        //Delegate to the AbstractCommand method of the same name
        return applyHystrixSemantics(_cmd);
    }
};

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);

    //Before executing, check whether the circuit breaker lets the request through
    if (circuitBreaker.allowRequest()) {
        //Get the semaphore of this service
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        //Try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                //Build the Observable via executeCommandAndObserve and attach the callbacks
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            //Could not acquire it: degrade
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        //The breaker is open: degrade immediately
        return handleShortCircuitViaFallback();
    }
}
```

  • Next, AbstractCommand#executeCommandAndObserve creates the Observable:

```java
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    //More callbacks implemented as anonymous inner classes
    final Action1<R> markEmits = new Action1<R>() {
        ...
    };
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        ...
    };
    Observable<R> execution;
    //Is timeout control enabled?
    if (properties.executionTimeoutEnabled().get()) {
        //If so, add timeout control through the lift operator
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
```
  • executeCommandWithSpecifiedIsolation() builds a different Observable depending on the isolation strategy: thread isolation or semaphore isolation.

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        //Thread isolation
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            //Create an Observable
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    ...
                    //Mark the thread as started
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        /**
                         * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                         */
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            //Returns an Observable that ultimately wraps our run() logic
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            //Attach the callbacks
            }).doOnTerminate(new Action0() {
                ...
            }).doOnUnsubscribe(new Action0() {
                ...
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                ...
            }));
        } else {
            //Semaphore isolation
            ...
        }
    }
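  • At this point the Observable has been fully built, with all sorts of callbacks attached along the way: timeout monitoring, isolated execution, reporting of call results, and so on. These are the real core of Hystrix and are analysed step by step in the following sections.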

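3. The Hystrix circuit breaker

Core classes

  • Every command (HystrixCommand) has an associated circuit breaker, HystrixCircuitBreaker, which decides whether the command is short-circuited.
  • HystrixCircuitBreaker is the breaker interface; the methods it declares are simple:

    public interface HystrixCircuitBreaker {
        //Called before a HystrixCommand executes to decide whether it may run; if not, the command degrades immediately
        public boolean allowRequest();
        //Whether the breaker is currently open; open means the service is short-circuited
        public boolean isOpen();
        //Marks a successful execution; closes the breaker if it is open
        void markSuccess();
    }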
  • Every HystrixCommand has a circuit breaker, which is fetched from a cache when the command is constructed. See the source in section 2; the relevant part of the constructor is repeated here:

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        //Get the circuit breaker of this command
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
    }
  • HystrixCircuitBreakerImpl is the default breaker implementation. Its fields:

    static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        //Holds the relevant properties
        private final HystrixCommandProperties properties;
        //metrics tracks the call statistics of this command; the breaker decides whether to open based on them
        private final HystrixCommandMetrics metrics;
        //Whether the breaker is open
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);
        //When the breaker was opened, or when the last trial request was allowed
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
    }

Source code analysis

  • In the execution flow analysed in section 2, the following code checks whether the breaker is open:

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
        executionHook.onStart(_cmd);
        //Before executing, check whether the circuit breaker lets the request through
        if (circuitBreaker.allowRequest()) {
            //Get the semaphore of this service
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
        }
        ...
    }
  • Next, the allowRequest method of HystrixCircuitBreakerImpl:

    @Override
    public boolean allowRequest() {
        //If the forceOpen property is set, return false: no request is allowed
        if (properties.circuitBreakerForceOpen().get()) {
            // properties have asked us to force the circuit open so we will allow NO requests
            return false;
        }
        //If the forceClosed property is set, return true: every request is allowed
        if (properties.circuitBreakerForceClosed().get()) {
            // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
            isOpen();
            // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
            return true;
        }
        //Neither property is set: decide based on the current call statistics of the command
        return !isOpen() || allowSingleTest();
    }
  • The isOpen method:

    @Override
    public boolean isOpen() {
        //Already open: return true right away
        if (circuitOpen.get()) {
            // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
            return true;
        }
        //Currently closed: check the metrics to see whether the conditions for opening have been met
        HealthCounts health = metrics.getHealthCounts();
        //Below the request volume threshold: do not evaluate, return false
        if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
            return false;
        }
        //Is the error percentage below the threshold?
        if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            return false;
        } else {
            //The error percentage has reached the threshold, so the breaker must open
            if (circuitOpen.compareAndSet(false, true)) {
                // if the previousValue was false then we want to set the currentTime
                circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                return true;
            } else {
                //Another thread has already opened the breaker: return true
                return true;
            }
        }
    }
  • isOpen uses the statistics collected by metrics to decide whether the breaker should be open. A return value of true means the breaker is open, yet the command may still be executed (otherwise an open breaker would never let anything through again): once the breaker has been open for longer than the sleep window, Hystrix lets a trial request through to probe the service, and if it succeeds the breaker is closed. That is what allowSingleTest does:

    public boolean allowSingleTest() {
        long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
        //If the breaker is open and the sleep window has elapsed, allow the request
        if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
            // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
            // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
            if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                // if this returns true that means we set the time so we'll return true to allow the singleTest
                // if it returned false it means another thread raced us and allowed the singleTest before we did
                return true;
            }
        }
        return false;
    }
  • When a command succeeds, markSuccess is called; if the breaker is open, it closes the breaker and resets the Metrics statistics.

    public void markSuccess() {
        if (circuitOpen.get()) {
            if (circuitOpen.compareAndSet(true, false)) {
                //win the thread race to reset metrics
                //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
                //and all other metric consumers are unaffected by the reset
                metrics.resetStream();
            }
        }
    }
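
To see how allowRequest, isOpen, allowSingleTest and markSuccess fit together, here is a compact sketch of the same state machine using only the JDK. Everything in it is an assumption made for illustration: the class `BreakerSketch`, the `recordFailure` method, the plain cumulative counters (Hystrix reads rolling-window health counts from HystrixCommandMetrics instead) and the injectable clock, which is there only to make the behaviour easy to try out.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical closed / open / trial-request state machine. */
final class BreakerSketch {
    private final int volumeThreshold;
    private final int errorPercentThreshold;
    private final long sleepWindowMs;
    private final LongSupplier clock; // injectable time source (assumption, for testing)
    private final AtomicBoolean open = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTested = new AtomicLong();
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    BreakerSketch(int volumeThreshold, int errorPercentThreshold, long sleepWindowMs, LongSupplier clock) {
        this.volumeThreshold = volumeThreshold;
        this.errorPercentThreshold = errorPercentThreshold;
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    void recordFailure() {
        total.incrementAndGet();
        errors.incrementAndGet();
    }

    void markSuccess() {
        if (open.compareAndSet(true, false)) {
            // A successful trial request closes the breaker and resets the counts.
            total.set(0);
            errors.set(0);
        } else {
            total.incrementAndGet();
        }
    }

    boolean isOpen() {
        if (open.get()) {
            return true;
        }
        long t = total.get();
        if (t == 0 || t < volumeThreshold) {
            return false; // too few requests to judge
        }
        if (errors.get() * 100 / t < errorPercentThreshold) {
            return false;
        }
        if (open.compareAndSet(false, true)) {
            openedOrLastTested.set(clock.getAsLong());
        }
        return true;
    }

    boolean allowSingleTest() {
        long last = openedOrLastTested.get();
        long now = clock.getAsLong();
        // Only the thread that wins the CAS lets its request through.
        return open.get() && now > last + sleepWindowMs && openedOrLastTested.compareAndSet(last, now);
    }

    boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }
}
```

With a volume threshold of 4 and an error threshold of 50%, four recorded failures open the breaker; requests are then rejected until the sleep window has passed, after which exactly one request per window is let through until a success closes the breaker again.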

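4. Hystrix isolation strategies

Overview

  • Hystrix has two main isolation strategies:
    • Semaphore isolation: a command must obtain a semaphore permit before it runs; if it cannot, the fallback method is called.
    • Thread isolation: the task runs on a separate thread.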

Source code analysis

  • The command execution analysed in section 2 contains the following code:

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        ...
        //Check whether the circuit breaker lets the request through
        if (circuitBreaker.allowRequest()) {
            //Get the execution semaphore; the implementation class depends on the isolation strategy
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            ...
            //Try to acquire the semaphore
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                //Could not acquire it: degrade immediately
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
  • As shown above, whichever isolation strategy is used, a semaphore has to be acquired first; what differs is the semaphore each strategy gets:

    protected TryableSemaphore getExecutionSemaphore() {
        //Semaphore isolation
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
            //No override configured: use the per-command semaphore (an override, if present, is returned in the else branch)
            if (executionSemaphoreOverride == null) {
                //Look up the semaphore of this command by commandKey; each circuit has its own semaphore
                TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
                if (_s == null) {
                    //Not there yet: create and cache it
                    executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                    return executionSemaphorePerCircuit.get(commandKey.name());
                } else {
                    return _s;
                }
            } else {
                return executionSemaphoreOverride;
            }
        } else {
            //Thread isolation: return the NoOp semaphore, which does nothing
            return TryableSemaphoreNoOp.DEFAULT;
        }
    }
  • With thread isolation the NoOp semaphore is returned; it performs no bookkeeping and tryAcquire always returns true.

    static class TryableSemaphoreNoOp implements TryableSemaphore {
        public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
        //No permits are tracked; tryAcquire simply returns true
        @Override
        public boolean tryAcquire() {
            return true;
        }
        @Override
        public void release() {
        }
        ...
    }
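
For contrast with the NoOp variant, a counting semaphore for semaphore isolation might look like the following. This is a hedged sketch built on an AtomicInteger; the class name `SemaphoreIsolationSketch` and its `call` helper are hypothetical, and the article does not show the source of TryableSemaphoreActual, so no claim is made that it matches the real implementation line for line.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical non-blocking semaphore: reject instead of waiting when full. */
final class SemaphoreIsolationSketch {
    private final int maxConcurrent;
    private final AtomicInteger inFlight = new AtomicInteger();

    SemaphoreIsolationSketch(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    boolean tryAcquire() {
        // Optimistically take a slot, and give it back if the limit is exceeded.
        if (inFlight.incrementAndGet() > maxConcurrent) {
            inFlight.decrementAndGet();
            return false;
        }
        return true;
    }

    void release() {
        inFlight.decrementAndGet();
    }

    /** Runs the call on the caller's own thread, or degrades if no permit is free. */
    <T> T call(Supplier<T> run, Supplier<T> fallback) {
        if (!tryAcquire()) {
            return fallback.get();
        }
        try {
            return run.get();
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            release(); // exactly one release per successful acquire
        }
    }
}
```

The key property is that the caller never blocks: when all permits are taken the request is rejected at once and degrades, which corresponds to the handleSemaphoreRejectionViaFallback branch shown earlier.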
  • Once the execution semaphore has been acquired, executeCommandAndObserve calls the following method:

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        //Is thread isolation configured?
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            //Return an Observable; note the final subscribeOn() call, which makes the Observable run on another thread
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    ...
                }
            }).doOnTerminate(new Action0() {
                ...
            }).doOnUnsubscribe(new Action0() {
                ...
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                ...
            }));
        } else {
            //Semaphore isolation
            return Observable.defer(new Func0<Observable<R>>() {
                ...
            });
        }
    }
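  • Observable.subscribeOn() is an RxJava operator. It takes a Scheduler and makes the subscription, and therefore the work done inside the deferred call(), run on that scheduler's threads; this is how thread isolation is achieved.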
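
Without RxJava, the effect of thread isolation can be approximated with one bounded thread pool per service. The sketch below is an assumption-laden illustration (the class `ThreadIsolationSketch`, the pool sizing and the thread naming are all made up); Hystrix itself wires its pool in through subscribeOn and a Scheduler rather than through submit/get.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical per-service pool: a slow service can exhaust only its own threads. */
final class ThreadIsolationSketch {
    private final ExecutorService pool;

    ThreadIsolationSketch(String service, int threads, int queueSize) {
        // Bounded pool and bounded queue: when both are full, submit() is rejected.
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize), r -> {
                    Thread t = new Thread(r, "pool-" + service);
                    t.setDaemon(true); // do not keep the JVM alive
                    return t;
                });
    }

    <T> T call(Supplier<T> run, Supplier<T> fallback) {
        Callable<T> task = run::get;
        try {
            // The work runs on the service's pool, not on the caller's thread.
            return pool.submit(task).get();
        } catch (RejectedExecutionException | ExecutionException e) {
            // Pool saturated or the call failed: degrade.
            return fallback.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }
}
```

Creating one such object per downstream service gives each service its own threads, so a flood of calls to a dead service fills up only that service's pool and queue.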

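5. Hystrix timeouts

Overview

  • When timeout control is enabled, Hystrix registers a timed task as the command starts executing; if the command is still running after the configured time, the task fires.
  • The task switches the command's state to timed-out and emits a timeout event, so that observers can react accordingly (end the command, report the outcome, and so on).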

Source code analysis

  • When a HystrixCommand is executed as described in section 2, it passes through the following method:

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        ...
        Observable<R> execution;
        //Is timeout control enabled?
        if (properties.executionTimeoutEnabled().get()) {
            //If so, call lift() with the timeout operator HystrixObservableTimeoutOperator
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
        ...
    }
  • lift is also an RxJava method: it applies an Operator, which wraps the downstream Subscriber and can therefore intercept or transform what the Observable emits. The part to focus on is the call method of HystrixObservableTimeoutOperator:

    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        // if the child unsubscribes we unsubscribe our parent as well
        child.add(s);
        //What to do on timeout: emit the timeout through onError
        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
            @Override
            public void run() {
                child.onError(new HystrixTimeoutException());
            }
        });
        //tick() is called by the timer when the timeout elapses
        TimerListener listener = new TimerListener() {
            @Override
            public void tick() {
                //Try to switch the command status from NOT_EXECUTED to TIMED_OUT
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    //Publish the timeout event
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // shut down the original request
                    s.unsubscribe();
                    //Run the Runnable created above
                    timeoutRunnable.run();
                    //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                }
            }
            //Returns the configured timeout
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };
        //Get the HystrixTimer and register the timed task
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        ...
        return parent;
    }
  • Next, HystrixTimer:

    public class HystrixTimer {
        private static HystrixTimer INSTANCE = new HystrixTimer();
        private HystrixTimer() {
            // private to prevent public instantiation
        }
        //getInstance returns the static singleton
        public static HystrixTimer getInstance() {
            return INSTANCE;
        }
        public Reference<TimerListener> addTimerListener(final TimerListener listener) {
            startThreadIfNeeded();
            // add the listener
            //Create a Runnable that calls listener.tick(), i.e. the timeout handling passed in
            Runnable r = new Runnable() {
                @Override
                public void run() {
                    try {
                        listener.tick();
                    } catch (Exception e) {
                        logger.error("Failed while ticking TimerListener", e);
                    }
                }
            };
            //Schedule the task; once the command runs past the deadline, the Runnable above is invoked, which enforces the timeout
            ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
            return new TimerReference(listener, f);
        }
    }
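
The race between "the command finished" and "the timer fired" is settled by a single compare-and-set on the status, and that part can be shown in isolation. The sketch below is a simplification under stated assumptions: `TimeoutSketch`, its `Status` enum and `withTimeout` are hypothetical names, it uses CompletableFuture in place of an Observable, and it schedules a one-shot task with `schedule` whereas the Hystrix code above uses `scheduleAtFixedRate`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical timeout guard: whoever wins the CAS decides the outcome. */
final class TimeoutSketch {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-sketch-timer");
                t.setDaemon(true);
                return t;
            });

    /** Completes with the work's outcome, or with the fallback if the timer wins. */
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> work, long timeoutMs, T fallback) {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<T> result = new CompletableFuture<>();
        ScheduledFuture<?> tick = TIMER.schedule(() -> {
            // Timer side: only acts if the work has not completed yet.
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                work.cancel(true);         // shut down the original request
                result.complete(fallback); // degrade
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        work.whenComplete((value, error) -> {
            // Work side: only acts if the timer has not fired yet.
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                tick.cancel(false);
                if (error != null) {
                    result.completeExceptionally(error);
                } else {
                    result.complete(value);
                }
            }
        });
        return result;
    }
}
```

For example, `TimeoutSketch.withTimeout(someFuture, 50, "fallback").join()` would yield the real value if `someFuture` completes within 50 ms and `"fallback"` otherwise; a late result from the work is ignored because its CAS fails.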

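6. Metrics

Overview

  • Remember the Metrics field in the circuit breaker? It plays a very important role in Hystrix; "metrics" here simply means the measured indicators of a command.
  • Each CommandKey has its own Metrics, which monitors and counts how the command executes. Internally it maintains many event streams: success, failure, timeout, exception and so on each publish an event, and the streams aggregate the events they receive.
  • Metrics feeds the statistics back to the circuit breaker, which uses them to decide whether to open or close.
  • First, look at how a HystrixCommand obtains its metrics when it is created (below). By now quite a few of the dependencies needed to build a HystrixCommand have been covered. Keep in mind that a HystrixCommand is created for every call; there is no need to worry that so many dependencies hurt performance, because almost all of them are cached (step through with a debugger to see this), so they are really created only the first time.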

    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        //Get the metrics that belong to this commandKey
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        //The metrics are also passed in when the circuit breaker is obtained
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
    }
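
As a rough picture of what the breaker reads from Metrics, the two numbers used in isOpen (total requests and error percentage) can be modelled with a pair of counters. This is only a sketch under simplifying assumptions: `HealthCountsSketch` and its method names are hypothetical, and the counters are cumulative, whereas the real metrics are computed from event streams over a rolling statistical window.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical cumulative health counts (no rolling window). */
final class HealthCountsSketch {
    private final LongAdder total = new LongAdder();
    private final LongAdder errors = new LongAdder();

    void markSuccess() {
        total.increment();
    }

    void markFailure() {
        total.increment();
        errors.increment();
    }

    long getTotalRequests() {
        return total.sum();
    }

    /** Integer percentage of failed requests; 0 when nothing has been recorded. */
    int getErrorPercentage() {
        long t = total.sum();
        return t == 0 ? 0 : (int) (errors.sum() * 100 / t);
    }

    /** Clears the counts, as happens when a successful trial request closes the breaker. */
    void reset() {
        total.reset();
        errors.reset();
    }
}
```

Three successes and one failure give a total of 4 and an error percentage of 25, which the breaker would then compare against its request volume and error thresholds.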

Source code analysis (to be updated)