什么是Semaphore和线程池各自是干什么?

信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。

 线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直到有可用的工作线程来执行任务。

  使用Seamphore,你创建了多少线程,实际就会有多少线程进行执行,只是可同时执行的线程数量会受到限制。但使用线程池,你创建的线程只是作为任务提交给线程池执行,实际工作的线程由线程池创建,并且实际工作的线程数量由线程池自己管理。

  简单来说,线程池实际工作的线程是work线程,不是你自己创建的,是由线程池创建的,并由线程池自动控制实际并发的work线程数量。而Seamphore相当于一个信号灯,作用是对线程做限流,Seamphore可以对你自己创建的的线程做限流(也可以对线程池的work线程做限流),Seamphore的限流必须通过手动acquire和release来实现。

两者区别

image.png
1、实际工作的线程是谁创建的?
使用线程池,实际工作线程由线程池创建;使用Seamphore,实际工作的线程由你自己创建。
2、限流是否自动实现?
线程池自动,Seamphore手动。

线程池隔离

  在该模式下,用户请求会被提交到各自的线程池中执行,把执行每个下游服务的线程分离,从而达到资源隔离的作用。当线程池来不及处理并且请求队列塞满时,新进来的请求将快速失败,可以避免依赖问题扩散。
  在信号量模式提到的问题,对所依赖的多个下游服务,通过线程池的异步执行,可以有效的提高接口性能。

优势

  • 减少所依赖服务发生故障时的影响面,比如ServiceA服务发生异常,导致请求大量超时,对应的线程池被打满,这时并不影响ServiceB、ServiceC的调用。
  • 如果接口性能有变动,可以方便的动态调整线程池的参数或者是超时时间,前提是Hystrix参数实现了动态调整。

缺点

  • 请求在线程池中执行,肯定会带来任务调度、排队和上下文切换带来的开销。
  • 因为涉及到跨线程,那么就存在ThreadLocal数据的传递问题,比如在主线程初始化的ThreadLocal变量,在线程池线程中无法获取

执行依赖代码的线程与请求线程(比如Tomcat线程)分离,请求线程可以自由控制离开的时间,这也是我们通常说的异步编程,Hystrix是结合RxJava来实现的异步编程,通过为每个包裹了HystrixCommand的API接口设置独立的、固定大小的线程池(hystrx.threadpool.default.coreSize)来控制并发访问量,当线程饱和的时候可以拒绝服务(走fallback方法),防止依赖问题扩散。
线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。

信号量模式

在该模式下,接受请求和执行下游依赖在同一个线程内完成,不存在线程上下文切换所带来的性能开销,所以大部分场景应该选择信号量模式,但是在下面这种情况下,信号量默认并非是一个好的选择。

比如一个接口中依赖了3个下游 :serviceA、serviceB、serviceC,且这3个服务返回的数据互相不依赖,这种情况下如果针对A、B、C的熔断降级使用信号量模式,那么接口耗时就等于请求A、B、C服务耗时的总和,无疑这不是好的方案。

另外,为了限制对下游依赖的并发调用量,可以配置Hystrix的execution.isolation.semaphore.maxConcurrentRequests ,当并发请求数达到阈值时,请求线程可以快速失败,执行降级。

实现也很简单,一个简单的计数器,当请求进入熔断器时,执行tryAcquire(),计数器加1,结果大于阈值的话,就返回false,发生信号量拒绝事件,执行降级逻辑。当请求离开熔断器时,执行release(),计数器减1。

实例

(1)采用线程池隔离

  1. //------------------线程隔离
  2. //groupKey 一组command ,如果没有配这个,相同的groupkey会使用同一个线程池
  3. @HystrixCommand(groupKey = "thread1-group",commandKey = "thread1",threadPoolKey = "thread1",commandProperties = {
  4. @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "30000")
  5. },threadPoolProperties = {
  6. @HystrixProperty(name = "coreSize",value = "3"),
  7. @HystrixProperty(name = "maxQueueSize",value = "5")
  8. })
  9. public void thread1(){
  10. try {
  11. Thread.sleep(10000);
  12. } catch (InterruptedException e) {
  13. System.out.println(111);
  14. }
  15. System.out.println(Thread.currentThread().getName());
  16. }

(2)采用信号量隔离

  1. //信号量隔离
  2. @HystrixCommand(fallbackMethod = "降级方法名xxx",commandProperties = {
  3. @HystrixProperty(name = "execution.isolation.strategy",value = "SEMAPHORE"), //开启信号量隔离策略
  4. @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests", value = "10")//设置信号量隔离允许的最大并发数,默认为10
  5. })
  6. public String xxx(@PathVariable("id") Integer id){
  7. // 方法体
  8. }