Hytirx的定义
就是在微服务之间调用之间做资源隔离、限流、熔断、降级处理、运维监控的高可用框架
- 资源隔离:让你的系统里,某一块东西,在故障的情况下,不会耗尽系统所有的资源,比如线程资源
- 限流:高并发的流量涌入进来,比如说突然间一秒钟100万QPS,废掉了,10万QPS进入系统,其他90万QPS被拒绝了
- 熔断:系统后端的一些依赖,出了一些故障,比如说mysql挂掉了,每次请求都是报错的,熔断了,后续的请求过来直接不接收了,拒绝访问,10分钟之后再尝试去看看mysql恢复没有
- 降级:mysql挂了,系统发现了,自动降级,从内存里存的少量数据中,去提取一些数据出来
- 运维监控:监控+报警+优化,各种异常的情况,有问题就及时报警,优化一些系统的配置和参数,或者代码
Hystrix的设计原则
- 对依赖服务调用时出现的调用延迟和调用失败进行控制和容错保护
- 在复杂的分布式系统中,阻止某一个依赖服务的故障在整个系统中蔓延,服务A->服务B->服务C,服务C故障了,服务B也故障了,服务A故障了,整套分布式系统全部故障,整体宕机
- 提供fail-fast(快速失败)和快速恢复的支持
- 提供fallback优雅降级的支持
-
Hystrix是如何实现它的目标的
通过HystrixCommand或者HystrixObservableCommand来封装对外部依赖的访问请求,这个访问请求一般会运行在独立的线程中,资源隔离
- 对于超出我们设定阈值的服务调用,直接进行超时,不允许其耗费过长时间阻塞住。这个超时时间默认是99.5%的访问时间,但是一般我们可以自己设置一下
- 为每一个依赖服务维护一个独立的线程池,或者是semaphore,当线程池已满时,直接拒绝对这个服务的调用
- 对依赖服务的调用的成功次数,失败次数,拒绝次数,超时次数,进行统计
- 如果对一个依赖服务的调用失败次数超过了一定的阈值,自动进行熔断,在一定时间内对该服务的调用直接降级,一段时间后再自动尝试恢复
- 当一个服务调用出现失败,被拒绝,超时,短路等异常情况时,自动调用fallback降级机制
- 对属性和配置的修改提供近实时的支持
【Hystrix】线程池隔离技术与信号量隔离技术的区别
Hystrix,资源隔离,两种技术,线程池的资源隔离,信号量的资源隔离线程池隔离技术
Hystrix里面,核心的一项功能,其实就是所谓的资源隔离,要解决的最最核心的问题,就是将多个依赖服务的调用分别隔离到各自自己的资源池内避免说对某一个依赖服务的调用,因为依赖服务的接口调用的延迟或者失败,导致服务所有的线程资源全部耗费在这个服务的接口调用上
一旦说某个服务的线程资源全部耗尽的话,可能就导致服务就会崩溃,甚至说这种故障会不断蔓延信号量隔离技术
信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。
线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直到有可用的工作线程来执行任务。
使用Seamphore,你创建了多少线程,实际就会有多少线程进行执行,只是可同时执行的线程数量会受到限制。但使用线程池,你创建的线程只是作为任务提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。
简单来说,线程池实际工作的线程是work线程,不是你自己创建的,是由线程池创建的,并由线程池自动控制实际并发的work线程数量。而Seamphore相当于一个信号灯,作用是对线程做限流,Seamphore可以对你自己创建的的线程做限流(也可以对线程池的work线程做限流),Seamphore的限流必须通过手动acquire和release来实现。线程池与信号量的区别,在什么情景下使用
- 线程池(Thread):适合绝大多数的场景,99%的,线程池,对依赖服务的网络请求的调用和访问,timeout这种问题
信号量(Semaphore):适合你的访问不是对外部依赖的访问,而是对内部的一些比较复杂的业务逻辑的访问,但是像这种访问,系统内部的代码,其实不涉及任何的网络请求,那么只要做信号量的普通限流就可以了,因为不需要去捕获timeout类似的问题,算法+数据结构的效率不是太高,并发量突然太高,因为这里稍微耗时一些,导致很多线程卡在这里的话,不太好,所以进行一个基本的资源隔离和访问,避免内部复杂的低效率的代码,导致大量的线程被hang住。
通常是针对超大并发量的场景下,每个服务实例每秒都几百的 QPS,那么此时你用线程池的话,线程一般不会太 多,可能撑不住那么高的并发,如果要撑住,可能要耗费大量的线程资源,那么就是用信号量,来进行限流保护 (这种理解也能说得通)
区别 | 隔离方式 | 是否支持超时 | 是否支持熔断 | 隔离原理 | 是否是异步调用 | 资源消耗 | | —- | —- | —- | —- | —- | —- | | 线程池隔离 | 支持,可直接返回 | 支持,当线程池到达maxSize后,再请求会触发fallback接口进行熔断 | 每个服务单独用线程池 | 可以是异步,也可以是同步。看调用的方法 | 大,大量线程的上下文切换,容易造成机器负载高 | | 信号量隔离 | 不支持,如果阻塞,只能通过调用协议(如:socket超时才能返回) | 支持,当信号量达到maxConcurrentRequests后。再请求会触发fallback | 通过信号量的计数器 | 同步调用,不支持异步 | 小,只是个计数器 |
【Hytrix】入门及相关使用分析
这里使用的项目在Demo在gitee位置:https://gitee.com/ZIB/ribbon-demo
A. Hytrix 的入门与相关API使用
执行处理业务类继承 **HystrixCommand** 或者是 **HystrixObservableCommand** ,当中继承**HystrixCommand** 是用来获取一条数据(现实使用比较少),继承**HystrixObservableCommand** 是设计用来获取多条数据的(使用比较多)
A-0. command的四种调用方式
- 同步:new CommandHelloWorld(“World”).execute(),new ObservableCommandHelloWorld(“World”).toBlocking().toFuture().get()
如果你认为observable command只会返回一条数据,那么可以调用上面的模式,去同步执行,返回一条数据 异步:new CommandHelloWorld(“World”).queue(),new ObservableCommandHelloWorld(“World”).toBlocking().toFuture()
对command调用queue(),仅仅将command放入线程池的一个等待队列,就立即返回,拿到一个Future对象,后面可以做一些其他的事情,然后过一段时间对future调用get()方法获取数据A-1. 继承
HystrixCommand的demo//demo代码 //继承HystrixCommand /** * @author fanwei * GetProudctInfoCommand对执行HttpClientUtils.sendGetRequest(url)放入 * getProuductInfoCommandGroup线程池中 * @url -- http://127.0.0.1:8062/getProductInfo?productId={id} * 继承HystrixCommand的话,:是用来获取一条数据的 * 继承HystrixObservableCommand:是设计用来获取多条数据的@{com.zhss.eshop.cache.hytrix.command.GetProudctInfosCommand} */ public class GetProudctInfoCommand extends HystrixCommand<ProductInfo> { private Long productId; public GetProudctInfoCommand(Long productId) { super(HystrixCommandGroupKey.Factory.asKey("getProuductInfoCommandGroup")); this.productId = productId; } @Override protected ProductInfo run() throws Exception { // 拿到一个商品id // 调用商品服务的接口,获取商品id对应的商品的最新数据 // 用HttpClient去调用商品服务的http接口 String url = "http://127.0.0.1:8062/getProductInfo?productId=" + productId; String response = HttpClientUtils.sendGetRequest(url); return JSONObject.parseObject(response,ProductInfo.class); } } //继承HystrixCommand对应处理 @RequestMapping("/change/product/{productId}") @ResponseBody public String changeProduct(@PathVariable("productId") Long productId) { HystrixCommand<ProductInfo> getHystrixCommand = new GetProudctInfoCommand(productId); ProductInfo productInfo = getHystrixCommand.execute(); //这里添加一个Command去执行(这里省略.....) return "success"; }A-2. 继承
HystrixObservableCommand的demo```java
//继承HystrixObservableCommand /**
- @author fanwei
批量获取productIds集对应返回结果 */ public class GetProudctInfosCommand extends HystrixObservableCommand
{ private List
productIds; public GetProudctInfosCommand(List productIds) { // super(HystrixCommandGroupKey.Factory.asKey(“getProuductInfoCommandGroup”));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("getProuductInfoCommandGroup")) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionTimeoutEnabled(false))); this.productIds = productIds;}
@Override protected Observable
construct() { return Observable.create(new Observable.OnSubscribe<ProductInfo>() { @Override public void call(Subscriber<? super ProductInfo> subscriber) { try { for (Long productId:productIds){ String url = "http://127.0.0.1:8062/getProductInfo?productId=" + productId; String response = HttpClientUtils.sendGetRequest(url); ProductInfo productInfo = JSONObject.parseObject(response,ProductInfo.class); subscriber.onNext(productInfo); } subscriber.onCompleted(); }catch (Exception e){ subscriber.onError(e); } } });} } //继承HystrixObservableCommand对应处理 —- Observer 方式(常用) /**
- 批量执行productIds获取所有PruductInfo数据集
- @param projectIds
@return */ @RequestMapping(“/change/products”) @ResponseBody public String changeProducts(@RequestParam(“projectIds”) List
projectIds){ HystrixObservableCommand getHystrixObservableCommand = new GetProudctInfosCommand(projectIds); Observable productInfoObservable = getHystrixObservableCommand.observe(); productInfoObservable.subscribe(new Observer () { @Override public void onCompleted() { System.out.println("this done!");}
@Override public void onError(Throwable e) {
e.printStackTrace();}
@Override public void onNext(ProductInfo productInfo) {
System.out.println(productInfo.toString());A-3 . HystrixObservableCommand 的另外2种调用方式
- lamdba方式
- 递归方式(常用)
- Observer 方式(常用) ```java /**
- 批量执行productIds获取所有PruductInfo数据集—第二种获取同步获取数据方法(lamdba方式)
- @param projectIds
- @return
*/
@RequestMapping(“/change/2/products”)
@ResponseBody
public String change2Products(@RequestParam(“projectIds”) List
projectIds){ HystrixObservableCommand getHystrixObservableCommand = new GetProudctInfosCommand(projectIds); Observable productInfoObservable = getHystrixObservableCommand.observe(); productInfoObservable.subscribe(productInfo -> {
}); return “success”; }System.out.println(productInfo);
/**
- 批量执行productIds获取所有PruductInfo数据集—第三种获取同步获取数据方法(递归方式)—- 我常用方式
- @param projectIds
- @return
*/
@RequestMapping(“/change/2/products”)
@ResponseBody
public String change2Products(@RequestParam(“projectIds”) List
projectIds){ HystrixObservableCommand getHystrixObservableCommand = new GetProudctInfosCommand(projectIds); // 同步调用方式 — 第二种方式 Iterator iterator = getHystrixObservableCommand.observe().toBlocking().getIterator(); while (iterator.hasNext()) {
} return “success”; } ```System.out.println(iterator.next());B. Hystrix的线程池隔离策略配置、服务、接口划分及资源池的容量大小控制
ps:细节分析可以查看 4-1 HystrixCommand 相关配置属性配置解析 有相关解析B-1. 隔离策略配置
- 线程池配置
//继承HystrixCommand或HystrixObservableCommand抽象类之后,构造方法配置 //command group -- GetProductCommandGroup super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetProductCommandGroup")) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) ) );
command group:GetProductCommandGroup是这个group的命名,command group 来定义一个线程池的,而且还会通过command group来聚合一些监控和报警信息,同一个command group中的请求,都会进入同一个线程池中HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)设置是线程池
- 信号量配置
//继承HystrixCommand或HystrixObservableCommand抽象类之后,构造方法配置 super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("getCityNameGroup")) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() //设置超时最多是1s .withExecutionTimeoutInMilliseconds(1000) .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE) ) );
HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)设置是线程池-
B-2.command名称和command组
线程池隔离,依赖服务->接口->线程池,如何来划分
你的每个command,都可以设置一个自己的名称,同时可以设置一个自己的组,如下面代码:private static final Setter cachedSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")); public CommandHelloWorld(String name) { super(cachedSetter); this.name = name; } command group,是一个非常重要的概念,默认情况下,因为就是通过command group来定义一个线程池的,而且还会通过command group来聚合一些监控和报警信息,这里使用
ExampleGroup命名command key,代表了一类command,一般来说,代表了底层的依赖服务的一个接口,这里使用
HelloWorld命名B-3.command线程池
threadpool key 代表了一个HystrixThreadPool,用来进行统一监控,统计,缓存,默认的threadpool key就是command group名称
- 每个command都会跟它的threadpool key对应的thread pool绑定在一起
- 如果不想直接用command group,也可以手动设置thread pool name
代码如下:
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
command threadpool -> command group -> command key
command key,代表了一类command,一般来说,代表了底层的依赖服务的一个接口
command group,代表了某一个底层的依赖服务,合理,一个依赖服务可能会暴露出来多个接口,每个接口就是一个command key
command group,在逻辑上去组织起来一堆command key的调用,统计信息,成功次数,timeout超时次数,失败次数,可以看到某一个服务整体的一些访问情况
command group,一般来说,推荐是根据一个服务去划分出一个线程池,command key默认都是属于同一个线程池的
比如说你以一个服务为粒度,估算出来这个服务每秒的所有接口加起来的整体QPS在100左右
你调用那个服务的当前服务,部署了10个服务实例,每个服务实例上,其实用这个command group对应这个服务,给一个线程池,量大概在10个左右,就可以了,你对整个服务的整体的访问QPS大概在每秒100左右
一般来说,command group是用来在逻辑上组合一堆command的
举个例子,对于一个服务中的某个功能模块来说,希望将这个功能模块内的所有command放在一个group中,那么在监控和报警的时候可以放一起看
command group,对应了一个服务,但是这个服务暴露出来的几个接口,访问量很不一样,差异非常之大
你可能就希望在这个服务command group内部,包含的对应多个接口的command key,做一些细粒度的资源隔离
对同一个服务的不同接口,都使用不同的线程池
command key -> command group
command key -> 自己的threadpool key
逻辑上来说,多个command key属于一个command group,在做统计的时候,会放在一起统计
每个command key有自己的线程池,每个接口有自己的线程池,去做资源隔离和限流
但是对于thread pool资源隔离来说,可能是希望能够拆分的更加一致一些,比如在一个功能模块内,对不同的请求可以使用不同的thread pool
command group一般来说,可以是对应一个服务,多个command key对应这个服务的多个接口,多个接口的调用共享同一个线程池
如果说你的command key,要用自己的线程池,可以定义自己的threadpool key,就ok了
B-4.command线程池coreSize
关注4-1 HystrixCommand 相关配置属性配置解析中的coreSize
// 注意: HystrixObservableCommand 不能设置 ThreadPoolKey
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("getProuductInfoCommandGroup"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("product-info"))
//这里只有HystrixCommand继承类才有的配置
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10))
);
B-5.command线程池queueSizeRejectionThreshold
关注4-1 HystrixCommand 相关配置属性配置解析中的queueSizeRejectionThreshold
// 注意: HystrixObservableCommand 不能设置 ThreadPoolKey
super(Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("getProuductInfoCommandGroup"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("product-info"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(10)
//线程池执行处理之前存储的队列满了之后reject的指定大小
.withQueueSizeRejectionThreshold(12)
)
);
B-n.信号量maxConcurrentRequests属性
关注4-1 HystrixCommand 相关配置属性配置解析中的maxConcurrentRequests
// 注意: HystrixObservableCommand 不能设置 ThreadPoolKey
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("getCityNameGroup"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(1000)
//信号量reject前最大的并发量多少设置
.withExecutionIsolationSemaphoreMaxConcurrentRequests(122)
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
C. Hystrix执行的8大流程步骤及原理
上面的章节中我们讲到Hystrix的创建 command,执行这个 command,配置这个 command 对应的 group 和线程池,以及线程池/信号量的容量和大小
再开始执行这个 command,调用了这个 command 的 execute() 方法以后, hystrix 内部的底层的执行流程和步骤以及原理是什么呢?
在讲解这个流程的过程中,我们会带出来 hystrix 其他的一些核心以及重要的功能
流程图
官网引用图
C-1.构建一个HystrixCommand或者HystrixObservableCommand
C-2. 调用command的执行方法
执行 Command 就可以发起一次对依赖服务的调用, 要执行 Command,需要在 4 个方法中选择其中的一个:execute()、queue()、observe()、toObservable()
其中 execute() 和 queue() 仅仅对 HystrixCommand 适用
execute() |
调用后直接block住,属于同步调用,直到依赖服务返回单条结果,或者抛出异常 | K value = command.execute(); |
|---|---|---|
queue() |
返回一个Future,属于异步调用,后面可以通过Future获取单条结果 | Future<K> fValue = command.queue(); |
observe() |
订阅一个Observable对象,Observable代表的是依赖服务返回的结果,获取到一个那个代表结果的Observable对象的拷贝对象 | Observable<K> ohValue = command.observe(); |
toObservable() |
返回一个Observable对象,如果我们订阅这个对象,就会执行command并且获取返回结果 | Observable<K> ocValue = command.toObservable(); |
execute()实际上会调用queue().get().queue(),接着会调用toObservable().toBlocking().toFuture() 也就是说,无论是哪种执行command的方式,最终都是依赖toObservable()去执行的
C-3. 检查是否开启缓存
从这一步开始,进入我们的底层的运行原理啦,了解 hysrix 的一些更加高级的功能和特性
如果这个 command 开启了请求缓存(request cache),而且这个调用的结果在缓存中存在,那么直接从缓存中返回结果
C-4. 检查是否开启了短路器
检查这个command对应的依赖服务是否开启了短路器
如果断路器被打开了,那么hystrix就不会执行这个command,而是直接去执行fallback降级机制
C-5. 检查线程池/队列/semaphore是否已经满了
如果command对应的线程池/队列/semaphore已经满了,那么也不会执行command,而是直接去调用fallback降级机制
C-6. 执行command
调用 HystrixObservableCommand.construct() 或 HystrixCommand.run() 来实际执行这个command
HystrixCommand.run() |
是返回一个单条结果,或者抛出一个异常 |
|---|---|
HystrixObservableCommand.construct() |
是返回一个Observable对象,可以获取多条结果 |
如果
HystrixCommand.run()或HystrixObservableCommand.construct()的执行,超过了timeout时长的话,那么command所在的线程就会抛出一个TimeoutException 如果timeout了,也会去执行fallback降级机制,而且就不会管run()或construct()返回的值 这里要注意的一点是,我们是不可能终止掉一个调用严重延迟的依赖服务的线程的,只能说给你抛出来一个TimeoutException,但是还是可能会因为严重延迟的调用线程占满整个线程池的 即使这个时候新来的流量都被限流了。。。 如果没有timeout的话,那么就会拿到一些调用依赖服务获取到的结果,然后hystrix会做一些logging记录和metric统计
C-7. 短路健康检查
Hystrix会将每一个依赖服务的调用成功,失败,拒绝,超时,等事件,都会发送给circuit breaker断路器
短路器就会对调用成功/失败/拒绝/超时等事件的次数进行统计
短路器会根据这些统计次数来决定,是否要进行短路,如果打开了短路器,那么在一段时间内就会直接短路,然后如果在之后第一次检查发现调用成功了,就关闭断路器
C-8. 调用fallback降级机制
在以下几种情况中,hystrix会调用fallback降级机制:
- run()或construct()抛出一个异常
- 短路器打开
- 线程池/队列/semaphore满了
- command执行超时了
在 HystrixCommand 中,上线 getFallback() 方法,可以提供降级机制 , 返回一个 Observable 对象,
在 HystirxObservableCommand 中,实现一个 resumeWithFallback() 方法,返回一个 Observable 对象,可以提供降级结果
C-9. 调用成功后返回的结果
execute() |
获取一个Future.get(),然后拿到单个结果 |
|---|---|
queue() |
返回一个Future,使用一个get()获取结果 |
observer() |
立即订阅Observable,然后启动8大执行步骤,返回一个拷贝的Observable,订阅时理解回调给你结果 |
toObservable() |
返回一个原始的Observable,必须手动订阅才会去执行8大步骤 |
