Eureka-Ribbon-Feign框架整合前提

Eureka-Ribbon整合

Ribbon的 ILoadBalancer 里的 ServerList 会自动每隔30秒从eureka client里去获取本地的最新的注册表,根据注册表里的服务的server list,来进行负载均衡

Ribbon-Feign整合

Feign在发起一个请求之前,一定会使用Ribbon的 ILoadBalancer 去进行负载均衡,从一堆server list中获取一个server,然后再针对哪个server发起请求

Feign-Hystrix整合

Feign发起请求的时候,会基于hystrix来进行各个服务的隔离、超时、异常、限流、降级、熔断

Hystrix-Turbine-Dashboard整合

来看到一个最最基础的微服务架构的仪表盘,请求次数、请求延时

1-Hystrix源码入口,在执行Feign动态代理过程

1-1. Feign与Hystrix整合动态代理源码分析

在上一节Feign接受Request处理时候,方法在 targeter.target(this, builder, context, target) 进入, 在没结合Hystrix配置时候使用的是 Feign.builder().retryer(retryer)HystrixTargeter 生成动态代理处理,但是当结合Hystrix,就执行如下:

  1. //FeignClientFactoryBean#loadBalance(..) --> targeter.target(this, builder, context, target) 当中target为HystrixTargeter的target()方法
  2. //
  3. @Override
  4. public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
  5. Target.HardCodedTarget<T> target) {
  6. //代码省略.....
  7. //本接分析的是这里整合hytrix框架的feign,执行代码的流程....
  8. feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
  9. SetterFactory setterFactory = getOptional(factory.getName(), context,
  10. SetterFactory.class);
  11. if (setterFactory != null) {
  12. builder.setterFactory(setterFactory);
  13. }
  14. Class<?> fallback = factory.getFallback();
  15. if (fallback != void.class) {
  16. return targetWithFallback(factory.getName(), context, target, builder, fallback);
  17. }
  18. //获取到指定得fallbackFactory对象,进入targetWithFallbackFactory()方法中
  19. Class<?> fallbackFactory = factory.getFallbackFactory();
  20. if (fallbackFactory != void.class) {
  21. return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
  22. }
  23. return feign.target(target);
  24. }

这里debug如图:
image.png

fallbackFactoryClass : com.zhss.service.b.ServiceAClient$ServiceAClientFallbackFactory target : Target.HardCodedTarget【就是内有接口类型(com.zhss.service.ServiceAClient)、服务名称(ServiceA)、url地址(http://ServiceA)等信息】 builder : Feign.Builder【LoggerEncoderDecoderContract 这些对象。这些对象可都是feign的重要组件】 feignClientName: ServiceA context:ServiceA的上下文本

由于FallbackFactory在ServiceAClient内部构件了一个 ServiceAClientFallbackFactory ,此为在 @FeignClient(value = "ServiceA",configuration = MyConfiguration.class,fallbackFactory = ServiceAClient.ServiceAClientFallbackFactory.class) 注解中注入的相关信息,结合上列代入参数进入到targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory) 方法中

private <T> T targetWithFallbackFactory(String feignClientName, FeignContext context,
                                        Target.HardCodedTarget<T> target,
                                        HystrixFeign.Builder builder,
                                        Class<?> fallbackFactoryClass) {
    //我们设置的那个FallbackFactory,负责在每次超时、拒绝(线程池满)、异常的时候,create()方法返回一个降级机制的对象
    FallbackFactory<? extends T> fallbackFactory = (FallbackFactory<? extends T>)
    getFromContext("fallbackFactory", feignClientName, context, fallbackFactoryClass, FallbackFactory.class);

    //从服务(ServiceA)的独立的spring容器中取出来一个独立的FallbackFactory,调用每个服务的时候,他对应的FallbackFactory都是存在于那个服务关联的独立的spring容器中的
    Object exampleFallback = fallbackFactory.create(new RuntimeException());
    //代码省略.....
    //生成动态带理对象,把fallbackFactory作为入参代入
    return builder.target(target, fallbackFactory);
}
  1. FallbackFactory去创建一个Fallback对象出来
  2. 生成target代理对象,并把FallbackFactory作为入参代入

后面 builder.target(target, fallbackFactory) 进入代理源码分析

1-2.代理 HystrixFeign 生成源码分析

//1. 执行feign.hystrix.HystrixFeign.Builder#target(..)
public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
    return build(fallbackFactory).newInstance(target);
}

//2.在target方法中build(fallbackFactory)方法
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
    super.invocationHandlerFactory(new InvocationHandlerFactory() {
        @Override public InvocationHandler create(Target target,
                                                  Map<Method, MethodHandler> dispatch) {
           //后面request就是在这里通过HystrixInvocationHandler#invoke()方法进行拦截
            return new HystrixInvocationHandler(target, dispatch, setterFactory, nullableFallbackFactory);
        }
    });
    super.contract(new HystrixDelegatingContract(contract));
    return super.build();
}

和Feign之前执行一样,最终实际用来去处理这个请求的,其实是 InvocationHandler ,他是JDK动态代理的核心,基于JDK动态代理机制,生成一个动态代理的对象之后,对这个对象所有的方法调用,都会走关联的那个 InvocationHandler

return new HystrixInvocationHandler(target, dispatch, setterFactory, nullableFallbackFactory) 中 target:你要调用的服务image.png dispatch:map,接口的每个方法的Method对象 -> SynchronousMethodHandler 5.Hystrix源码学习 - 图4
image.png setterFactory:[这里可以认为是空,但实际是SetterFactory.Default ] nullableFallbackFactory:我们给的那个降级对象的工程,fallback工程 image.png HystrixInvocationHandler -> 包含了上面的4样东西 super.contract(new HystrixDelegatingContract(contract))

而当执行 super.contract(new HystrixDelegatingContract(contract)) , 就是Contract 解析第三方注解的组件,设置为了 HystrixDelegatingContract ,顾名思义就是说,设置了这个组件之后,后面就可以解析你在各个接口上打的这个 @HystirxCommand 以及其他的一些注解,hystrix相关的一些注解
接着调用 super.build() 就是Feign.Builder,后面的构造动态代理的逻辑,几乎都是一样了

2.Hystrix执行Request请求处理源码分析

2-1. HystrixInvocationHandler#invoke()拦截处理

和Feign一样,处理Request请求时候也是有拦截器拦截进行处理,这里主要由 HystrixInvocationHandler#invoke() 方法进行拦截处理,代码如下:

//feign.hystrix.HystrixInvocationHandler#invoke方法,处理request请求处理
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
    //代码省略....、

    //HystrixInvocationHandler#invoke方法内第一步构造了HystrixCommand类,
    //当中setterMethodMap.get(method)这里的map其实是通过HystrixCommand.Setter配置而成的,
    //所以setterMethodMap里面包含了Setter中HystrixCommandGroupKey、HystrixCommandKey等等相关HystrixCommond的相关配置信息---这点可以看demo即可
    HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) {
        @Override
        protected Object run() throws Exception {
            try {
                //这里是线程池执行线程处理执行通过对应的dispatch获取对应method名后,执行invoke(..)方法
                return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
            } catch (Exception e) {
                throw e;
            } catch (Throwable t) {
                throw (Error) t;
            }
        }

        //1.当hystrix command执行发生问题时候,就会调用这个getFallback()方法,获取一个Object降级对象
        @Override
        protected Object getFallback() {
            //1.确认降级的fallbackFactory有没有创建,这里我们已经在demo中构建好了
            if (fallbackFactory == null) {
                return super.getFallback();
            }
            try {
                //1-1.getExecutionException():获取command执行的异常,熔断、拒绝、超时、报错,都会给你,你自己来决定,根据各种不同的异常,如何实现降级
                //1-2.fallbackFactory.create(getExecutionException()):去创建一个fallback对象,这里是我们自定义的ServiceAClient下的内名内部类,包含了降级的逻辑
                Object fallback = fallbackFactory.create(getExecutionException());

                //1-3.fallbackMethodMap一看就是包含了接口的各种方法,
                //根据你要调用的这个方法,获取一个方法对应的Method对象,
                //然后对fallback object调用那个Method方法,传入args参数
                Object result = fallbackMethodMap.get(method).invoke(fallback, args);
                if (isReturnsHystrixCommand(method)) {
                    return ((HystrixCommand) result).execute();
                } else if (isReturnsObservable(method)) {
                    // Create a cold Observable
                    return ((Observable) result).toBlocking().first();
                } else if (isReturnsSingle(method)) {
                    // Create a cold Observable as a Single
                    return ((Single) result).toObservable().toBlocking().first();
                } else if (isReturnsCompletable(method)) {
                    ((Completable) result).await();
                    return null;
                } else {
                    return result;
                }
            } catch (IllegalAccessException e) {
                // shouldn't happen as method is public due to being an interface
                throw new AssertionError(e);
            } catch (InvocationTargetException e) {
                // Exceptions on fallback are tossed by Hystrix
                throw new AssertionError(e.getCause());
            }
        }
    };

    //2.这里就是对HystrixCommand判断method返回的类型
    //2-1. isReturnsHystrixCommand(method):判断一下method的返回类型,是否是HystrixCommand类型
    if (isReturnsHystrixCommand(method)) { 
        return hystrixCommand;
    //2-2. sReturnsObservable(method):判断一下method的返回类型,是否是Observable类型
    } else if (isReturnsObservable(method)) {
        // Create a cold Observable
        return hystrixCommand.toObservable();
    } else if (isReturnsSingle(method)) {
        // Create a cold Observable as a Single
        return hystrixCommand.toObservable().toSingle();
    } else if (isReturnsCompletable(method)) {
        return hystrixCommand.toObservable().toCompletable();
    }
    //最后通过构建好的hystrixCommand对象执行execute()方法,完成后返回结果
    return hystrixCommand.execute();
}

分析归纳:

  1. 开始进来使用method先在setterMathodMap中获取了 HystrixCommand.Setter 来构建了 HystrixCommand ,当HystrixCommand执行run方法时候,就是调用的方法体:return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);而这个dispatch就是之前创建的 Map<Method, MethodHandler> ,获取到某一个 SynchronousMethodHandler 来进行处理请求
  2. HystrixCommand 中还有一个 getFallback() 方法 , 在hystrix command执行发生问题的时候,会调用这个command的fallback逻辑:熔断(断路器打开了,直接就会fallback)、拒绝(线程池满了,信号量满了)、超时(核心逻辑执行超时)、异常(如果核心逻辑执行报错,比如说远程接口报错,往外面抛异常)
  3. 判断method返回类型:如果返回结果是 HystrixCommand 的话,在此处就直接进行返回,不会去执行command(提供的扩展方法)

后面就是下一节: hystrixCommand.execute() 分析

2-2. hystrixCommand.execute() 分析

当进入 hystrixCommand.execute() 方法后,重点分析当中 queue() 方法:

//HystrixCommand#execute()方法
public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

当中执行的queue() 方法:
image.png
执行流程:

  1. 通过执行 Future<R> delegate = toObservable().toBlocking().toFuture() , 把这个command放进线程池里去执行结果,等待获取一个Future对象。但是由于此对象不支持异常情况下线程的终止,所以hystrix再次对 Future<R> delegate 进行了封装,封装成了 Future<R> f (下图标注)
  2. 此处封装的代码如下图,最主要的是封装了cancel方法,此处涉及到一个参数: execution.isolation.thread.interruptOnFutureCancel ,当隔离策略为THREAD时,当执行线程执行超时时,是否进行中断处理,即 Future#cancel(true) 处理,默认为false。 ```java /**
    • queue()方法内 **/ //1.Future对象,是不具备,因为一些异常的原因,中断这个线程执行的能力的,比如超时、异常,你没办法在异常情况下,终止future对应的线程的执行,所以说要对这里返回的delegate future进行包装 final Future delegate = toObservable().toBlocking().toFuture();

//2. 所以上面1方法中的delegate在这步骤中再封装了一次Future f ,所以说这个新创建的future对象,其实包装了上面的原生的delegate future对象 final Future f = new Future() { @Override public boolean cancel(boolean mayInterruptIfRunning) { if (delegate.isCancelled()) { return false; }

            if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {

                interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
            }

            final boolean res = delegate.cancel(interruptOnFutureCancel.get());

            if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                final Thread t = executionThread.get();
                if (t != null && !t.equals(Thread.currentThread())) {
                    t.interrupt();
                }
            }

            return res;
        }

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }

    };
接下来调用Future对象的f.isDone()方法,此时会一直等待方法执行完毕,再把二次封装的Futrue对象进行返回。
<a name="aX5x4"></a>
## 2-3. `queue().get()` 的get()方法
`queue()` 方法其实就是返回的 `Future` 对象,只要返回肯定就已经执行完毕了,再调用get方法就把结果返回给调用者了。
<a name="5ZBXR"></a>
# 3.Hystrix核心`toObservable().toBlocking().toFuture()`  分析
> Hystrix的源码,真正的核心机制和逻辑都在 `toObservable().toBlocking().toFuture()` 这段剪短的代码内 

<a name="Hz0sn"></a>
## 3-1. 分析 `toObservable()` 源码分析  
> 方法上的解析:
> Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
> 大概理解的意思:
> 这句话的意思,就是在这里,会将comand使用异步的方式来执行,怎么异步呢?肯定是扔到一个线程池里异步去跑。扔comand到线程池异步去执行之后,在这里你肯定是可以拿到一个Observable对象,拿到这个对象之后,你如果要看这个command执行的一些状态和结果,你需要去订阅这个Observable对象
> This lazily starts execution of the command once the {@link Observable} is subscribed to.
> 大概理解的意思:
> 如果你获取了一个Observable对象之后,此时command其实还没立即开始执行的。这个时候仅仅就是将command封装在Observable对象里面,什么都没干,返回给你一个Observable对象
> An eager {@link Observable} can be obtained from {@link #observe()}.
> 大概理解的意思:
> 如果你希望一旦获取到Observable对象,就立即让他去执行内部的command,那么不要调用toObservable()方法,你可以去调用observe()方法

<a name="4a7r7"></a>
### a. `toObservable()` 方法下面我们来研究一下
![20210303061332.png](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615445501643-d161e669-7c0e-4649-b61a-4bbb1a806f8c.png#height=1658&id=uqZa3&margin=%5Bobject%20Object%5D&name=20210303061332.png&originHeight=1658&originWidth=1404&originalType=binary&ratio=1&size=240713&status=done&style=none&width=1404)<br />**概况来说: **在这里queue方法定义了5个回调函数【terminateCommandCleanup、unsubscribeCommandCleanup、applyHystrixSemantics、wrapWithAllOnNextHooks、fireOnCompletedHook】去构造了一个Observable对象,其他的啥也没干。
> **分析说明:**
> 我推测肯定是在.toBlocking().toFuture()调用完事后会回调这5个回调函数!点进去.toBlocking().toFuture()方法是rxjava包中的,没必要研究rxjava的底层原理,我们就打断点研究这5个回调就可以啦。

<a name="36Htr"></a>
## 3-2. `toBlocking().toFuture()` 方法源码分析
由于这里解析非常复杂,现迁移到【[对toBlocking().toFuture()方法中解析补充](https://www.yuque.com/wallacefw/xgnodf/qakwox)】当中!
> 概况来说:
> 这里当执行`toBlocking()`方法时,会通过Rxjava触发 `toObservable()` 方法中 `applyHystrixSemantics` 方法去处理相关Obserable的处理,包括对其commandState状态、threadState状态判断是否执行,如以上条件状态没出错,则实行订阅执行SynchronousMethodHandler对应method中接口访问

<a name="6VrFm"></a>
# 4.HystrixCommand中涉及的线程池相关
Hystrix框架中,到现在为止还没接触到线程池的使用,这个是不应该的,维持这里开始对线程池创建及使用作分析
<a name="dHPnN"></a>
## 4-1.线程池的创建
在HystrixInvocationHandler的invoke方法中创建了HystrixCommand,此时最终会去调用HystrixThreadPoolDefault类,如下图:<br />![image.png](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523598269-aee2b570-2cb3-4171-9069-267eb96287b7.png#height=949&id=W6Nrs&margin=%5Bobject%20Object%5D&name=image.png&originHeight=949&originWidth=999&originalType=binary&ratio=1&size=240048&status=done&style=none&width=999)<br />在上图的第6步,HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);中,如下图:<br />![](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523681442-bb40223f-c0b0-4bf8-ac05-30cf2e123e44.png#height=513&id=cRdux&originHeight=513&originWidth=1664&originalType=binary&ratio=1&size=0&status=done&style=none&width=1664)<br />threadPools是一个 `ConcurrentHashMap<String, HystrixThreadPool>` ,此时根据threadPoolkey创建了一个线程池放到 `ConcurrentHashMap` 中,很明确的知道:一个threadPoolkey对应一个线程池 `HystrixThreadPoolDefault` 
<a name="SRcDd"></a>
## 4-2. `HystrixThreadPoolDefault` 分析
![](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523861559-267c7f15-a247-4dd9-9bbd-bd01f15952c9.png#height=379&id=zelRU&originHeight=379&originWidth=1398&originalType=binary&ratio=1&size=0&status=done&style=none&width=1398)

先获取线程池的配置,这些参数都是默认的,都是上面做demo的时候设置过的。如下图:<br />![](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523885426-68fe0f69-a53a-426a-86ad-03d31a3cf420.png#height=733&id=ihFM1&originHeight=733&originWidth=1717&originalType=binary&ratio=1&size=0&status=done&style=none&width=1717)

获取完配置文件后,进行创建了线程池:<br />**concurrencyStrategy.getThreadPool(threadPoolKey, properties):**<br />![](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523885333-0dbf51c5-3567-451b-8bc9-e25bfa36419e.png#height=430&id=elBia&originHeight=430&originWidth=1445&originalType=binary&ratio=1&size=0&status=done&style=none&width=1445)<br />![](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523885354-8e3ea128-458c-499e-985d-db07dd35c30e.png#height=883&id=Ejop6&originHeight=883&originWidth=1710&originalType=binary&ratio=1&size=0&status=done&style=none&width=1710)<br />这个ThreadFactory,在线程池创建一个新的线程的时候,会基于这个ThreadFactory来创建,这里主要是创建出来一个Thread对象,给这个Thread设置一个线程名称,如:ServiceA-1、ServiceA-2**代码继续走,接下来根据maxQueueSize 参数来创建了一个queue:![](https://cdn.nlark.com/yuque/0/2021/png/1642324/1615523988464-36a55373-0b82-4db9-8da3-4491440ab2a8.png#height=749&id=vy7Gs&originHeight=749&originWidth=1882&originalType=binary&ratio=1&size=0&status=done&style=none&width=1882)<br />SynchronousQueue<Runnable>()是没有所谓的排队的效果的!<br />默认的配置如下:
```markdown
hystrix.threadpool.ServiceA.allowMaximumSizeToDivergeFromCoreSize = false
hystrix.threadpool.ServiceA.keepAliveTimeMinutes = 1
hystrix.threadpool.ServiceA.maximumSize = 10
hystrix.threadpool.ServiceA.coreSize = 10
hystrix.threadpool.ServiceA.maxQueueSize = -1

一个请求过来,会找一个新的线程来处理这个请求,但是最多同时只能是coreSize指定的10个线程,同时处理10个请求
如果10个线程都在繁忙中,此时来了第11个请求,直接就是线程池reject掉
如果配置是如下:

hystrix.threadpool.ServiceA.allowMaximumSizeToDivergeFromCoreSize = true
hystrix.threadpool.ServiceA.keepAliveTimeMinutes = 1
hystrix.threadpool.ServiceA.maximumSize = 20
hystrix.threadpool.ServiceA.coreSize = 10
hystrix.threadpool.ServiceA.maxQueueSize = 10

先是用线程池里的10个线程来处理,如果10个线程都繁忙了,此时会进入队列排队,最多排10个请求,如果队列也满了,此时会创建新的线程,最多创建额外的10个线程,让线程池的综述,最多增加到20个。新增加出来的10个线程,如果处理完了请求,超过1分钟是空闲的,那么此时就会释放掉新增加出来的额外的10个线程。

4-3. 线程池中异步执行任务的时候如何进行超时检测以及中断线程的执行:

AbstractCommand#executeCommandAndObserve 方法中,创建 Observable<R> execution 时会判断超时的配置是否开启,并注入一个HystrixObservableTimeoutOperator的组件,如下图:
5.Hystrix源码学习 - 图8
而HystrixObservableTimeoutOperator就是超时检测的关键步骤!

分析HystrixObservableTimeoutOperator

5.Hystrix源码学习 - 图9
在这里,先创建了一个TimerListener,然后把这个TimerListener监听器给放入到了AbstractCommand也就是当前的command命令里面

分析TimerListener5.Hystrix源码学习 - 图10

获取到我们配置的执行超时时间,然后如果超时就会被回调,然后设置TimedOutStatus的状态从NOT_EXECUTED变为TIME_OUT!
是谁来启用这个定时的呢?在下图:
5.Hystrix源码学习 - 图115.Hystrix源码学习 - 图12
首先startThreadIfNeeded()方法会创建一个线程池:5.Hystrix源码学习 - 图13
之后获取线程池在 listener.getIntervalTimeInMilliseconds() 分钟之后和每个 listener.getIntervalTimeInMilliseconds() 时间去调用一次Listener.tick方法。

ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);

这样就完成了超时的操作,但是如果请求正常执行完毕后肯定就把这个超时的清理掉:是在HystrixCommand的observer对象创建的时候使用的回调!
5.Hystrix源码学习 - 图14

5.TimeOut(超时)的处理

AbstractCommand#executeCommandAndObserve(final AbstractCommand<R> _cmd)) (行635-639)调用方法properties.executionTimeoutEnabled().get(),当中判断是否超时,如有构建 HystrixObservableTimeoutOperator 控件处理
image.png当出现超时时候,就会执行 AbstractCommand.HystrixObservableTimeoutOperator#HystrixObservableTimeoutOperator$call() 方法处理
(行1129-1217)

 @Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    // if the child unsubscribes we unsubscribe our parent as well
    child.add(s);

    //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
    final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

    TimerListener listener = new TimerListener() {

        @Override
        public void tick() {
            // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
            // otherwise it means we lost a race and the run() execution completed or did not start
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // report timeout failure
                originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                // shut down the original request
                s.unsubscribe();

                final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                    @Override
                    public void run() {
                        child.onError(new HystrixTimeoutException());
                    }
                });


                timeoutRunnable.run();
                //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
            }
        }

        @Override
        public int getIntervalTimeInMilliseconds() {
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };

    final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

    // set externally so execute/queue can see this
    originalCommand.timeoutTimer.set(tl);

    /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
    Subscriber<R> parent = new Subscriber<R>() {

        @Override
        public void onCompleted() {
            if (isNotTimedOut()) {
                // stop timer and pass notification through
                tl.clear();
                child.onCompleted();
            }
        }

        @Override
        public void onError(Throwable e) {
            if (isNotTimedOut()) {
                // stop timer and pass notification through
                tl.clear();
                child.onError(e);
            }
        }

        @Override
        public void onNext(R v) {
            if (isNotTimedOut()) {
                child.onNext(v);
            }
        }

        private boolean isNotTimedOut() {
            // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
            return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
        }

    };

    // if s is unsubscribed we want to unsubscribe the parent
    s.add(parent);

    return parent;
}
  • TimerListener 是一个监听我们的command执行是否超时的这么一个监听器
  • final Reference tl = HystrixTimer.getInstance().addTimerListener(listener) 这里拿到了一个HystirxTimer加入了上面的那个监听器,就是如果超时了,就会去回调那个监听器
  • 然后将这个HystrixTimer,给放到了command的里面去, 出发HystrixTimer的监听器触发在线程池里每一秒执行TimerListener#tick()方法,状态为为TIME_OUT时,抛出HystrixTimeoutException 异常

    流程图

    image.png

6.fallback降级逻辑的处理

处理Fallback异常由回调方法 handleFallback 处理(AbstractCommand中601-625行):

final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
    @Override
    public Observable<R> call(Throwable t) {
        circuitBreaker.markNonSuccess();
        Exception e = getExceptionFromThrowable(t);
        executionResult = executionResult.setExecutionException(e);
        //这里是线程池reject异常
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
         //这里是超时处理异常
        } else if (t instanceof HystrixTimeoutException) {
            return handleTimeoutViaFallback();
        //这里是BadRequest出错处理异常
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e);
        } else {
            /*
             * 将ExecutionHook中的HystrixBadRequestException视为普通的HystrixBadRequestException。
            */
            if (e instanceof HystrixBadRequestException) {
                eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                return Observable.error(e);
            }

            return handleFailureViaFallback(e);
        }
    }
};

所有的异常,都会交给handleFallback来处理,针对不同的异常,reject、timeout、failure,拒绝、超时、失败,都会执行降级逻辑

7.熔断器的触发

在HystrixCommand创建的时候会去初始化断路器initCircuitBreaker,在方法中hystrix会把HystrixCircuitBreaker存到一个ConcurrentHashMap中,一个commandKey对应一个HystrixCircuitBreaker的实现类HystrixCircuitBreakerImpl。如下图:

5.Hystrix源码学习 - 图17
在HystrixCircuitBreakerImpl的初始化的时候会调用subscribeToStream方法,此方法中订阅了指标数据收集器:HystrixMetrics,每次如果有新的统计信息就会回调onNext方法!

5.Hystrix源码学习 - 图18

在onNext方法中,会有一个HealthCounts参数,此参数就是记录的是最近一个时间窗口的统计信息,默认是10s内!逻辑如下图:
5.Hystrix源码学习 - 图19
此处跟我们设置的参数circuitBreakerRequestVolumeThreshold(默认20)、circuitBreakerErrorThresholdPercentage(默认50%)有关系。
如果说最近一个时间窗口(默认是10s)内请求总数要大于20(circuitBreakerRequestVolumeThreshold)的话,并且异常的请求次数所占的比例>50%(circuitBreakerErrorThresholdPercentage)就会触发熔断器开关:
将熔断器的状态设置为OPEN,并把熔断器打开的时间戳设置成现在的时间戳。

8.熔断器开了后,如何阻止请求执行

在AbstractCommand#applyHystrixSemantics方法中,有circuitBreaker.attemptExecution()方法进行判断熔断器是否打开:
5.Hystrix源码学习 - 图20
如果不通过,直接走fallback逻辑。我们来看看circuitBreaker.attemptExecution()方法的执行。
5.Hystrix源码学习 - 图21
首先判断是否手动强制开启或者关闭了断路器,再进入isAfterSleepWindow里面:
5.Hystrix源码学习 - 图22
判断当前的断路器的开启时间,与circuitBreakerSleepWindowInMilliseconds参数(默认5s)进行对比。
如果当前时间已经离开启断路器过去了circuitBreakerSleepWindowInMilliseconds(默认5s)就会把断路器状态改成HALF_OPEN半开状态,否则直接返回false执行fallback逻辑。

9.半开状态的断路器如何处理

当请求处理成功后会回调markOnCompleted或者markEmits方法:
image.png
最终调用circuitBreaker.markSuccess();
5.Hystrix源码学习 - 图24
此方法中把断路器从HALF_OPEN修改成了CLOSED并把断路器开启时间设置成 -1。
如果执行失败就会回调handlerFallback:
5.Hystrix源码学习 - 图25
然后再执行circuitBreaker.markNonSuccess();方法:
5.Hystrix源码学习 - 图26
此时又回重新打开断路器,再把打开断路器的时间设置成当前时间。
5.Hystrix源码学习 - 图27

截取资料

Hystrix 中线程池隔离与信号量隔离区别