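1. Introduction
Overview
Hystrix is an open-source fault-tolerance framework. It isolates calls to remote services so that the failure of a single node does not bring down the entire call chain.
Key concepts
Service degradation (fallback): when a called service throws an exception or times out, Hystrix returns an alternative result instead of propagating the exception.
- Circuit breaking: somewhat similar to degradation, but in Hystrix the two are fairly easy to tell apart. When the exceptions/timeouts of a service reach a threshold, the circuit for that service opens; subsequent calls immediately return the fallback (degraded) result.
- Difference between circuit breaking and degradation: with degradation, the real service is actually invoked, and the fallback runs only because that invocation failed. With circuit breaking, the real service is not invoked at all and the fallback runs directly.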
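To make the difference concrete, here is a minimal plain-Java sketch. It is not Hystrix code; `FallbackDemo`, `callWithFallback` and `callWithBreaker` are hypothetical names. Degradation still invokes the real service and only uses the fallback after it fails. An open circuit skips the real call entirely. The `realCalls` counter shows which of the two happened.

```java
import java.util.function.Supplier;

// Hypothetical illustration, not Hystrix API: degradation vs. circuit breaking.
class FallbackDemo {
    static int realCalls = 0; // how many times the real service was actually invoked

    // Degradation: call the real service first, use the fallback only if it throws.
    static <T> T callWithFallback(Supplier<T> service, Supplier<T> fallback) {
        try {
            realCalls++;
            return service.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    // Circuit breaking: while the circuit is open, go straight to the fallback.
    static <T> T callWithBreaker(boolean circuitOpen, Supplier<T> service, Supplier<T> fallback) {
        if (circuitOpen) {
            return fallback.get();
        }
        return callWithFallback(service, fallback);
    }

    public static void main(String[] args) {
        Supplier<String> broken = () -> { throw new IllegalStateException("service down"); };
        Supplier<String> fallback = () -> "fallback";
        System.out.println(callWithFallback(broken, fallback) + ", realCalls=" + realCalls);
        System.out.println(callWithBreaker(true, broken, fallback) + ", realCalls=" + realCalls);
    }
}
```

Running `main` prints `fallback, realCalls=1` twice. Both calls return the fallback, but only the first one touched the real service.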
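Service isolation: Hystrix offers two kinds of isolation, thread isolation and semaphore isolation. Suppose one service in the system fails while traffic is heavy and there is no isolation. All threads end up calling that service, and requests to other, healthy services find no thread (resource) left to run on. That is why services need to be isolated. Take thread isolation as an example: each service has its own thread pool, and a call to that service is handed to that pool. Even if the service goes down and a flood of requests arrives, all of them are executed by that service's pool. The pools (resources) of other services are unaffected and remain available.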
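The bulkhead idea can be sketched with nothing but `java.util.concurrent`. The `Bulkheads` class and the service names are hypothetical, and Hystrix configures its own pools differently. Each service gets a small bounded pool with no queue. Once a slow service has used up its threads, further calls to it are rejected with `RejectedExecutionException`, while a different service still has its own threads available.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical per-service bulkhead: every service name owns its own bounded thread pool.
class Bulkheads {
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int threadsPerService;

    Bulkheads(int threadsPerService) {
        this.threadsPerService = threadsPerService;
    }

    private ThreadPoolExecutor poolFor(String service) {
        // core == max plus a SynchronousQueue: no waiting room, extra tasks are rejected immediately
        return pools.computeIfAbsent(service, s -> new ThreadPoolExecutor(
                threadsPerService, threadsPerService, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()));
    }

    // Throws RejectedExecutionException when this service's pool is saturated.
    <T> Future<T> submit(String service, Callable<T> task) {
        return poolFor(service).submit(task);
    }

    void shutdown() {
        pools.values().forEach(ThreadPoolExecutor::shutdownNow);
    }

    public static void main(String[] args) throws Exception {
        Bulkheads bulkheads = new Bulkheads(2);
        CountDownLatch gate = new CountDownLatch(1);
        Callable<String> hang = () -> { gate.await(); return "slow"; }; // simulates a hung dependency
        bulkheads.submit("slow-service", hang);
        bulkheads.submit("slow-service", hang);
        try {
            bulkheads.submit("slow-service", hang);
        } catch (RejectedExecutionException e) {
            System.out.println("slow-service rejected");
        }
        System.out.println("fast-service: " + bulkheads.submit("fast-service", () -> "ok").get());
        gate.countDown();
        bulkheads.shutdown();
    }
}
```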
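Prerequisites
The Hystrix source makes heavy use of RxJava, so for a smoother read it helps to know some RxJava first.
- RxJava uses a publish-subscribe style pattern: once an event source emits events, the event stream can be processed with chained operators. It is widely used on Android.
- This article does not go into RxJava in depth (the author is not very familiar with it either...). If needed, read up on it first; the source code will then be easier to follow.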
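2. Hystrix at a Glance
Execution flow
- The official Hystrix documentation provides a flowchart of the execution flow; the steps below follow it.
- The overall execution steps are as follows:
- Build a HystrixCommand for the service to be called; it represents this call. The toObservable() method turns the call into an Observable, so that processing can be attached to its events (this part involves RxJava).
- Next, the HystrixCommand executes. It first checks whether a cached result exists for the request. If so, it returns that result; otherwise it continues.
- Check whether the CircuitBreaker is open, i.e. whether the service is tripped. If it is open, call the fallback method and return; otherwise continue.
- Check whether a resource (thread or semaphore) can be obtained. If not, fall back immediately; otherwise continue.
- Run the service. If it fails or times out, call the fallback method and return; on success, return the result directly.
- Whether the run succeeds or fails, the outcome is reported to the "calculate circuit health" step in the flowchart. That step monitors how the service is executing and, based on the errors, decides whether to open or close the circuit breaker.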
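The decision order above can be modelled in a few lines of plain Java. This is a heavily simplified, single-threaded sketch under stated assumptions:

- `MiniPipeline` and its fields are hypothetical.
- The request cache is a plain map rather than a per-request cache.
- One `Semaphore` stands in for both thread-pool and semaphore capacity.
- Two counters stand in for "calculate circuit health".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical, single-threaded model of the decision order above (not Hystrix code).
class MiniPipeline {
    final Map<String, String> requestCache = new HashMap<>();
    final Semaphore permits = new Semaphore(1); // stands in for thread-pool / semaphore capacity
    boolean circuitOpen = false;
    int successes = 0;
    int failures = 0; // these two counters stand in for "calculate circuit health"

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        String cached = requestCache.get(cacheKey);   // 1. request cache
        if (cached != null) {
            return cached;
        }
        if (circuitOpen) {                             // 2. circuit open: short-circuit to the fallback
            return fallback.get();
        }
        if (!permits.tryAcquire()) {                   // 3. no resource available: reject to the fallback
            return fallback.get();
        }
        try {
            String result = run.get();                 // 4. run the real call
            successes++;                               // 5. report the success
            requestCache.put(cacheKey, result);
            return result;
        } catch (RuntimeException e) {
            failures++;                                // 5. report the failure, then fall back
            return fallback.get();
        } finally {
            permits.release();
        }
    }
}
```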
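Source code analysis
- As the analysis above shows, the entry point of Hystrix is HystrixCommand. For testing, you can construct a HystrixCommand yourself to run the service, or use the annotation approach.
- To create one manually, simply subclass HystrixCommand and implement the run method.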
If you use the @HystrixCommand annotation, a HystrixCommandAspect aspect is registered in the IoC container. This aspect proxies every method annotated with @HystrixCommand. Roughly, it wraps the method call into a HystrixCommand object and then executes it. The source (HystrixCommandAspect#methodsAnnotatedWithHystrixCommand) is as follows:
```java
// HystrixCommandAspect#methodsAnnotatedWithHystrixCommand
// An around advice; the pointcut matches methods annotated with @HystrixCommand (or @HystrixCollapser)
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    ...
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    // Create a MetaHolder that wraps the joinPoint, so the joinPoint is later executed inside the HystrixCommand
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // Create the HystrixCommand representing this call; a HystrixCommand is also a HystrixInvokable
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ...
    try {
        if (!metaHolder.isObservable()) {
            // Execute it; for a synchronous call this eventually reaches HystrixCommand.execute()
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } ...
    return result;
}
```
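The aspect's job can be imitated without Spring or Hystrix: intercept an annotated method, run it, and fall back on failure. The sketch below swaps the AspectJ `@Around` advice for a JDK dynamic proxy (`java.lang.reflect.Proxy`). The `@Guarded` annotation, the `UserService` interface and the `GuardedProxy` class are hypothetical stand-ins for `@HystrixCommand(fallbackMethod = ...)` and `HystrixCommandAspect`. There is no circuit breaker, isolation or timeout here, only the "wrap the call, fall back on exception" part.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical annotation standing in for @HystrixCommand(fallbackMethod = ...)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String fallbackMethod();
}

// Hypothetical service interface used only for this illustration.
interface UserService {
    @Guarded(fallbackMethod = "defaultName")
    String findName(int id);

    String defaultName(int id);
}

// Hypothetical stand-in for HystrixCommandAspect, built on a JDK dynamic proxy.
class GuardedProxy {
    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            Guarded guarded = method.getAnnotation(Guarded.class);
            try {
                return method.invoke(target, args);    // the original method, like run()
            } catch (InvocationTargetException e) {
                if (guarded == null) {
                    throw e.getCause();                 // not annotated: rethrow the original error
                }
                // Annotated: call the named fallback with the same arguments.
                Method fallback = iface.getMethod(guarded.fallbackMethod(), method.getParameterTypes());
                return fallback.invoke(target, args);
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        UserService real = new UserService() {
            public String findName(int id) {
                if (id < 0) throw new IllegalArgumentException("bad id " + id);
                return "user" + id;
            }

            public String defaultName(int id) {
                return "anonymous";
            }
        };
        UserService service = wrap(real, UserService.class);
        System.out.println(service.findName(1));  // user1
        System.out.println(service.findName(-1)); // anonymous (fallback)
    }
}
```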
HystrixCommand is an abstract class, and the implementation commonly created here is GenericCommand. The important logic, however, lives in HystrixCommand and its parent class AbstractCommand. Let's look at the AbstractCommand constructor:
```java
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
    // Initialize the command key and its group key; the key identifies the service, and many components are later looked up by it
    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    // Initialize the properties
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    // metrics collects statistics on every call to this service
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // The circuit breaker of this service
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    // The thread pool of this service
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);
    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;
    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}
```
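Note that a new HystrixCommand is created for every service call, but many of its dependencies are cached. They are only built the first time a HystrixCommand for that service (command key) is created; later instances fetch them from the cache.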
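The "create once per key, reuse afterwards" pattern can be sketched with `ConcurrentHashMap.computeIfAbsent`. Hystrix's own factories do their own map lookups. `PerKeyRegistry` and `LightCommand` are hypothetical names, and a plain `Object` stands in for a circuit breaker. Each command instance is new, but the object it pulls from the registry is shared by every command with the same key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical registry: builds one V per key on first use and returns the same instance afterwards.
class PerKeyRegistry<V> {
    private final ConcurrentMap<String, V> instances = new ConcurrentHashMap<>();
    private final Function<String, V> factory;
    final AtomicInteger creations = new AtomicInteger(); // counts how often the factory really ran

    PerKeyRegistry(Function<String, V> factory) {
        this.factory = factory;
    }

    V get(String key) {
        return instances.computeIfAbsent(key, k -> {
            creations.incrementAndGet();
            return factory.apply(k);
        });
    }
}

// Hypothetical lightweight command: created per call, but its "breaker" is shared per command key.
class LightCommand {
    final String commandKey;
    final Object breaker;

    LightCommand(String commandKey, PerKeyRegistry<Object> breakers) {
        this.commandKey = commandKey;
        this.breaker = breakers.get(commandKey);
    }

    public static void main(String[] args) {
        PerKeyRegistry<Object> breakers = new PerKeyRegistry<>(key -> new Object());
        LightCommand first = new LightCommand("getUser", breakers);
        LightCommand second = new LightCommand("getUser", breakers);
        new LightCommand("getOrder", breakers);
        System.out.println(first.breaker == second.breaker); // true: same key, shared breaker
        System.out.println(breakers.creations.get());        // 2: one per distinct key
    }
}
```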
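- Next, look at HystrixCommand's execute method, the entry point for executing a Hystrix command:
```java
public R execute() {
    try {
        // queue() returns a Future; get() then blocks until the result is available
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

public Future<R> queue() {
    // queue() obtains its Future from toObservable()
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    ...
}
```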
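The execute()/queue() relationship is just "asynchronous call plus blocking get()". Below is a minimal sketch of that shape with a plain `ExecutorService`. The class `MiniCommand` is hypothetical. Hystrix derives its Future from `toObservable()` rather than from an executor, and `Exceptions.sneakyThrow` is replaced here by ordinary unwrapping of `ExecutionException`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical mini-command: execute() is just "queue() then block on get()".
abstract class MiniCommand<R> {
    // Daemon threads so a demo JVM can exit without shutting the pool down explicitly.
    private static final ExecutorService POOL = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "mini-command");
        t.setDaemon(true);
        return t;
    });

    protected abstract R run() throws Exception;

    // Asynchronous: returns a Future immediately.
    public Future<R> queue() {
        Callable<R> task = this::run;
        return POOL.submit(task);
    }

    // Synchronous: block on queue()'s Future and rethrow the original failure.
    public R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
```

A subclass only implements `run()`. For example, `new MiniCommand<String>() { protected String run() { return "hello"; } }.execute()` returns `"hello"`.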
- Next comes the key method, toObservable(). It wraps the service call and attaches all kinds of processing, much of it RxJava. The method is long, so a lot is omitted here:
```java
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;
    // Callback run after the command finishes
    final Action0 terminateCommandCleanup = new Action0() {
        ...
    };
    // Callback run on unsubscribe
    final Action0 unsubscribeCommandCleanup = new Action0() {
        ...
    };
    // Callback invoked when the command executes; this is the important one
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        ...
    };
    final Action0 fireOnCompletedHook = new Action0() {
        ...
    };
    // Create the Observable and attach the processing steps
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // Mark the command as started; a command instance may be executed only once
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }
            commandStartTimestamp = System.currentTimeMillis();
            ...
            // Request cache handling
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
            // Create the Observable; note that it is produced by applyHystrixSemantics
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);
            Observable<R> afterCache;
            // put in cache: handle the result depending on whether caching is enabled
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
            // afterCache is either hystrixObservable itself or a wrapper around it;
            // attach the various listeners here
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
```
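Two guards in toObservable() are easy to miss inside the RxJava plumbing:

- The compareAndSet on commandState lets an instance run only once.
- The putIfAbsent on the request cache lets only the first command with a given cache key do the work.

Below is a plain-Java sketch of just those two guards. `OneShotCommand` is hypothetical, a `CompletableFuture` stands in for the cached Observable, and a `ConcurrentMap` stands in for `HystrixRequestCache`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch of two guards in toObservable(): single execution and putIfAbsent request caching.
class OneShotCommand {
    enum State { NOT_STARTED, STARTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final String cacheKey; // null means "do not cache"
    private final Supplier<String> run;

    OneShotCommand(String cacheKey, Supplier<String> run) {
        this.cacheKey = cacheKey;
        this.run = run;
    }

    CompletableFuture<String> start(ConcurrentMap<String, CompletableFuture<String>> requestCache) {
        // Only one caller can move NOT_STARTED -> STARTED; reusing the instance fails fast.
        if (!state.compareAndSet(State.NOT_STARTED, State.STARTED)) {
            throw new IllegalStateException("This instance can only be executed once.");
        }
        CompletableFuture<String> mine = new CompletableFuture<>();
        if (cacheKey != null) {
            CompletableFuture<String> existing = requestCache.putIfAbsent(cacheKey, mine);
            if (existing != null) {
                return existing; // another command with the same key won the race: reuse its result
            }
        }
        try {
            mine.complete(run.get());
        } catch (RuntimeException e) {
            mine.completeExceptionally(e);
        }
        return mine;
    }
}
```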
- The Observable is ultimately produced by the applyHystrixSemantics anonymous inner class. Here it is again, together with the AbstractCommand method it delegates to:
```java
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            return Observable.never();
        }
        // Delegate to AbstractCommand#applyHystrixSemantics
        return applyHystrixSemantics(_cmd);
    }
};

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    ...
    // Before executing, check whether the circuit breaker allows the request
    if (circuitBreaker.allowRequest()) {
        // Get this command's execution semaphore
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // Releases the semaphore at most once, whether triggered by termination or by unsubscribe
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // Try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                // executeCommandAndObserve creates the Observable; then attach the callbacks
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // Could not acquire the semaphore: fall back
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // The circuit breaker is open: fall back immediately
        return handleShortCircuitViaFallback();
    }
}
```
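Why is `executionSemaphore.release()` wrapped in an AtomicBoolean? The same release action is attached to both `doOnTerminate` and `doOnUnsubscribe`, so it may fire twice. A second `release()` would hand out an extra permit; with `java.util.concurrent.Semaphore` it silently raises the count above its initial value. Below is a minimal sketch of the guard, using hypothetical class and method names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the singleSemaphoreRelease idea: one release action wired to several terminal callbacks.
class ReleaseOnce {
    static Runnable releaseOnce(Semaphore semaphore) {
        AtomicBoolean released = new AtomicBoolean(false);
        return () -> {
            // Only the first caller flips false -> true and returns the permit.
            if (released.compareAndSet(false, true)) {
                semaphore.release();
            }
        };
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(1);
        permits.tryAcquire();
        Runnable release = releaseOnce(permits);
        release.run(); // e.g. triggered by doOnTerminate
        release.run(); // e.g. triggered by doOnUnsubscribe
        System.out.println(permits.availablePermits()); // 1, not 2
    }
}
```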
- Next, AbstractCommand#executeCommandAndObserve creates the Observable:
```java
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // Again, a few anonymous inner classes used as callbacks
    final Action1<R> markEmits = new Action1<R>() {
        ...
    };
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        ...
    };
    Observable<R> execution;
    // Check whether timeouts are enabled
    if (properties.executionTimeoutEnabled().get()) {
        // If so, add timeout control through the lift operator
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
```
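handleFallback is essentially a dispatch on the exception type. Below is a plain-Java sketch of the same branching order. `FallbackRouter` is hypothetical; `BadRequestException` stands in for `HystrixBadRequestException` and `java.util.concurrent.TimeoutException` for `HystrixTimeoutException`. Note that the bad-request branch does not go to the fallback at all: the error is emitted to the caller.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical classifier mirroring the branch order of handleFallback.
class FallbackRouter {
    enum Route { THREAD_POOL_REJECTION_FALLBACK, TIMEOUT_FALLBACK, EMIT_BAD_REQUEST_ERROR, FAILURE_FALLBACK }

    // Stand-in for HystrixBadRequestException: caller errors that should not trigger a fallback.
    static class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Route.THREAD_POOL_REJECTION_FALLBACK; // the thread pool was full
        } else if (t instanceof TimeoutException) {
            return Route.TIMEOUT_FALLBACK;               // raised by the timeout operator
        } else if (t instanceof BadRequestException) {
            return Route.EMIT_BAD_REQUEST_ERROR;         // propagate to the caller, no fallback
        }
        return Route.FAILURE_FALLBACK;                   // any other failure of run()
    }
}
```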
executeCommandWithSpecifiedIsolation() builds a different Observable depending on the isolation strategy; the two main strategies are thread isolation and semaphore isolation:
```java
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // Thread isolation
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // Create an Observable
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ...
                // Mark the thread as started
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        // Returns an Observable that ultimately wraps our run() logic
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        // Callback setup
        }).doOnTerminate(new Action0() {
            ...
        }).doOnUnsubscribe(new Action0() {
            ...
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            ...
        }));
    } else {
        // Semaphore isolation
        ...
    }
}
```
At this point the Observable is complete, with many callbacks attached along the way: timeout monitoring, isolated execution, reporting of the execution result, and so on. These are the heart of Hystrix and are analyzed one by one below.
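3. Hystrix Circuit Breaker
Core classes
- Hystrix associates a circuit breaker, HystrixCircuitBreaker, with each command (HystrixCommand), more precisely with each command key. The breaker decides whether that command is tripped. HystrixCircuitBreaker is the circuit breaker interface, and the methods it declares are simple:
```java
public interface HystrixCircuitBreaker {
    // Called before a HystrixCommand executes to decide whether it may run; if not, it falls back immediately
    public boolean allowRequest();
    // Whether the circuit breaker is currently open; if it is, the service is tripped
    public boolean isOpen();
    // Marks a successful execution; this method closes the circuit breaker
    void markSuccess();
}
```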
Every HystrixCommand has a circuit breaker, which is fetched from a cache when the command is constructed. The full constructor is shown in Section 2; the relevant part is repeated here:
```java
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // Get the circuit breaker for this command
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
}
```
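HystrixCircuitBreakerImpl is the default implementation of the circuit breaker. Its member variables:
```java
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    // The related properties
    private final HystrixCommandProperties properties;
    // Metrics: the statistics for this command's calls; the breaker decides whether to trip based on them
    private final HystrixCommandMetrics metrics;
    // Whether the circuit breaker is open
    private AtomicBoolean circuitOpen = new AtomicBoolean(false);
    // When the circuit was opened, or when a test request was last let through
    private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
}
```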
Source code analysis
When analyzing the HystrixCommand execution flow in Section 2, we saw the following step, which checks whether the circuit breaker is open:
```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // mark that we're starting execution on the ExecutionHook
    // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
    executionHook.onStart(_cmd);
    // Before executing, check whether the circuit breaker allows the request
    if (circuitBreaker.allowRequest()) {
        // Get this command's execution semaphore
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
    }
    ...
}
```
Next, look at HystrixCircuitBreakerImpl's allowRequest method:
```java
@Override
public boolean allowRequest() {
    // If forceOpen is configured, return false: the request is not allowed
    if (properties.circuitBreakerForceOpen().get()) {
        // properties have asked us to force the circuit open so we will allow NO requests
        return false;
    }
    // If forceClosed is configured, return true: the request is allowed
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
        isOpen();
        // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
        return true;
    }
    // Neither property is set: decide based on how this command's calls have been going
    return !isOpen() || allowSingleTest();
}
```
Now the isOpen method:
```java
@Override
public boolean isOpen() {
    // If the circuit breaker is already open, return true right away
    if (circuitOpen.get()) {
        // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
        return true;
    }
    // The breaker is closed: use the metrics to check whether the conditions for opening it are now met
    HealthCounts health = metrics.getHealthCounts();
    // If the request count has not reached the volume threshold, do not evaluate further; return false
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statistical window so we'll return false immediately and not calculate anything
        return false;
    }
    // Is the error percentage below the threshold?
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // The error percentage has reached the threshold: open the circuit breaker
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            // Another thread has already opened the breaker; just return true
            return true;
        }
    }
}
```
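For a closed breaker, the decision isOpen() makes reduces to two comparisons. Below is a sketch of it as a pure function; `TripDecision` is hypothetical and the thresholds are passed in. Hystrix's documented defaults for `circuitBreaker.requestVolumeThreshold` and `circuitBreaker.errorThresholdPercentage` are 20 and 50.

```java
// Hypothetical pure-function version of isOpen()'s check on a closed breaker.
class TripDecision {
    // Error percentage as an int, truncated towards zero.
    static int errorPercentage(long errorCount, long totalCount) {
        return totalCount == 0 ? 0 : (int) (errorCount * 100 / totalCount);
    }

    static boolean shouldTrip(long totalRequests, int errorPercentage,
                              int requestVolumeThreshold, int errorThresholdPercentage) {
        if (totalRequests < requestVolumeThreshold) {
            return false; // too little traffic in the window to judge
        }
        return errorPercentage >= errorThresholdPercentage; // "not below the threshold" trips the breaker
    }

    public static void main(String[] args) {
        System.out.println(shouldTrip(19, 100, 20, 50));                     // false: below the volume threshold
        System.out.println(shouldTrip(20, errorPercentage(10, 20), 20, 50)); // true: 50% >= 50%
        System.out.println(shouldTrip(20, errorPercentage(9, 20), 20, 50));  // false: 45% < 50%
    }
}
```

With those values, 19 consecutive failures do not trip the breaker, while 10 failures out of 20 requests do.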
isOpen uses the statistics collected by metrics to decide whether the circuit breaker should open; if it returns true, the breaker is open. Even while the breaker is open, the command may still run occasionally; otherwise the breaker would stay open forever and the command could never execute again. Once the breaker has been open for longer than the sleep window, Hystrix lets a single request through as a probe, and if that request succeeds the breaker is closed. This is what the allowSingleTest method does:
```java
public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // If the breaker is open and the sleep window has elapsed, allow this request through
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
        // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            // if this returns true that means we set the time so we'll return true to allow the singleTest
            // if it returned false it means another thread raced us and allowed the singleTest before we did
            return true;
        }
    }
    return false;
}
```
When a command executes successfully, markSuccess is called. If the breaker is open, markSuccess closes it and resets the metrics statistics:
```java
public void markSuccess() {
    if (circuitOpen.get()) {
        if (circuitOpen.compareAndSet(true, false)) {
            //win the thread race to reset metrics
            //Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
            //and all other metric consumers are unaffected by the reset
            metrics.resetStream();
        }
    }
}
```
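Putting allowRequest(), allowSingleTest() and markSuccess() together gives a small state machine. The sketch below keeps only that logic. `SleepWindowBreaker` is hypothetical, and it makes these simplifications:

- The metrics part is replaced by an explicit `trip()` call.
- Time is passed in as a parameter so the behaviour is deterministic.
- The force-open/force-closed properties are left out.
- markSuccess only closes the circuit; Hystrix also resets the metrics stream.

Hystrix's documented default for `circuitBreaker.sleepWindowInMilliseconds` is 5000 ms.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical breaker keeping only the open flag and the sleep-window timestamp logic.
class SleepWindowBreaker {
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTested = new AtomicLong();
    private final long sleepWindowMs;

    SleepWindowBreaker(long sleepWindowMs) {
        this.sleepWindowMs = sleepWindowMs;
    }

    // Stands in for isOpen() deciding to open the circuit based on metrics.
    void trip(long now) {
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTested.set(now);
        }
    }

    boolean allowRequest(long now) {
        return !circuitOpen.get() || allowSingleTest(now);
    }

    private boolean allowSingleTest(long now) {
        long last = openedOrLastTested.get();
        // After the sleep window, exactly one caller wins the CAS and is let through as a probe;
        // winning the CAS also starts a new sleep window.
        return circuitOpen.get() && now > last + sleepWindowMs
                && openedOrLastTested.compareAndSet(last, now);
    }

    void markSuccess() {
        circuitOpen.compareAndSet(true, false); // a successful probe closes the circuit
    }

    public static void main(String[] args) {
        SleepWindowBreaker breaker = new SleepWindowBreaker(5000);
        breaker.trip(1_000);
        System.out.println(breaker.allowRequest(3_000)); // false: still inside the sleep window
        System.out.println(breaker.allowRequest(6_001)); // true: the single probe request
        System.out.println(breaker.allowRequest(6_002)); // false: a new sleep window started at 6001
        breaker.markSuccess();                           // the probe succeeded
        System.out.println(breaker.allowRequest(6_003)); // true: circuit closed again
    }
}
```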
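4. Hystrix Isolation Strategies
Overview
- Hystrix has two main isolation strategies:
- Semaphore isolation: a command must acquire a semaphore permit before it runs; if it cannot, the fallback method is called.
- Thread isolation: the task runs on a separate thread taken from the command's thread pool.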
Source code analysis
In the Section 2 analysis of command execution, there is the following step:
```java
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    ...
    // Check whether the circuit breaker allows the request
    if (circuitBreaker.allowRequest()) {
        // Get the execution semaphore; its implementation differs by isolation strategy
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        ...
        // Try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // Could not acquire it: fall back immediately
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}
```
As shown above, whichever isolation strategy is used, the command must acquire a semaphore; what differs between strategies is which semaphore it gets:
```java
protected TryableSemaphore getExecutionSemaphore() {
    // Semaphore isolation
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
        // No override semaphore supplied: use the per-command semaphore (otherwise the override is returned below)
        if (executionSemaphoreOverride == null) {
            // Look up this command's semaphore by commandKey; each circuit (command key) has one semaphore
            TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // Not found: create it and cache it
                executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
                return executionSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return executionSemaphoreOverride;
        }
    } else {
        // Thread isolation: return a NoOp semaphore, which is effectively empty
        return TryableSemaphoreNoOp.DEFAULT;
    }
}
```
With thread isolation, a NoOp semaphore is returned. It does nothing and always grants the request:
```java
static class TryableSemaphoreNoOp implements TryableSemaphore {
    public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
    // No permits are tracked; tryAcquire() simply returns true
    @Override
    public boolean tryAcquire() {
        return true;
    }
    @Override
    public void release() {
    }
    ...
}
```
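Below is a sketch of the two semaphore flavours and the per-command lookup. The names `TrySemaphore`, `CountingTrySemaphore`, `NoOpTrySemaphore` and `SemaphoreSelector` are hypothetical. The counting version uses a non-blocking "increment, and undo if over the limit" counter instead of blocking like `java.util.concurrent.Semaphore.acquire`. Under thread isolation the thread pool does the limiting, so the semaphore always says yes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counterpart of TryableSemaphore.
interface TrySemaphore {
    boolean tryAcquire();

    void release();
}

// Non-blocking counting semaphore: take a slot, give it back immediately if over the limit.
class CountingTrySemaphore implements TrySemaphore {
    private final int maxConcurrent;
    private final AtomicInteger inUse = new AtomicInteger();

    CountingTrySemaphore(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    public boolean tryAcquire() {
        if (inUse.incrementAndGet() > maxConcurrent) {
            inUse.decrementAndGet();
            return false;
        }
        return true;
    }

    public void release() {
        inUse.decrementAndGet();
    }
}

// Used with thread isolation: always grants, never counts.
class NoOpTrySemaphore implements TrySemaphore {
    static final TrySemaphore DEFAULT = new NoOpTrySemaphore();

    public boolean tryAcquire() {
        return true;
    }

    public void release() {
    }
}

// Hypothetical lookup mirroring getExecutionSemaphore(): one semaphore per command key.
class SemaphoreSelector {
    enum Strategy { THREAD, SEMAPHORE }

    private final ConcurrentMap<String, TrySemaphore> perCommand = new ConcurrentHashMap<>();

    TrySemaphore forCommand(String commandKey, Strategy strategy, int maxConcurrent) {
        if (strategy == Strategy.THREAD) {
            return NoOpTrySemaphore.DEFAULT;
        }
        return perCommand.computeIfAbsent(commandKey, k -> new CountingTrySemaphore(maxConcurrent));
    }
}
```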
Once the execution semaphore is acquired, executeCommandAndObserve calls the following method:
```java
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // Thread isolation?
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // Return an Observable; note the final subscribeOn() call, which makes the Observable run on another thread
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ...
            }
        }).doOnTerminate(new Action0() {
            ...
        }).doOnUnsubscribe(new Action0() {
            ...
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            ...
        }));
    } else {
        // Semaphore isolation
        return Observable.defer(new Func0<Observable<R>>() {
            ...
        });
    }
}
```
Observable.subscribeOn() is an RxJava operator. It takes a Scheduler and makes the subscription, and therefore the deferred call() that runs the command, happen on a thread of that scheduler. This is how thread isolation is achieved.
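5. Hystrix Timeouts
Overview
- When timeout control is enabled, Hystrix starts a timer task as the command executes; if the command runs longer than the configured time, the timer task fires.
- The timer task switches the command's status to timed out and publishes a timeout event, so that observers can react accordingly (for example, terminate the command or report the execution result).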
Source code analysis
When a HystrixCommand executes (Section 2), it goes through the following method:
```java
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    ...
    Observable<R> execution;
    // Is timeout control enabled?
    if (properties.executionTimeoutEnabled().get()) {
        // With timeout control, call lift() and pass in the timeout operator HystrixObservableTimeoutOperator
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
    ...
}
```
lift is also an RxJava method. It inserts a custom Operator between the source Observable and the downstream Subscriber, so the operator can intercept and transform the events the Observable emits. The key part is the call method of HystrixObservableTimeoutOperator:
```java
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    // if the child unsubscribes we unsubscribe our parent as well
    child.add(s);
    // What to do on timeout: call onError to emit a HystrixTimeoutException downstream
    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
        @Override
        public void run() {
            child.onError(new HystrixTimeoutException());
        }
    });
    // When the command times out, the listener's tick method is called
    TimerListener listener = new TimerListener() {
        @Override
        public void tick() {
            // Try to switch the command status from NOT_EXECUTED to TIMED_OUT
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // Publish the timeout event
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                // shut down the original request
                s.unsubscribe();
                // Run the Runnable created above
                timeoutRunnable.run();
                //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
            }
        }
        // Return the configured timeout
        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };
    // Get the HystrixTimer and register the timer task
    final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    ...
    return parent;
}
```
Next, HystrixTimer:
```java
public class HystrixTimer {
    private static HystrixTimer INSTANCE = new HystrixTimer();
    private HystrixTimer() {
        // private to prevent public instantiation
    }
    // getInstance returns the static singleton
    public static HystrixTimer getInstance() {
        return INSTANCE;
    }
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener
        // Create a Runnable that calls the listener's tick() method, i.e. the timeout handling
        Runnable r = new Runnable() {
            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };
        // Schedule the task: once the interval has elapsed, the Runnable above calls tick(), which enforces the timeout
        // (it is scheduled at a fixed rate; the compareAndSet in tick() ensures the timeout is acted on only once)
        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
}
```
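The timer-plus-CAS idea can be reproduced with a `ScheduledExecutorService`. This is a sketch under stated assumptions:

- `TimeoutGuard` is hypothetical.
- A `CompletableFuture` plays the role of the subscriber.
- `Future.cancel(true)` roughly plays the role of `s.unsubscribe()`.
- A one-shot `schedule` is used instead of `scheduleAtFixedRate`.

Both the worker and the timer try to move the status out of `NOT_EXECUTED`. Whichever wins the compareAndSet decides whether the caller sees a result or a `TimeoutException`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a worker and a timer race to move the status out of NOT_EXECUTED.
class TimeoutGuard {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "mini-timer");
        t.setDaemon(true);
        return t;
    });

    static <R> CompletableFuture<R> runWithTimeout(Callable<R> work, long timeoutMs, ExecutorService pool) {
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<R> result = new CompletableFuture<>();
        Future<?> running = pool.submit(() -> {
            try {
                R value = work.call();
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                    result.complete(value);
                }
            } catch (Exception e) {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                    result.completeExceptionally(e);
                }
            }
        });
        // Plays the role of TimerListener.tick(): acts only if the work has not finished yet.
        TIMER.schedule(() -> {
            if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                running.cancel(true); // interrupt the worker, roughly like s.unsubscribe()
                result.completeExceptionally(new TimeoutException("timed out after " + timeoutMs + " ms"));
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        System.out.println(runWithTimeout(() -> "fast", 1000, pool).get()); // fast
        try {
            runWithTimeout(() -> { Thread.sleep(2000); return "slow"; }, 100, pool).get();
        } catch (ExecutionException e) {
            System.out.println("timed out: " + e.getCause());
        }
        pool.shutdownNow();
    }
}
```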
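6. Metrics
Overview
- Remember the metrics member variable in the circuit breaker? It plays an important role in Hystrix; "metrics" here simply means statistical indicators.
- Each command key has its own metrics object, which monitors and aggregates how the command executes. Internally it maintains many event streams. Success, failure, timeout, exception and other outcomes each publish a corresponding event, and each event stream updates its statistics when it receives one.
- Metrics feed their statistics back to the circuit breaker, which uses them to decide whether to open or close.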
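Hystrix's metrics are built on bucketed rolling windows; by default, a 10-second window is split into 10 buckets. The sketch below shows only the bucketing idea behind health counts, not Hystrix's stream-based implementation. `RollingHealth` is hypothetical, time is passed in explicitly (assumed non-negative), and synchronization is coarse.

```java
import java.util.Arrays;

// Hypothetical bucketed rolling window: each bucket counts successes/failures for one time slice.
class RollingHealth {
    private final long bucketMs;
    private final long[] bucketStart;
    private final int[] successes;
    private final int[] failures;

    RollingHealth(int numBuckets, long bucketMs) {
        this.bucketMs = bucketMs;
        this.bucketStart = new long[numBuckets];
        this.successes = new int[numBuckets];
        this.failures = new int[numBuckets];
        Arrays.fill(bucketStart, Long.MIN_VALUE); // no bucket in use yet
    }

    // Returns the bucket for time `now`, clearing it first if it still holds an older time slice.
    private int bucketFor(long now) {
        long start = now - (now % bucketMs);
        int i = (int) ((now / bucketMs) % bucketStart.length);
        if (bucketStart[i] != start) {
            bucketStart[i] = start;
            successes[i] = 0;
            failures[i] = 0;
        }
        return i;
    }

    synchronized void markSuccess(long now) { successes[bucketFor(now)]++; }

    synchronized void markFailure(long now) { failures[bucketFor(now)]++; }

    // {totalRequests, errorPercentage} over the buckets that still fall inside the window.
    synchronized long[] healthCounts(long now) {
        long windowStart = now - (now % bucketMs) - (bucketStart.length - 1) * bucketMs;
        long ok = 0;
        long bad = 0;
        for (int i = 0; i < bucketStart.length; i++) {
            if (bucketStart[i] >= windowStart && bucketStart[i] <= now) {
                ok += successes[i];
                bad += failures[i];
            }
        }
        long total = ok + bad;
        return new long[] {total, total == 0 ? 0 : bad * 100 / total};
    }
}
```

Consider 10 buckets of 1000 ms, with 8 successes at t=500 and 2 failures at t=1500. Then `healthCounts(1600)` returns `{10, 20}`. By t=11600 both buckets have left the window and the total drops back to 0, so old errors stop influencing the breaker.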
First, when a HystrixCommand is created, the following step obtains its metrics. By now we have covered quite a few of the dependencies needed to construct a HystrixCommand. A HystrixCommand is created on every call, but there is no need to worry that all these dependencies hurt performance. Almost all of them are cached (you can confirm this in a debugger), so they are only actually created the first time.
```java
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    // Get the metrics for this commandKey
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    // The metrics are also passed in when obtaining the circuit breaker
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
}
```