什么是Semaphore和线程池各自是干什么?
信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。
线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直到有可用的工作线程来执行任务。
使用Seamphore,你创建了多少线程,实际就会有多少线程进行执行,只是可同时执行的线程数量会受到限制。但使用线程池,你创建的线程只是作为任务提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。
简单来说,线程池实际工作的线程是work线程,不是你自己创建的,是由线程池创建的,并由线程池自动控制实际并发的work线程数量。而Seamphore相当于一个信号灯,作用是对线程做限流,Seamphore可以对你自己创建的的线程做限流(也可以对线程池的work线程做限流),Seamphore的限流必须通过手动acquire和release来实现。
两者区别
1、实际工作的线程是谁创建的?
使用线程池,实际工作线程由线程池创建;使用Seamphore,实际工作的线程由你自己创建。
2、限流是否自动实现?
线程池自动,Seamphore手动。
线程池隔离
在该模式下,用户请求会被提交到各自的线程池中执行,把执行每个下游服务的线程分离,从而达到资源隔离的作用。当线程池来不及处理并且请求队列塞满时,新进来的请求将快速失败,可以避免依赖问题扩散。
在信号量模式提到的问题,对所依赖的多个下游服务,通过线程池的异步执行,可以有效的提高接口性能。
优势
- 减少所依赖服务发生故障时的影响面,比如ServiceA服务发生异常,导致请求大量超时,对应的线程池被打满,这时并不影响ServiceB、ServiceC的调用。
- 如果接口性能有变动,可以方便的动态调整线程池的参数或者是超时时间,前提是Hystrix参数实现了动态调整。
缺点
- 请求在线程池中执行,肯定会带来任务调度、排队和上下文切换带来的开销。
- 因为涉及到跨线程,那么就存在ThreadLocal数据的传递问题,比如在主线程初始化的ThreadLocal变量,在线程池线程中无法获取
执行依赖代码的线程与请求线程(比如Tomcat线程)分离,请求线程可以自由控制离开的时间,这也是我们通常说的异步编程,Hystrix是结合RxJava来实现的异步编程,通过为每个包裹了HystrixCommand的API接口设置独立的、固定大小的线程池(hystrx.threadpool.default.coreSize)来控制并发访问量,当线程饱和的时候可以拒绝服务(走fallback方法),防止依赖问题扩散。
线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。
信号量模式
在该模式下,接受请求和执行下游依赖在同一个线程内完成,不存在线程上下文切换所带来的性能开销,所以大部分场景应该选择信号量模式,但是在下面这种情况下,信号量默认并非是一个好的选择。
比如一个接口中依赖了3个下游 :serviceA、serviceB、serviceC,且这3个服务返回的数据互相不依赖,这种情况下如果针对A、B、C的熔断降级使用信号量模式,那么接口耗时就等于请求A、B、C服务耗时的总和,无疑这不是好的方案。
另外,为了限制对下游依赖的并发调用量,可以配置Hystrix的execution.isolation.semaphore.maxConcurrentRequests ,当并发请求数达到阈值时,请求线程可以快速失败,执行降级。
实现也很简单,一个简单的计数器,当请求进入熔断器时,执行tryAcquire()
,计数器加1,结果大于阈值的话,就返回false,发生信号量拒绝事件,执行降级逻辑。当请求离开熔断器时,执行release()
,计数器减1。
实例
(1)采用线程池隔离
//------------------线程隔离
//groupKey 一组command ,如果没有配这个,相同的groupkey会使用同一个线程池
@HystrixCommand(groupKey = "thread1-group",commandKey = "thread1",threadPoolKey = "thread1",commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "30000")
},threadPoolProperties = {
@HystrixProperty(name = "coreSize",value = "3"),
@HystrixProperty(name = "maxQueueSize",value = "5")
})
public void thread1(){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println(111);
}
System.out.println(Thread.currentThread().getName());
}
(2)采用信号量隔离
//信号量隔离
@HystrixCommand(fallbackMethod = "降级方法名xxx",commandProperties = {
@HystrixProperty(name = "execution.isolation.strategy",value = "SEMAPHORE"), //开启信号量隔离策略
@HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10")//设置信号量隔离允许的最大并发数,默认为10
})
public String xxx(@PathVariable("id") Integer id){
// 方法体
}