Hystrix provides fallback (degradation), circuit breaking, rate limiting, and thread-pool isolation.

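  • Fallback (degradation): when a call to a microservice fails or times out, the fallback method is invoked instead.
  • Circuit breaking: when calls to a microservice fail or time out often enough (Hystrix checks the error percentage once a minimum number of requests has been seen), the service is judged unhealthy and subsequent calls go straight to the fallback method without attempting the real call.
  • Rate limiting: when a limit is set for a microservice, traffic above the limit is rejected and goes directly to the fallback method. This is a fallback caused by rejection; it is separate from the circuit breaker tripping.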
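
The fallback behavior in the list above can be illustrated without Hystrix at all. The following is a minimal plain-Java sketch, not Hystrix's implementation; the class name FallbackSketch and the helper callWithFallback are hypothetical names chosen for illustration.

```java
import java.util.function.Supplier;

public class FallbackSketch {

    // Runs the primary call; if it throws, returns the fallback result instead.
    // (Hypothetical helper, used only to illustrate the idea of degradation.)
    public static <T> T callWithFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        }
    }

    public static void main(String[] args) {
        // A healthy call returns its own result.
        String ok = callWithFallback(() -> "power-1", () -> "degraded");
        // A failing call is replaced by the fallback result.
        String failed = callWithFallback(
                () -> { throw new IllegalStateException("remote call failed"); },
                () -> "degraded");
        System.out.println(ok);
        System.out.println(failed);
    }
}
```

Hystrix adds timeouts, isolation, and circuit-breaker checks around this basic idea, so the caller always gets an answer quickly even when the remote service is slow or down.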

Usage

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableHystrix
public class AppUserClient {
    public static void main(String[] args) {
        // Pass args through so command-line properties reach Spring Boot
        SpringApplication.run(AppUserClient.class, args);
    }
}

Method-level fallback

Configure a fallback for a single method.

@RequestMapping("/power/get/{id}")
@HystrixCommand(fallbackMethod = "getOrderFallback", threadPoolKey = "power",
                threadPoolProperties = {
                    @HystrixProperty(name = "coreSize", value = "5"),
                    @HystrixProperty(name = "maxQueueSize", value = "2")})
public R getPower(@PathVariable String id) {
    System.out.println("getPower was called");
    return restTemplate.getForObject(POWER_URL + "/power/get/" + id, R.class);
}

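// Fallback method: takes the same parameters as getPower.
// @PathVariable has no effect here because this is not a request handler.
public R getOrderFallback(String id) {
    return R.error("Degraded");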
}
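
To see what coreSize = 5 and maxQueueSize = 2 mean in practice, here is a rough plain-Java sketch using a ThreadPoolExecutor with the same numbers. It only approximates the effect of Hystrix's thread-pool isolation (Hystrix also applies its own queue rejection threshold and timeouts); the class name ThreadPoolLimitSketch and the method countRejected are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolLimitSketch {

    // Submits `requests` slow tasks to a pool of `coreSize` threads with a bounded
    // queue of `maxQueueSize`, and returns how many submissions were rejected.
    // In Hystrix terms, the rejected ones would be routed to the fallback method.
    public static int countRejected(int coreSize, int maxQueueSize, int requests) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(maxQueueSize));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < requests; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // simulate a slow remote call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and queue full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 5 running + 2 queued are accepted; the remaining 3 are rejected.
        System.out.println(countRejected(5, 2, 10));
    }
}
```

With these settings at most seven requests can be in flight for the "power" thread pool at once, which is how the annotation acts as a rate limit for that dependency.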

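Service-level fallback

Configure a fallback for an entire microservice through its Feign client. In this Spring Cloud version, Hystrix support for Feign must also be switched on with feign.hystrix.enabled=true.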

@FeignClient(name = "server-power", fallback = PowerFeignFallback.class)
public interface PowerFeignClient {

    @RequestMapping("/power/get/{id}")
    R getPower(@PathVariable("id") String id);

}

@Component // required: the fallback class must be a Spring bean
public class PowerFeignFallback implements PowerFeignClient {

    @Override
    public R getPower(String id) {
        return R.error("Degraded: service temporarily unavailable");
    }
}

Source code analysis

Initialization flow

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {

}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {

}

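ImportSelector is the core interface Spring uses to import external configuration. It underlies both Spring Boot auto-configuration and the @EnableXXX annotations. Its selectImports method returns an array containing the names of all classes to import.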

@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {

    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }

}

public abstract class SpringFactoryImportSelector<T>
        implements DeferredImportSelector, BeanClassLoaderAware, EnvironmentAware {

    @Override
    public String[] selectImports(AnnotationMetadata metadata) {
        if (!isEnabled()) {
            return new String[0];
        }
        AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

        Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
                + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

        // Find all possible auto configuration classes, filtering duplicates
        List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

        if (factories.isEmpty() && !hasDefaultFactory()) {
            throw new IllegalStateException("Annotation @" + getSimpleName()
                    + " found, but there are no implementations. Did you forget to include a starter?");
        }

        if (factories.size() > 1) {
            // there should only ever be one DiscoveryClient, but there might be more than
            // one factory
            this.log.warn("More than one implementation " + "of @" + getSimpleName()
                    + " (now relying on @Conditionals to pick one): " + factories);
        }

        return factories.toArray(new String[factories.size()]);
    }

}

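Here, SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader) reads spring.factories and looks up the class names listed under the key that matches the annotation's fully qualified name. selectImports returns those names via factories.toArray(new String[factories.size()]), and Spring then registers the named classes in the container.

In this case, the key org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker resolves to org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration.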

D:\maven_repo\org\springframework\cloud\spring-cloud-netflix-hystrix\2.1.4.RELEASE\spring-cloud-netflix-hystrix-2.1.4.RELEASE.jar!\META-INF\spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
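
The key-to-class-names lookup can be approximated with java.util.Properties, since spring.factories uses the properties file format. This is only a sketch of the idea: the real SpringFactoriesLoader scans every META-INF/spring.factories on the classpath and caches the results. The class FactoriesLookupSketch and its loadFactoryNames method are hypothetical and merely borrow the name for readability.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;

public class FactoriesLookupSketch {

    // Returns the de-duplicated, ordered class names listed under `key`
    // in a spring.factories-style properties document.
    public static List<String> loadFactoryNames(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        LinkedHashSet<String> names = new LinkedHashSet<>();
        for (String name : props.getProperty(key, "").split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return new ArrayList<>(names);
    }

    public static void main(String[] args) {
        // Same entry as in the spring.factories excerpt, including the line continuation.
        String content =
                "org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\\\n"
              + "org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration\n";
        System.out.println(loadFactoryNames(content,
                "org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker"));
    }
}
```

The returned list plays the role of the array that selectImports hands back to Spring.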

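The HystrixCircuitBreakerConfiguration class

HystrixCircuitBreakerConfiguration defines a HystrixCommandAspect bean. This aspect intercepts calls to methods annotated with @HystrixCommand, such as the controller method shown earlier.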

@Configuration
public class HystrixCircuitBreakerConfiguration {

    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }

    @Bean
    public HasFeatures hystrixFeature() {
        return HasFeatures
                .namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
    }

    /**
     * {@link DisposableBean} that makes sure that Hystrix internal state is cleared when
     * {@link ApplicationContext} shuts down.
     */
    private class HystrixShutdownHook implements DisposableBean {

        @Override
        public void destroy() throws Exception {
            // Just call Hystrix to reset thread pool etc.
            Hystrix.reset();
        }

    }

}

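The HystrixCommandAspect aspect

methodsAnnotatedWithHystrixCommand is an around advice. It runs before the annotated method (for example, the controller method) and decides how that method is invoked.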

@Aspect
public class HystrixCommandAspect {

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        // Initialize the circuit breaker, thread pool, etc.
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            if (!metaHolder.isObservable()) {
                // Execute the command (circuit breaker and fallback logic apply here)
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause();
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
}
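
The interception idea behind the aspect can be sketched with a JDK dynamic proxy instead of AspectJ. This is an assumption-laden simplification: the annotation WithFallback, the PowerService interface, and the wrap helper are all hypothetical stand-ins, and the real aspect builds a Hystrix command rather than calling the target directly.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class AroundAdviceSketch {

    // Hypothetical stand-in for @HystrixCommand(fallbackMethod = "...").
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface WithFallback {
        String fallbackMethod();
    }

    public interface PowerService {
        @WithFallback(fallbackMethod = "getPowerFallback")
        String getPower(String id);

        String getPowerFallback(String id);
    }

    // A service whose main method always fails, to exercise the fallback path.
    public static class FailingPowerService implements PowerService {
        @Override
        public String getPower(String id) {
            throw new IllegalStateException("remote call failed");
        }

        @Override
        public String getPowerFallback(String id) {
            return "degraded:" + id;
        }
    }

    // Wraps the target so that every call passes through the handler first,
    // similar in spirit to an @Around advice.
    public static PowerService wrap(PowerService target) {
        return (PowerService) Proxy.newProxyInstance(
                PowerService.class.getClassLoader(),
                new Class<?>[] {PowerService.class},
                (proxy, method, args) -> {
                    WithFallback annotation = method.getAnnotation(WithFallback.class);
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        if (annotation == null) {
                            throw e.getCause(); // no fallback configured: rethrow
                        }
                        Method fallback = target.getClass()
                                .getMethod(annotation.fallbackMethod(), method.getParameterTypes());
                        return fallback.invoke(target, args);
                    }
                });
    }

    public static void main(String[] args) {
        PowerService service = wrap(new FailingPowerService());
        System.out.println(service.getPower("42"));
    }
}
```

The handler reads the annotation at call time, which mirrors how the aspect collects annotation metadata into a MetaHolder before executing.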

HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); creates the Hystrix command object that will be executed.

public class HystrixCommandFactory {

    public HystrixInvokable create(MetaHolder metaHolder) {
        HystrixInvokable executable;
        if (metaHolder.isCollapserAnnotationPresent()) {
            executable = new CommandCollapser(metaHolder);
        } else if (metaHolder.isObservable()) {
            executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        } else {
            // A regular @HystrixCommand method takes this branch
            executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
        }
        return executable;
    }

}

Constructing a GenericCommand invokes the constructor of its ancestor class AbstractCommand, which initializes a series of components.

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }    
}

Circuit breaker

this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), 
                                         circuitBreaker, this.commandGroup, this.commandKey, 
                                         this.properties, this.metrics);

private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // get the default implementation of HystrixCircuitBreaker
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        return new NoOpCircuitBreaker();
    }
}

Instantiating the circuit breaker

public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // this should find it for all but the first time
    HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize

    // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
    // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
    // If 2 threads hit here only one will get added and the other will get a non-null response instead.
    HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
    if (cbForCommand == null) {
        // this means the putIfAbsent step just created a new one so let's retrieve and return it
        return circuitBreakersByCommand.get(key.name());
    } else {
        // this means a race occurred and while attempting to 'put' another one got there before
        // and we instead retrieved it and will now return it
        return cbForCommand;
    }
}
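
The get, then putIfAbsent, pattern used by getInstance can be tried in isolation. The sketch below assumes a plain Object as the cached value; PerKeyCacheSketch is a hypothetical class, not part of Hystrix.

```java
import java.util.concurrent.ConcurrentHashMap;

public class PerKeyCacheSketch {

    private static final ConcurrentHashMap<String, Object> CACHE = new ConcurrentHashMap<>();

    // Returns the single shared instance for `key`, creating it on first use.
    public static Object getInstance(String key) {
        Object cached = CACHE.get(key);
        if (cached != null) {
            return cached; // fast path for every call after the first
        }
        Object created = new Object();
        // putIfAbsent returns null if our instance was stored,
        // or the instance another thread stored first.
        Object raced = CACHE.putIfAbsent(key, created);
        return raced == null ? created : raced;
    }

    public static void main(String[] args) {
        System.out.println(getInstance("power") == getInstance("power"));
        System.out.println(getInstance("power") == getInstance("order"));
    }
}
```

Because the map is keyed by command key name, every command with the same key shares one circuit breaker and therefore one open/closed state.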

Circuit breaker implementation

static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    // Circuit open/closed flag
    /* track whether this circuit is open/closed at any given point in time (default to false==closed) */
    private AtomicBoolean circuitOpen = new AtomicBoolean(false);

    // Time the circuit was opened or last probed (used for the half-open check)
    /* when the circuit was marked open or was last allowed to try a 'singleTest' */
    private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;
    }

    public void markSuccess() {
        if (circuitOpen.get()) {
            if (circuitOpen.compareAndSet(true, false)) {
                //win the thread race to reset metrics
                //Unsubscribe from the current stream to reset the health counts stream.  This only affects the health counts view,
                //and all other metric consumers are unaffected by the reset
                metrics.resetStream();
            }
        }
    }

    @Override
    public boolean allowRequest() {
        if (properties.circuitBreakerForceOpen().get()) {
            // properties have asked us to force the circuit open so we will allow NO requests
            return false;
        }
        if (properties.circuitBreakerForceClosed().get()) {
            // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
            isOpen();
            // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
            return true;
        }
        return !isOpen() || allowSingleTest();
    }

    public boolean allowSingleTest() {
        long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
        // 1) if the circuit is open
        // 2) and it's been longer than 'sleepWindow' since we opened the circuit
        if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
            // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
            // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
            if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                // if this returns true that means we set the time so we'll return true to allow the singleTest
                // if it returned false it means another thread raced us and allowed the singleTest before we did
                return true;
            }
        }
        return false;
    }

    @Override
    public boolean isOpen() {
        if (circuitOpen.get()) {
            // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
            return true;
        }

        // we're closed, so let's see if errors have made us so we should trip the circuit open
        HealthCounts health = metrics.getHealthCounts();

        // check if we are past the statisticalWindowVolumeThreshold
        if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
            return false;
        }

        if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
            return false;
        } else {
            // our failure rate is too high, trip the circuit
            if (circuitOpen.compareAndSet(false, true)) {
                // if the previousValue was false then we want to set the currentTime
                circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                return true;
            } else {
                // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
                // caused another thread to set it to true already even though we were in the process of doing the same
                // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
                return true;
            }
        }
    }

}
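
To watch the closed, open, and half-open transitions, the logic above can be condensed into a small standalone class. This is a simplified sketch under stated assumptions: it keeps plain counters instead of Hystrix's rolling metrics window, takes an injectable clock so time can be advanced by hand, and the names SimpleCircuitBreaker and markFailure are hypothetical. The values 20, 50, and 5000 used in main are Hystrix's documented defaults for the request volume threshold, error threshold percentage, and sleep window.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class SimpleCircuitBreaker {
    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final LongSupplier clock; // injectable time source (an assumption for testability)

    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedTime = new AtomicLong();
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    public SimpleCircuitBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                                long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    public boolean allowRequest() {
        return !isOpen() || allowSingleTest();
    }

    public void markSuccess() {
        total.incrementAndGet();
        if (circuitOpen.compareAndSet(true, false)) {
            // A successful probe closes the circuit and resets the counters.
            total.set(0);
            errors.set(0);
        }
    }

    public void markFailure() {
        total.incrementAndGet();
        errors.incrementAndGet();
    }

    private boolean isOpen() {
        if (circuitOpen.get()) {
            return true;
        }
        long seen = total.get();
        if (seen < requestVolumeThreshold) {
            return false; // not enough traffic to judge
        }
        if (errors.get() * 100 / seen < errorThresholdPercentage) {
            return false;
        }
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTestedTime.set(clock.getAsLong());
        }
        return true;
    }

    // Half-open: after the sleep window, let exactly one probe request through.
    private boolean allowSingleTest() {
        long last = openedOrLastTestedTime.get();
        return circuitOpen.get()
                && clock.getAsLong() > last + sleepWindowMillis
                && openedOrLastTestedTime.compareAndSet(last, clock.getAsLong());
    }

    public static void main(String[] args) {
        long[] now = {0L};
        SimpleCircuitBreaker breaker = new SimpleCircuitBreaker(20, 50, 5000, () -> now[0]);
        for (int i = 0; i < 20; i++) {
            breaker.markFailure();
        }
        System.out.println(breaker.allowRequest()); // false: the circuit has tripped
        now[0] = 5001;
        System.out.println(breaker.allowRequest()); // true: one probe after the sleep window
        breaker.markSuccess();
        System.out.println(breaker.allowRequest()); // true: the probe succeeded, circuit closed
    }
}
```

While the circuit is open and the sleep window has not elapsed, allowRequest returns false and the caller goes straight to the fallback, which is the short-circuit behavior described at the start of this article.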

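Summary

The @EnableHystrix annotation on the application entry class registers an AOP aspect. Through an around advice, every method annotated with @HystrixCommand is routed into the proxy method. The proxy method reads the configuration from the annotation and initializes components such as the circuit breaker and the thread pool; the settings come from the yml file and the annotation attributes. It then builds a Command object, and executing that command checks whether the circuit breaker is open or half-open.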