0. 前提
点进去 .toBlocking().toFuture() 方法是Rxjava包中的,没必要研究Rxjava的底层原理,我们就打断点研究这几个回调就可以啦
当中执行的URL为:http://localhost:9090/ServiceB/user/1
1. 请求触发,进入Observable.defer(new Func0<Observable<R>>() 内的 call()方法
当请求执行完 toObservable() 方法,执行 .toBlocking() 时候,会触发进入 Observable.defer(new Func0<Observable<R>>() 内的 call() 方法[AbstractCommand内456-515行]
判断是否开启了请求缓存:默认情况下不启用
构建Observable
在不开启请求缓存的情况下,Observable
- 首先进入的是Func0的回调函数,在这里面调用了上面
applyHystrixSemantics、wrapWithAllOnNextHooks、terminateCommandCleanup、unsubscribeCommandCleanup、fireOnCompletedHook这5个回调函数 - 状态
commandStat由NOT_STARTED —> OBSERVABLE_CHAIN_CREATED - 返回一个
Observable<R>对象2.调用回调方法
applyHystrixSemantics()
进入applyHystrixSemantics()方法:
executionHook.onStart(_cmd) 方法啥都没干,如下图:
判断断路器是否开启:**
如果没有断路器开启了就直接走fallback降级逻辑了。在circuitBreaker.attemptExecution()中肯定就是判断断路器是否开启的逻辑!
断路器默认肯定是不会打开的,继续走下面的代码逻辑:
代码524行,判断隔离策略是否是信号量,如果是则做一些处理。我们现在是线程池隔离策略所以这里什么都不做。
这里定义了两个回调函数,打断点进行调试!
代码一值走走走,走到546行,这里调用了executeCommandAndObserve方法,这里才是核心逻辑:
3.执行executeCommandAndObserve() 方法调用:
- 上面定义了四个回调函数:markEmits、markOnCompleted、handleFallback、setRequestContext我们都打上断点执行到了之后再进行分析
- 之后程序执行到635行,判断是否开启超时!此处肯定是个关键方法:executeCommandWithSpecifiedIsolation(),对execution还创建了一个HystrixObservableTimeoutOperator的组件。

其实在 executeCommandWithSpecifiedIsolation 也是定义了一个回调函数,我们也来打上断点,全速前进!
执行后会进入到上图中的call方法中,代码先设置了commandState从OBSERVABLE_CHAIN_CREATED设置到USER_CODE_EXECUTED,然后判断是否超时。
没超时会把threadState的状态从NOT_USING_THREAD设置到STARTED。再执行到下面的代码:
其中677行-679行,代码什么都没有实现。后面进入
4.执行 getUserExecutionObservable(_cmd) 方法:

构造了一个Observable,并设置了ExecutionHookApplication、DeprecatedOnRunHookApplication两个组件
代码全速前进可以看到直接调用了doOnSubscribe方法进行订阅此Observable,之后会执行到上图的call方法中:return Observable.just(run());此处就会去调用HystrixCommand定义的run方法啦!如下图:
