Hystrix 提供了降级、熔断、限流、线程池隔离等功能
- 降级:当访问某个微服务时,出现错误(或超时),这会调用降级方法
- 熔断:当访问某个微服务时,出现错误(或超时)达到一定的次数时,则判定为该服务出现了问题,则直接调用降级方法
- 限流:当对某个微服务进行限流时,超过的流量将直接 Fallback,即熔断
应用
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency>
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableHystrix
public class AppUserClient {
public static void main(String[] args) {
SpringApplication.run(AppUserClient.class);
}
}
方法降级
针对某个方法进行降级
@RequestMapping("/power/get/{id}")
@HystrixCommand(fallbackMethod = "getOrderFallback", threadPoolKey = "power",
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "5"),
@HystrixProperty(name = "maxQueueSize", value = "2")})
public R getPower(@PathVariable String id) {
System.out.println("被调用了");
return restTemplate.getForObject(POWER_URL + "/power/get/" + id, R.class);
}
public R getOrderFallback(@PathVariable String id) {
return R.error("被降级了");
}
微服务降级
针对某个微服务进行降级
@FeignClient(name = "server-power", fallback = PowerFeignFallback.class)
public interface PowerFeignClient {
@RequestMapping("/power/get/{id}")
R getPower(@PathVariable("id") String id);
}
@Component // 一定要加 @Component
public class PowerFeignFallback implements PowerFeignClient {
@Override
public R getPower(String id) {
return R.error("被降级了,暂时不可用");
}
}
源码解析
初始化流程
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
ImportSelector 接口是至 Spring 中导入外部配置的核心接口,在 SpringBoot 的自动化配置和 @EnableXXX 都有它的存在,通过其的 selectImports 方法会返回一个包含所有类名的数组对象
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
extends SpringFactoryImportSelector<EnableCircuitBreaker> {
@Override
protected boolean isEnabled() {
return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
Boolean.class, Boolean.TRUE);
}
}
public abstract class SpringFactoryImportSelector<T>
implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {
@Override
public String[] selectImports(AnnotationMetadata metadata) {
if (!isEnabled()) {
return new String[0];
}
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
+ metadata.getClassName() + " annotated with @" + getSimpleName() + "?");
// Find all possible auto configuration classes, filtering duplicates
List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
.loadFactoryNames(this.annotationClass, this.beanClassLoader)));
if (factories.isEmpty() && !hasDefaultFactory()) {
throw new IllegalStateException("Annotation @" + getSimpleName()
+ " found, but there are no implementations. Did you forget to include a starter?");
}
if (factories.size() > 1) {
// there should only ever be one DiscoveryClient, but there might be more than
// one factory
this.log.warn("More than one implementation " + "of @" + getSimpleName()
+ " (now relying on @Conditionals to pick one): " + factories);
}
return factories.toArray(new String[factories.size()]);
}
}
其中: SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader)) 会读取 spring.factories 里面对应的 key 指定的 value 类,然后通过 factories.toArray(new String[factories.size()]) 将其加载到 Spring 容器中
这里会通过 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 返回 org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration 类
D:\maven_repo\org\springframework\cloud\spring-cloud-netflix-hystrix\2.1.4.RELEASE\spring-cloud-netflix-hystrix-2.1.4.RELEASE.jar!\META-INF\spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
HystrixCircuitBreakerConfiguration 配置类
HystrixCircuitBreakerConfiguration 配置类,定义了一个 HystrixCommandAspect 切面对象,来处理 Controller 的请求信息
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
@Bean
public HystrixShutdownHook hystrixShutdownHook() {
return new HystrixShutdownHook();
}
@Bean
public HasFeatures hystrixFeature() {
return HasFeatures
.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
}
/**
* {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
* {@link ApplicationContext} shuts down.
*/
private class HystrixShutdownHook implements DisposableBean {
@Override
public void destroy() throws Exception {
// Just call Hystrix to reset thread pool etc.
Hystrix.reset();
}
}
}
HystrixCommandAspect 切面
methodsAnnotatedWithHystrixCommand 方法会在调用 Controller 之前先调用
@Aspect
public class HystrixCommandAspect {
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
// 初始化断路器、线程池等
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
// 执行熔断
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
}
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); 创建了一个 Hystrix 执行对象
public class HystrixCommandFactory {
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
// 执行这一步操作
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
}
在 new GenericCommand 的时候会调用其父类 AbstractCommand 来初始化一系列组件
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
this.executionHook = initExecutionHook(executionHook);
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
/* fallback semaphore override if applicable */
this.fallbackSemaphoreOverride = fallbackSemaphore;
/* execution semaphore override if applicable */
this.executionSemaphoreOverride = executionSemaphore;
}
}
熔断器
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(),
circuitBreaker, this.commandGroup, this.commandKey,
this.properties, this.metrics);
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// get the default implementation of HystrixCircuitBreaker
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
return fromConstructor;
}
} else {
return new NoOpCircuitBreaker();
}
}
实例化熔断器
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// this should find it for all but the first time
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
// Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
// 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
// If 2 threads hit here only one will get added and the other will get a non-null response instead.
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
// this means a race occurred and while attempting to 'put' another one got there before
// and we instead retrieved it and will now return it
return cbForCommand;
}
}
熔断器的实现
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
// 熔断开关
/* track whether this circuit is open/closed at any given point in time (default to false==closed) */
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
// 半开的时间记录
/* when the circuit was marked open or was last allowed to try a 'singleTest' */
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
}
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
//win the thread race to reset metrics
//Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
//and all other metric consumers are unaffected by the reset
metrics.resetStream();
}
}
}
@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}
public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
}
}
return false;
}
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}
// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
return false;
}
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
}
}
}
}
总结
在程序入口中定义了 AOP 的切面,通过环绕通知代理 Hystrix 的注解,所有加了 @HystrixCommand 注解的方法都会进入代理方法,在代理方法中,会读取注解中的配置,执行初始化配置,构建一个熔断器,线程池等,配置来源都是 yml 和 注解中的属性,然后会构建为一个 Command 对象,回去判断断路器打开和半开的情况
