code.7z

一、问题描述

在使用 open-feign 作为 RPC 调用组件,并开启 hystrix 支持(feign.hystrix.enabled=true)时,会出现 请求头丢失的问题。

1.1、问题原因

《Feign Hystrix Support》

假设当前存在两个服务 feign-clientfeign-server ,现在有一次请求如下:
02.png
本次请求由 request1 + request2 组成。request1request2 属于两个不同的请求,在某些业务需求中需要将 request1 header 头部中部分信息通过 request2 进行传递。

此时进行传递的方式有两种

  • 直接通过方法参数进行传递(不优雅,甚至是繁琐)
  • 通过统一的方式进行设置,借助 Feign 提供了过滤器 RequestInterceptor

    在发起 HTTP 请求前, Feign 会先执行所有的 RequestInterceptor#apply 方法

使用 RequestInterceptor 进行 header 参数统一透传处理相关代码

  1. /**
  2. * <p> Feign Interceptor </p>
  3. *
  4. * @Author 彳失口亍
  5. */
  6. public class FeignInterceptor implements RequestInterceptor {
  7. private Logger logger = LoggerFactory.getLogger(FeignInterceptor.class);
  8. @Override
  9. public void apply(RequestTemplate requestTemplate) {
  10. ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
  11. .getRequestAttributes();
  12. if (attributes == null) {
  13. logger.info("[新的线程查询不到上下文信息]");
  14. return;
  15. }
  16. // 从 ThreadLocal 中获取 Request 信息
  17. HttpServletRequest request = attributes.getRequest();
  18. String token= request.getHeader("token");
  19. if (!StringUtils.isEmpty(token)) {
  20. requestTemplate.header("token", token);
  21. }
  22. }
  23. }

在 feign 开启了 hystrix 支持,request2 执行时序图如下
01.png
① HystrixInvocationHandler#invoke 相关代码如下

  1. final class HystrixInvocationHandler implements InvocationHandler {
  2. @Override
  3. public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
  4. HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) {
  5. @Override
  6. protected Object run() throws Exception {
  7. // 调用 SynchronousMethodHandler#invoke
  8. return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
  9. }
  10. }
  11. return hystrixCommand.execute();
  12. }
  13. }

HystrixCommand 中任务的执行由 Future 模式实现的线程池中的线程来完成。

上述代码中 HystrixCommand 中传入的执行任务会被线程池的某个线程执行。

换句话说,request1request2 的请求是在不同线程进行处理的,request2 是一个新发起的请求,并且在一个新的线程中,无法共享 reqeust1 ThreadLocal 中相关的数据。

所以在使用 feign 进行 RPC 调用的时候, request1 中 head 信息无法通过 RequestInterceptor 的方式统一进行传递。

1.2、扩展:为什么开启 feign hystrix 支持 链路追踪信息能够通过 header 进行透传。

《Spring Cloud Sleuth 原理-feign埋点》

在测试过程中,发现 head 参数无法透传,但是 sleuth 链路信息却能够进行传递。
03.png
通过源码 TracingFeignClient#client 能够看出端疑

  1. final class TracingFeignClient implements Client {
  2. @Override
  3. public Response execute(Request request, Request.Options options) throws IOException {
  4. Map<String, Collection<String>> headers = new HashMap<>(request.headers());
  5. // span 创建
  6. Span span = handleSend(headers, request, null);
  7. Response response = null;
  8. Throwable error = null;
  9. try (Tracer.SpanInScope ws = this.tracer.withSpanInScope(span)) {
  10. // modifiedRequest(request, headers) 构建新的请求参数
  11. response = this.delegate.execute(modifiedRequest(request, headers), options);
  12. return response;
  13. }
  14. catch (IOException | RuntimeException | Error e) { ...... }
  15. finally {
  16. handleReceive(span, response, error);
  17. if (log.isDebugEnabled()) {
  18. log.debug("Handled receive of " + span);
  19. }
  20. }
  21. }
  22. }

span 相关 内容是在 Feign Client 中才进行构建的,并不是从上一个请求 ThreadLocal 中获取的。

二、解决 RequestInterceptor 无法统一透传

问题本质:feign 开启 hystrix 功能,而 hystrix 默认使用线程隔离,所以feign 发起信息的请求必定是一个新的线程来发起请求处理。

方案一:修改 hystrix 隔离策略-信号量(不推荐)

《Hystrix 线程隔离和信号量隔离》

直接修改配置文件增加配置

hystrix 配置

  1. #hystrix 隔离策略-信号量隔离
  2. hystrix.command.default.execution.isolation.strategy = SEMAPHORE

方案二:自定义 HystrixConcurrencyStrategy

Hystrix 线程隔离为:线程隔离

分析

Feign 开启 Hystrix 发起 RPC 调用时,Hystrix 通过 HystrixCommand 包装了 RPC 调用,然后从其中的线程池中获取一个线程进行执行操作,线程池的从 HystrixConcurrencyStrategy 中获取,

Feign RPC 调用逻辑由 HystrixConcurrencyStrategy#wrapCallable 封装成 Callable<Void>,然后借由 HystrixContextRunnable#run 执行调用,HystrixContextRunnable 相关代码如下:

  1. public class HystrixContextRunnable implements Runnable {
  2. private final Callable<Void> actual;
  3. private final HystrixRequestContext parentThreadState;
  4. public HystrixContextRunnable(Runnable actual) {
  5. this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
  6. }
  7. public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
  8. this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
  9. @Override
  10. public Void call() throws Exception {
  11. actual.run();
  12. return null;
  13. }
  14. });
  15. this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
  16. }
  17. @Override
  18. public void run() {
  19. HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
  20. try {
  21. // set the state of this thread to that of its parent
  22. HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
  23. // execute actual Callable with the state of the parent
  24. try {
  25. actual.call();
  26. } catch (Exception e) {
  27. throw new RuntimeException(e);
  28. }
  29. } finally {
  30. // restore this thread back to its original state
  31. HystrixRequestContext.setContextOnCurrentThread(existingState);
  32. }
  33. }
  34. }

通过自定义 HystrixConcurrencyStrategy 解决透传的问题。

参考:Spring Cloud Sleuth 实现 SleuthHystrixConcurrencyStrategySleuthHystrixConcurrencyStrategy 相关代码如下:

  1. public class SleuthHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
  2. private final Tracing tracing;
  3. private final SpanNamer spanNamer;
  4. // 被装饰者
  5. private HystrixConcurrencyStrategy delegate;
  6. @Override
  7. public <T> Callable<T> wrapCallable(Callable<T> callable) {
  8. if (callable instanceof TraceCallable) {
  9. return callable;
  10. }
  11. Callable<T> wrappedCallable = this.delegate != null
  12. ? this.delegate.wrapCallable(callable) : callable;
  13. if (wrappedCallable instanceof TraceCallable) {
  14. return wrappedCallable;
  15. }
  16. return new TraceCallable<>(this.tracing, this.spanNamer, wrappedCallable,
  17. HYSTRIX_COMPONENT);
  18. }
  19. }

上述方法 SleuthHystrixConcurrencyStrategy#wrapCallable 用来构建线程执行的 Runnable(HystrixContextRunnable) 逻辑模块的。所以在执行该操作时,并未切换线程,此时就可以在这里作文章,
在线程执行逻辑单元中,把 request1 中需要被传递的信息,直接赋值到即将发起 request2 的线程的逻辑执行单元中。

SleuthHystrixConcurrencyStrategy 使用了装饰者模式,SleuthHystrixConcurrencyStrategy 中构建的逻辑执行单元包裹了一层被装饰者的执行逻辑单元

实现

一个新的类 CustomerFeignHystrixConcurrencyStrategy ,参考SleuthHystrixConcurrencyStrategy 代码,修改其中 的 wrapCallable 方法相关代码如下:

  1. /**
  2. * <p> 自定义 Hystrix 策略 </p>
  3. *
  4. * @Author 彳失口亍
  5. */
  6. public class CustomerFeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
  7. ......
  8. @Override
  9. public <T> Callable<T> wrapCallable(Callable<T> callable) {
  10. RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
  11. return new WrappedCallable<>(callable, requestAttributes);
  12. }
  13. static class WrappedCallable<T> implements Callable<T> {
  14. private final Callable<T> target;
  15. private final RequestAttributes requestAttributes;
  16. public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
  17. this.target = target;
  18. this.requestAttributes = requestAttributes;
  19. }
  20. @Override
  21. public T call() throws Exception {
  22. try {
  23. RequestContextHolder.setRequestAttributes(requestAttributes);
  24. return target.call();
  25. } finally {
  26. RequestContextHolder.resetRequestAttributes();
  27. }
  28. }
  29. }
  30. }

关键在于构建的 WrappedCallable 对象中存放了 request1 线程上下文中的 RequestAttributes 对象,使得 request2 开启的新线程获取到的 RequestAttributesrequest1 中的值,达到参数传递的效果。

上述代码与 SleuthHystrixConcurrencyStrategy 组成的线程逻辑单元结构如下:
04.png

SleuthHystrixConcurrencyStrategy 中的 Callable 和 CustomerFeignHystrixConcurrencyStrategy 中的 WrappedCallable 也是装饰者模式。

使用

05.png
在 feign client 进行配置即可。