前面了解了Hystrix最基本的支持高可用的技术:资源隔离+限流
- 创建command
- 执行command
- 配置command对应的group和线程池
那么,调用了这个command的execute()方法之后,Hystrix底层的执行流程和步骤及原理是什么。
步骤一:创建command
一个HystrixCommand或HystrixObservableCommand对象,代表了对某个依赖服务发起的一次请求或调用。创建的时候,可以在构造函数中传入任何需要的参数。
- HystrixCommand主要用于进返回一个结果的调用。
- HystrixObservableCommand主要用于可能返回多条结果的调用。 ```java // 创建 HystrixCommand HystrixCommand hystrixCommand = new HystrixCommand(arg1, arg2);
// 创建 HystrixObservableCommand HystrixObservableCommand hystrixObservableCommand = new HystrixObservableCommand(arg1, arg2);
<a name="BRdIX"></a># 步骤二:调用command执行方法执行command,就可以发起一次对依赖服务的调用。<br />要执行command,可以在4个方法中选择其中的一个:execute()、queue()、observe()、toObservable()。<br />其中execute()和queue()方法仅对HystrixCommand适用。- execute():调用后直接block,属于同步调用,直到依赖服务返回单条结果,或者抛出异常。- queue():返回一个Feature,属于异步调用,后面可以通过Feature获取单条结果。- observe():订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的 Observable 对象的拷贝对象。- toObservable():返回一个 Observable 对象,如果我们订阅这个对象,就会执行 command 并且获取返回结果。```javaK value = hystrixCommand.execute();Future<K> fValue = hystrixCommand.queue();Observable<K> oValue = hystrixObservableCommand.observe();Observable<K> toOValue = hystrixObservableCommand.toObservable();
execute()实际上会调用queue().get()方法
public R execute() {try {return queue().get();} catch (Exception e) {throw Exceptions.sneakyThrow(decomposeException(e));}}
而在queue()方法中,会调用toObsevable().toBlocking().toFuture()。
final Future<R> delegate = toObservable().toBlocking().toFuture();
也就是说,先通过toObesevable()获取到Feture对象,然后调用Future的get()方法。所以,无论通过哪种方式调用command,最终都会依赖toObesevable()去执行。
步骤三:检查是否开启缓存
如果这个command开启了请求缓存Request Cache,而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果,否则,继续后面的步骤。
步骤四:检查是否开启了断路器
检查这个command对应的依赖服务是否开启了断路器,如果断路器被打开,那么这个Hystrix就不会执行这个command方法,而是执行fallback降级机制。
步骤五:检查线程池/队列/信号量是否已满
如果这个command线程池和队列已满,或者semphore信号量已满,那么就不会执行command,而是直接调用fallback降级机制,同时发送reject给断路器统计。
步骤六:执行command
调用HystirxObservableCommand对象的construct()方法,或者HystrixCommand的run()方法来实际执行这个command。
如果才用线程池方式,并且HystrixCommand.run()或者HystrixObservableCommand.construct()的执行时间超过了timeout时长的话,那么command所在的线程池会抛出一个TimeoutException,这时会执行fallback降级机制,不会去管run()或construct()方法的返回值了。如果command执行抛出了其他异常,也会走fallback降级机制。两种情况,Hystrix都会发送异常事件给断路器统计。
如果command成功执行,Hystrix也会也会做一些logging记录与metric度量统计。
步骤七:断路健康检查
Hystrix会把每一个依赖服务的调用成功、失败、reject、Timeout等事件发送给circuit breaker断路器。断路器就会对这些事件的次数进行统计,根据异常事件发生的比例来决定是否进行熔断。如果打开了断路器,那么在接下来一段时间,会直接断路,返回降级结果。
如果在之后,断路器尝试执行command,调用没出错,返回了正常结果,那么Hystrix就会把断路器关闭。
步骤八:调用fallback降级机制
在以下几种情况下,Hystrix会调用fallback降级机制。
- 断路器处于打开状态;
- 线程池/队列/信号量满了;
- command执行超时;
- run()或construt()抛出异常;
HystrixCommand中,实现fallback()方法,可以提供降级机制;HystrixObeservableCommand中,实现resumeWithFallback()方法,返回一个Observable对象,可以提供降级效果。
不同的执行方式

- execute(),和queue()获取的方式一样获取一个 Future,然后通过调用get()方法阻塞并等待结果的返回。
- queue(),将 toObservable()产生的原始Observable通过toBlocking()方法转换成BlockingObservable对象,并调用它的toFuture()方法返回异步的Future对象。
- observe(),在toObservable()产生原始Observable之后立即订阅它,让命令能够马上开始异步执行,并返回一个Observable对象,当调用它的subscribe时,将重新产生结果和通知给订阅者。
- toObservable(),返回最原始的Observable,用户必须订阅它才能真正开始执行命令的订阅流程。
