写于:2019-06-10 08:52:37

待整理

参考资料: Hystrix GitHub 官方文档 Hystrix Wiki 文档

code.zip

一、Hystrix 封装对象回顾

Hystrix工作流程解析 中,提到 Hystrix 提供了两种封装对象 HystrixCommandHystrixObservableCommand

它们分别提供了四种执行方式:

  • execute()
  • queue()
  • observe()
  • toObservable()

官方针对这四种执行方式给定了说明:

the first two are only applicable to simple HystrixCommand objects and are not available for the HystrixObservableCommand

通过查看源码也能够知道 execute() 和 queue() 只在 HystrixCommand 中有定义,是 HystrixCommand 的专属方法。而 HystrixObservalbeCommand 没有定义。

但是通过查看源码,我们能够得知:HystrixCommand 和 HystrixObservableCommand 都继承了抽象类 AbstractCommand。而 HystrixCommand 中的 execute() 和 queue() 最终调用的是 AbstractCommand 中的 toObservable().toBlocking().toFuture() 。

HystrixCommand 和 HystrixObservableCommand 都有各自的实现方式,但是最终都是调用的 AbstractCommand 中的方法实现。
02.png

二、测试数据准备

提供一个简单的 web 业务处理接口 http://localhost:9527/hello-world/{name}

  1. @RequestMapping("/hello-world/{name}")
  2. public String helloWrold(@PathVariable String name){
  3. System.err.println("---->Hello world " + name);
  4. return "Hello World " + name;
  5. }

三、编码方式使用 HystrixCommand 和 HystrixObservableCommand

(基于 Hystrix 1.5.18)

3.1、HystrixCommand 包裹业务执行方法 “ 调用 hello-world 接口”

  1. public class Case1 extends HystrixCommand<String> {
  2. private final String param;
  3. private RestTemplate restTemplate;
  4. /** 构造方法,传入 业务参数 和 http 请求方法 restTemplate **/
  5. public Case1(String param, RestTemplate restTemplate){
  6. super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
  7. this.restTemplate = restTemplate;
  8. this.param = param;
  9. }
  10. /** 业务执行方法 **/
  11. @Override
  12. protected String run() throws Exception {
  13. return restTemplate.getForObject("http://localhost:9527/hello-world/"
  14. + param,String.class);
  15. }
  16. /** 业务执行异常,调用方法 **/
  17. @Override
  18. protected String getFallback() {
  19. return "出现异常执行 fallback";
  20. }
  21. }

进行单元测试

  1. @Test
  2. public void testSynchronous(){
  3. assertEquals("Hello World World", new Case1("World",new RestTemplate()).execute());
  4. assertEquals("Hello World Bob", new Case1("Bob",new RestTemplate()).execute());
  5. }
  6. @Test
  7. public void testAsynchronous() throws Exception {
  8. assertEquals("Hello World World", new Case1("World",new RestTemplate()).queue().get());
  9. assertEquals("Hello World Bob", new Case1("Bob",new RestTemplate()).queue().get());
  10. }
  11. @Test
  12. public void testObserve() throws ExecutionException, InterruptedException {
  13. // non-blocking
  14. Observable<String> ob = new Case1("World", new RestTemplate()).observe();
  15. ob.subscribe(new Observer<String>() {
  16. @Override
  17. public void onCompleted() {
  18. System.err.println("-----> onCompleted");
  19. }
  20. @Override
  21. public void onError(Throwable e) {
  22. e.printStackTrace();
  23. }
  24. @Override
  25. public void onNext(String s) {
  26. System.err.println("-----> onNext :" + s);
  27. }
  28. });
  29. ob.subscribe(new Action1<String>() {
  30. @Override
  31. public void call(String s) {
  32. System.err.println("------> call:" + s);
  33. }
  34. });
  35. assertEquals("Hello World World", ob.toBlocking().toFuture().get());
  36. }
  37. @Test
  38. public void testToObservable(){
  39. // blocking
  40. Observable<String> ob = new Case1("World", new RestTemplate()).toObservable();
  41. assertEquals("Hello World World", new Case1
  42. ("World",new RestTemplate()).toObservable().toBlocking().single());
  43. }

3.2、HystrixObservableCommand 包裹业务执行方法 “ 调用 hello-world 接口”

  1. public class Case1_1 extends HystrixObservableCommand<String> {
  2. private final String name;
  3. private RestTemplate restTemplate;
  4. public Case1_1(String name,RestTemplate restTemplate) {
  5. super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
  6. this.name = name;
  7. this.restTemplate = restTemplate;
  8. }
  9. @Override
  10. protected Observable<String> construct() {
  11. return Observable.create(new Observable.OnSubscribe<String>() {
  12. @Override
  13. public void call(Subscriber<? super String> observer) {
  14. try {
  15. if (!observer.isUnsubscribed()) {
  16. // a real example would do work like a network call here
  17. String result = restTemplate.getForObject
  18. ("http://localhost:9527/hello-world/" + name, String.class);
  19. observer.onNext(result);
  20. observer.onCompleted();
  21. }
  22. } catch (Exception e) {
  23. observer.onError(e);
  24. }
  25. }
  26. } ).subscribeOn(Schedulers.io());
  27. }
  28. @Override
  29. protected Observable<String> resumeWithFallback() {
  30. return Observable.create(new Observable.OnSubscribe<String>(){
  31. @Override
  32. public void call(Subscriber<? super String> subscriber) {
  33. try {
  34. if (!subscriber.isUnsubscribed()) {
  35. subscriber.onNext("fallback!");
  36. subscriber.onNext("失败异常!");
  37. subscriber.onCompleted();
  38. }
  39. } catch (Exception e) {
  40. subscriber.onError(e);
  41. }
  42. }
  43. }).subscribeOn(Schedulers.io());
  44. }
  45. }

执行单元测试

  1. @Test
  2. public void tetObservable2(){
  3. Observable<String> observe = new Case1_1("World",new RestTemplate()).observe();
  4. Iterator<String> iterator = observe.toBlocking().getIterator();
  5. StringBuffer sb = new StringBuffer();
  6. while (iterator.hasNext()){
  7. sb.append(iterator.next());
  8. }
  9. assertEquals("Hello World World",sb.toString());
  10. }
  11. @Test
  12. public void testToObservable2(){
  13. Observable<String> observe = new Case1_1("World",new RestTemplate()).toObservable();
  14. Iterator<String> iterator = observe.toBlocking().getIterator();
  15. StringBuffer sb = new StringBuffer();
  16. while (iterator.hasNext()){
  17. sb.append(iterator.next());
  18. }
  19. assertEquals("Hello World World",sb.toString());
  20. }

在 web 开发中(spring mvc),我们来对比一下普通的业务调用和使用编码方式之后的业务调用。

  1. @RequestMapping("/hystrix")
  2. public String command(){
  3. // 编码方式:使用 HystrixCommand 包装原始的接口调用,需要定义新的 HystrixCommand 包装对象。
  4. return new Case1("WTF名字好难取",restTemplate).execute();
  5. // 传统调用接口的方式
  6. //return restTemplate.getForObject("http://localhost:9527/hello-world/" + "WTF名字好难取",String.class);
  7. }

使用 Hystrix 编码方式进行开发带来的问题:

  • 1、耦合高。业务方法需要耦合到 HystrixCommand 对象中
  • 2、代码量大。每一个使用 HystrixCommand 对象封装的 API 接口都需要定义一个新的 HystrixCommand封装类

四、注解方式使用 HystrixCommand 和 HystrixObservableCommand(基于 Hystrix 1.5.18,Spring Boot 2.1.5)

4.1、注解方式使用 HystrixCommand 封装业务接口

同步调用

  1. /** sync **/
  2. /** 注解方式,需要开启:@EnableCircuitBreaker **/
  3. @HystrixCommand(fallbackMethod = "syncCommandFallback",
  4. commandProperties = {
  5. @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "1000")
  6. })
  7. @RequestMapping("/hystrix-sync/{param}")
  8. public Object syncCommand(@PathVariable String param,
  9. @RequestParam(value = "error",defaultValue = "1") int error,
  10. @RequestParam(value = "timeout",defaultValue = "100") int timeout) throws InterruptedException {
  11. // 模拟异常熔断
  12. int a = 1 / error;
  13. // 模拟超时
  14. Thread.sleep(timeout);
  15. return restTemplate.getForObject("http://localhost:9527/hello-world/" + param,String.class);
  16. }
  17. /** <p> fallback 方法:需要与 注解方法参数对应 </p> **/
  18. public String syncCommandFallback(String param,int error,int timeout){
  19. return "syncCommandFallback";
  20. }

异步调用

  1. /** async **/
  2. /** 注解方式,需要开启:@EnableCircuitBreaker **/
  3. @HystrixCommand(fallbackMethod = "case2Fallback")
  4. @RequestMapping("/hystrix-async/{param}")
  5. public Object asynccommand(@PathVariable String param, int error){
  6. int a = 1 / error;
  7. AsyncResult<String> asyncResult = new AsyncResult<String>() {
  8. @Override
  9. public String invoke() {
  10. return restTemplate.getForObject("http://localhost:9527/hello-world/" + param,
  11. String.class);
  12. }
  13. };
  14. return asyncResult;
  15. }
  16. /** <p> fallback 方法:需要与 注解方法参数对应 **/
  17. public String case2Fallback(String param,int error){
  18. return "case2Fallback";
  19. }

4.2、注解方式使用 HystrixObservableCommand

observe()

  1. // HystrixObservableCommand 注解方式使用
  2. /** EAGER参数表示使用observe()方式执行 **/
  3. @HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER,
  4. fallbackMethod = "case2Fallback") //使用observe()执行方式
  5. @RequestMapping("/hystrix-observer/{param}")
  6. public Observable<String> observer(@PathVariable String param, int error) {
  7. return Observable.create(new Observable.OnSubscribe<String>() {
  8. @Override
  9. public void call(Subscriber<? super String> subscriber) {
  10. try {
  11. if(!subscriber.isUnsubscribed()) {
  12. int a = 1 / error;
  13. String result = restTemplate.getForObject(
  14. "http://localhost:9527/hello-world/" + param, String.class);
  15. subscriber.onNext(result);
  16. subscriber.onCompleted();
  17. }
  18. } catch (Exception e) {
  19. subscriber.onError(e);
  20. }
  21. }
  22. });
  23. }
  24. /** <p> fallback 方法:需要与 注解方法参数对应 **/
  25. public String case2Fallback(String param,int error){
  26. return "case2Fallback";
  27. }

toObservalbe()

  1. /** LAZY参数表示使用toObservable()方式执行 **/
  2. @HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY,
  3. fallbackMethod = "case2Fallback") //表示使用toObservable()执行方式
  4. @RequestMapping("/hystrix-to-observer/{param}")
  5. public Observable<String> toObserver(@PathVariable String param, int error) {
  6. return Observable.create(new Observable.OnSubscribe<String>() {
  7. @Override
  8. public void call(Subscriber<? super String> subscriber) {
  9. try {
  10. if(!subscriber.isUnsubscribed()) {
  11. // 异常熔断控制
  12. int a = 1 / error;
  13. String results = restTemplate.getForObject(
  14. "http://localhost:9527/hello-world/" + param, String.class);
  15. // 结果填入发送方
  16. subscriber.onNext(results);
  17. subscriber.onCompleted();
  18. }
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. subscriber.onError(e);
  22. }
  23. }
  24. });
  25. }
  26. /** fallback 方法:需要与 注解方法参数对应 **/
  27. public String case2Fallback(String param,int error){
  28. return "case2Fallback";
  29. }

五、Hystrix 集成 Feign (基于 spring boot 2.1.5 ,spring cloud Greenwich.SR1)

5.1、Feign 整合 hystrix

  • step1、开启 feign 对于 hystrix 的支持

feign 手动开启 hystrix feign.hystrix.enabled=true

  • step2、注解开启 feign 和 hystrix

@EnableFeignClients @EnableCircuitBreaker

  • step3、feign 使用 hystrix 功能

声明方法调用熔断 fallback 方法

  1. @Component
  2. public class UserClientFallback implements FallbackFactory<UserClient> {
  3. @Override
  4. public UserClient create(Throwable throwable) {
  5. return new UserClient() {
  6. @Override
  7. public String saveUser(Long userId) {
  8. return "save faild";
  9. }
  10. @Override
  11. public String queryUserByUserId(Long userId) {
  12. return "query faild";
  13. }
  14. };
  15. }
  16. }

feign 调用使用 hystrix fallback 方法

  1. @FeignClient(name = "provider",fallbackFactory = UserClientFallback.class)
  2. public interface UserClient extends IUserController {
  3. }

5.2、通过全局配置 hystrix 策略

如:(更多配置参考 Hystrix 官方配置

hystrix 全局配置超时时间:1000ms hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds = 1000

5.3、针对特定接口进行细粒度控制

通过引入 feign 依赖和 hystrix 依赖,开启 feign 熔断之后,spring boot 会自动进行配置。

之后我们可以通过在 application.properties 或者 Bean 的方式进行 hystrix 的全局策略配置。

在开发过程中,大部分接口的配置使用相同的 hystrix 配置策略即可,但是在特定的场合中,如:某个接口的运行时间相对较长,超时时间1000ms 不够,这时候可以通过在接口上添加的自定义 HystrixCommand 来配置。

  1. @HystrixCommand(fallbackMethod = "syncCommandFallback",
  2. commandProperties = {
  3. // 该接口调用时长较长,进行特殊配置 2000ms
  4. @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "2000")
  5. })
  6. @RequestMapping("/hystrix-sync/{param}")
  7. public Object syncCommand(......){
  8. ......
  9. }

如果此时 hystrix 全局的超时时间配置为 1000ms ,此接口优先使用自定义的 hystrix 策略。

六、Hystrix 常用的配置策略

参考官方链接:Hystrix 配置

6.1、Execution 【HystrixCommand.run() 执行相关参数】

  • hystrix.command.default.execution.isolation.strategy = Thread
    配置线程隔离。两种:THREAD(线程池) 和 Semaphore (信号量)。默认:THREAD 线程池
  • hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds = 1000
    hystrixcommand 命令执行超时时间。默认:1000 ms。
  • hystrix.command.default.execution.timeout.enabled = true
    hystrixcommand 命令执行是否开启超时。默认:true
  • hystrix.command.default.execution.isolation.thread.interruptOnTimeout = true
    hystrixcommand 命令执行发生超时时是否中断执行操作。默认:true
  • hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests = 10
    线程隔离为Semaphore 。允许的最大请求数。默认:10。

6.2、Fallback 【Fallback 相关参数】

  • hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests = 10
    线程隔离为Semaphore。fallback 信息允许的最大并发数。超过不会再走 fallback,直接抛出异常。默认:10。
  • hystrix.command.default.fallback.enabled = true
    是否开启异常自定义信息 fallback,不开启时异常直接抛出。默认:true。

6.3、Circuit Breaker 【Hystrix 熔断相关参数】

  • hystrix.command.default.circuitBreaker.enabled = true
    是否开启熔断机制。默认:true。
  • hystrix.command.default.circuitBreaker.errorThresholdPercentage = 50
    fallback 逻辑错误比率阈值,达到阈值会触发 fallback。默认:50。
  • hystrix.command.default.circuitBreaker.forceOpen = false
    强制开启熔断机制,相当于拒绝所有请求。默认:false。

6.4、Thread Pool Properties 【Hystrix 线程池相关参数】

  • hystrix.threadpool.default.coreSize = 10
    线程隔离为THREAD时,线程核心大小。默认:10.
  • hystrix.threadpool.default.maximumSize = 10
    线程隔离为THREAD时,最大线程数据。默认:10。
    注意:Hystrix 1.5.9 之前,coreSize == maximumSize 。Hystrix 1.5.9 后 hystrix.threadpool.default.allowMaximumSizeToDivergeFromCoreSize = true 时,可以分别设置 coreSize 和 maximumSize 为不同的值。

6.5、线程池相关配置策略选择

针对线程池相关的配置设置,Hystrix 官方给出了如下的计算公式:

requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room

每秒正常的请求峰值 * 99%的请求延迟(也就是请求响应延迟) + 预留的缓冲

下图是 Hystrix 官方给出的一个例子。
01.jpg
图中定义:每秒有30个峰值请求,每个请求的响应延迟为200ms(0.2s),预留 4 个线程。
所以 Hystrix 线程池配置 为 30 * 0.2 + 4 = 10
或者
每个线程每秒处理请求数:1000 /200 = 5 个请求
30个请求需要: 30 / 5 = 6 个线程
总线程数 = 峰值请求数所需线程 + 缓冲预留线程数 = 6 + 4 = 10 个线程。