前面了解了Hystrix最基本的支持高可用的技术:资源隔离+限流

  • 创建command
  • 执行command
  • 配置command对应的group和线程池

那么,调用了这个command的execute()方法之后,Hystrix底层的执行流程和步骤及原理是什么。
Hystrix执行过程 - 图1

步骤一:创建command

一个HystrixCommand或HystrixObservableCommand对象,代表了对某个依赖服务发起的一次请求或调用。创建的时候,可以在构造函数中传入任何需要的参数。

  • HystrixCommand主要用于进返回一个结果的调用。
  • HystrixObservableCommand主要用于可能返回多条结果的调用。 ```java // 创建 HystrixCommand HystrixCommand hystrixCommand = new HystrixCommand(arg1, arg2);

// 创建 HystrixObservableCommand HystrixObservableCommand hystrixObservableCommand = new HystrixObservableCommand(arg1, arg2);

  1. <a name="BRdIX"></a>
  2. # 步骤二:调用command执行方法
  3. 执行command,就可以发起一次对依赖服务的调用。<br />要执行command,可以在4个方法中选择其中的一个:execute()、queue()、observe()、toObservable()。<br />其中execute()和queue()方法仅对HystrixCommand适用。
  4. - execute():调用后直接block,属于同步调用,直到依赖服务返回单条结果,或者抛出异常。
  5. - queue():返回一个Feature,属于异步调用,后面可以通过Feature获取单条结果。
  6. - observe():订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的 Observable 对象的拷贝对象。
  7. - toObservable():返回一个 Observable 对象,如果我们订阅这个对象,就会执行 command 并且获取返回结果。
  8. ```java
  9. K value = hystrixCommand.execute();
  10. Future<K> fValue = hystrixCommand.queue();
  11. Observable<K> oValue = hystrixObservableCommand.observe();
  12. Observable<K> toOValue = hystrixObservableCommand.toObservable();

execute()实际上会调用queue().get()方法

  1. public R execute() {
  2. try {
  3. return queue().get();
  4. } catch (Exception e) {
  5. throw Exceptions.sneakyThrow(decomposeException(e));
  6. }
  7. }

而在queue()方法中,会调用toObsevable().toBlocking().toFuture()。

  1. final Future<R> delegate = toObservable().toBlocking().toFuture();

也就是说,先通过toObesevable()获取到Feture对象,然后调用Future的get()方法。所以,无论通过哪种方式调用command,最终都会依赖toObesevable()去执行。

步骤三:检查是否开启缓存

如果这个command开启了请求缓存Request Cache,而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果,否则,继续后面的步骤。

步骤四:检查是否开启了断路器

检查这个command对应的依赖服务是否开启了断路器,如果断路器被打开,那么这个Hystrix就不会执行这个command方法,而是执行fallback降级机制。

步骤五:检查线程池/队列/信号量是否已满

如果这个command线程池和队列已满,或者semphore信号量已满,那么就不会执行command,而是直接调用fallback降级机制,同时发送reject给断路器统计。

步骤六:执行command

调用HystirxObservableCommand对象的construct()方法,或者HystrixCommand的run()方法来实际执行这个command。
如果才用线程池方式,并且HystrixCommand.run()或者HystrixObservableCommand.construct()的执行时间超过了timeout时长的话,那么command所在的线程池会抛出一个TimeoutException,这时会执行fallback降级机制,不会去管run()或construct()方法的返回值了。如果command执行抛出了其他异常,也会走fallback降级机制。两种情况,Hystrix都会发送异常事件给断路器统计。
如果command成功执行,Hystrix也会也会做一些logging记录与metric度量统计。

步骤七:断路健康检查

Hystrix会把每一个依赖服务的调用成功、失败、reject、Timeout等事件发送给circuit breaker断路器。断路器就会对这些事件的次数进行统计,根据异常事件发生的比例来决定是否进行熔断。如果打开了断路器,那么在接下来一段时间,会直接断路,返回降级结果。
如果在之后,断路器尝试执行command,调用没出错,返回了正常结果,那么Hystrix就会把断路器关闭。

步骤八:调用fallback降级机制

在以下几种情况下,Hystrix会调用fallback降级机制。

  • 断路器处于打开状态;
  • 线程池/队列/信号量满了;
  • command执行超时;
  • run()或construt()抛出异常;

HystrixCommand中,实现fallback()方法,可以提供降级机制;HystrixObeservableCommand中,实现resumeWithFallback()方法,返回一个Observable对象,可以提供降级效果。

不同的执行方式

Hystrix执行过程 - 图2

  • execute(),和queue()获取的方式一样获取一个 Future,然后通过调用get()方法阻塞并等待结果的返回。
  • queue(),将 toObservable()产生的原始Observable通过toBlocking()方法转换成BlockingObservable对象,并调用它的toFuture()方法返回异步的Future对象。
  • observe(),在toObservable()产生原始Observable之后立即订阅它,让命令能够马上开始异步执行,并返回一个Observable对象,当调用它的subscribe时,将重新产生结果和通知给订阅者。
  • toObservable(),返回最原始的Observable,用户必须订阅它才能真正开始执行命令的订阅流程。