0. 前提

点进去 .toBlocking().toFuture() 方法是Rxjava包中的,没必要研究Rxjava的底层原理,我们就打断点研究这几个回调就可以啦

当中执行的URL为:http://localhost:9090/ServiceB/user/1

1. 请求触发,进入Observable.defer(new Func0<Observable<R>>() 内的 call()方法

当请求执行完 toObservable() 方法,执行 .toBlocking() 时候,会触发进入 Observable.defer(new Func0<Observable<R>>() 内的 call() 方法[AbstractCommand内456-515行]
image.png
判断是否开启了请求缓存:默认情况下不启用
5.1.对toBlocking().toFuture()方法中解析补充 - 图3
构建Observable afterCache:
在不开启请求缓存的情况下,Observable afterCache就包含了上面所说的五个回调函数,然后把observable返回了5.1.对toBlocking().toFuture()方法中解析补充 - 图4

  • 首先进入的是Func0的回调函数,在这里面调用了上面 applyHystrixSemanticswrapWithAllOnNextHooksterminateCommandCleanupunsubscribeCommandCleanupfireOnCompletedHook 这5个回调函数
  • 状态 commandStat 由NOT_STARTED —> OBSERVABLE_CHAIN_CREATED
  • 返回一个 Observable<R> 对象

    2.调用回调方法 applyHystrixSemantics()

    image.png
    进入 applyHystrixSemantics() 方法:
    5.1.对toBlocking().toFuture()方法中解析补充 - 图6

executionHook.onStart(_cmd) 方法啥都没干,如下图:5.1.对toBlocking().toFuture()方法中解析补充 - 图7

判断断路器是否开启:**
如果没有断路器开启了就直接走fallback降级逻辑了。在circuitBreaker.attemptExecution()中肯定就是判断断路器是否开启的逻辑!
5.1.对toBlocking().toFuture()方法中解析补充 - 图8

断路器默认肯定是不会打开的,继续走下面的代码逻辑:5.1.对toBlocking().toFuture()方法中解析补充 - 图9
代码524行,判断隔离策略是否是信号量,如果是则做一些处理。我们现在是线程池隔离策略所以这里什么都不做。
这里定义了两个回调函数,打断点进行调试!
代码一值走走走,走到546行,这里调用了executeCommandAndObserve方法,这里才是核心逻辑:

3.执行executeCommandAndObserve() 方法调用:5.1.对toBlocking().toFuture()方法中解析补充 - 图10

  • 上面定义了四个回调函数:markEmits、markOnCompleted、handleFallback、setRequestContext我们都打上断点执行到了之后再进行分析
  • 之后程序执行到635行,判断是否开启超时!此处肯定是个关键方法:executeCommandWithSpecifiedIsolation(),对execution还创建了一个HystrixObservableTimeoutOperator的组件。

5.1.对toBlocking().toFuture()方法中解析补充 - 图11
其实在 executeCommandWithSpecifiedIsolation 也是定义了一个回调函数,我们也来打上断点,全速前进!
执行后会进入到上图中的call方法中,代码先设置了commandState从OBSERVABLE_CHAIN_CREATED设置到USER_CODE_EXECUTED,然后判断是否超时
没超时会把threadState的状态从NOT_USING_THREAD设置到STARTED。再执行到下面的代码:5.1.对toBlocking().toFuture()方法中解析补充 - 图12
其中677行-679行,代码什么都没有实现。后面进入

4.执行 getUserExecutionObservable(_cmd) 方法:

5.1.对toBlocking().toFuture()方法中解析补充 - 图13
构造了一个Observable,并设置了ExecutionHookApplication、DeprecatedOnRunHookApplication两个组件
5.1.对toBlocking().toFuture()方法中解析补充 - 图14
代码全速前进可以看到直接调用了doOnSubscribe方法进行订阅此Observable,之后会执行到上图的call方法中:return Observable.just(run());此处就会去调用HystrixCommand定义的run方法啦!如下图:5.1.对toBlocking().toFuture()方法中解析补充 - 图15