参考: https://zhuanlan.zhihu.com/p/360172630
https://www.cnblogs.com/luliang888/p/14440118.html

概览

CompletableFuture是JDK8对Future接口的增强

  • 提供了函数式编程写法, 使代码简练, 语义清晰
  • 默认使用forkJoinPool线程池, 无需手工维护线程
  • completionStage接口, 提供了异步线程编排的能力, 支持链式编程

函数式接口

有且仅有一个抽象方法的接口为函数式接口, 可以使用@FunctionalInterface标识, 函数式接口能适用于函数式编程场景(Lambda).

常用的函数式接口

接口 说明
Runnable 无参无返回值
Fuction 接收T类型参数, 返回R类型结果
Consumer 接收T类型参数, 无返回值
Predicate 接收T类型参数, 返回boolean类型结果
Supplier 无参, 返回T类型的结果

ComletableFuture使用

接口概览

CompletableFuture最佳实践 - 图1

创建CompletableFuture对象

  1. //使用默认线程池
  2. static CompletableFuture<Void> runAsync(Runnable runnable)
  3. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  4. //可以指定线程池
  5. static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  6. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 带Async后缀的方法为异步执行, 下同
  • Runnable接口无返回值, 而Supplier接口的get()是有返回值的
  • 默认会使用 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数
  • 如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议根据不同的业务类型创建不同的线程池,以避免互相干扰

使用CompletionStage编排异步任务

串行关系

  1. CompletionStage<R> thenApply(fn);
  2. CompletionStage<R> thenApplyAsync(fn);
  3. CompletionStage<Void> thenAccept(consumer);
  4. CompletionStage<Void> thenAcceptAsync(consumer);
  5. CompletionStage<Void> thenRun(action);
  6. CompletionStage<Void> thenRunAsync(action);
  7. CompletionStage<R> thenCompose(fn);
  8. CompletionStage<R> thenComposeAsync(fn);
  • 使用上的区别为方法参数的函数式接口, 是否接收参数, 以及是否有返回值
  • thenApply与thenCompose区别

    • thenApply转换的是泛型中的类型, 相当于将CompletableFuture 转换生成新的CompletableFuture
    • thenCompose用来连接两个CompletableFuture,生成一个新的CompletableFuture

      1. public void serial(){
      2. CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "hello!");
      3. CompletableFuture<String> f1 = f0.thenApply(str -> str + "world!");
      4. System.out.println(f1.join()); // hello!world!
      5. CompletableFuture<String> f2 = f0
      6. .thenCompose(str -> CompletableFuture.supplyAsync(() -> str + "world!"));
      7. System.out.println(f2.join()); // hello!world!
      8. }

汇聚关系

AND
  1. CompletionStage<R> thenCombine(other, fn);
  2. CompletionStage<R> thenCombineAsync(other, fn);
  3. CompletionStage<Void> thenAcceptBoth(other, consumer);
  4. CompletionStage<Void> thenAcceptBothAsync(other, consumer);
  5. CompletionStage<Void> runAfterBoth(other, action);
  6. CompletionStage<Void> runAfterBothAsync(other, action);
  • 主要区别为方法参数函数式接口的不同
    1. public void and(){
    2. CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> "es recall!");
    3. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "sm recall!");
    4. String res = f0.thenCombine(f1, (str1, str2) -> str1 + " & " + str2).join();
    5. System.out.println(res); // es recall! & sm recall!
    6. }

OR
  1. CompletionStage applyToEither(other, fn);
  2. CompletionStage applyToEitherAsync(other, fn);
  3. CompletionStage acceptEither(other, consumer);
  4. CompletionStage acceptEitherAsync(other, consumer);
  5. CompletionStage runAfterEither(other, action);
  6. CompletionStage runAfterEitherAsync(other, action);
  • 主要区别为方法参数函数式接口的不同
  • OR在语义上理解为”最快返回”, 拿最快返回的结果作为下一次任务的输入

    1. public void or() {
    2. CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> {
    3. try {
    4. Thread.sleep(random.nextInt(3));
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. return "es recall!";
    9. });
    10. CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    11. try {
    12. Thread.sleep(random.nextInt(3));
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. return "sm recall!";
    17. });
    18. String res = f0.applyToEitherAsync(f1, str -> "fastest: " + str).join();
    19. System.out.println(res); // fastest: es recall! 或者 fastest: sm recall!
    20. }

异常处理

  1. CompletionStage exceptionally(fn);
  • 相当于try{}catch{}中的catch{}, 支持链式编程
    1. public void exception(){
    2. CompletableFuture<Object> f0 = CompletableFuture.supplyAsync(() -> {
    3. throw new RuntimeException("hello!exception!");
    4. }).exceptionally(e -> {
    5. e.printStackTrace();
    6. return "exception happened";
    7. });
    8. System.out.println(f0.join()); //exception happened
    9. }

任务结束

  1. CompletionStage<R> whenComplete(consumer);
  2. CompletionStage<R> whenCompleteAsync(consumer);
  3. CompletionStage<R> handle(fn);
  4. CompletionStage<R> handleAsync(fn);
  • 相当于finally{}, 无论异常与否, 都会执行 consumer/fn 回调函数
    1. public void complete() {
    2. CompletableFuture<Object> f0 = CompletableFuture.supplyAsync(() -> {
    3. throw new RuntimeException("hello!exception!");
    4. }).whenComplete((str, e) -> {
    5. if (e != null) {
    6. e.printStackTrace();
    7. } else {
    8. System.out.println(str);
    9. }
    10. });
    11. }

超时

  1. /**
  2. * java8中CompletableFuture异步处理超时的方法
  3. * <p>
  4. * Java 8 的 CompletableFuture 并没有 timeout 机制,虽然可以在 get 的时候指定 timeout,是一个同步堵塞的操作。怎样让 timeout 也是异步的呢?Java 8 内有内建的机
  5. * 制支持,一般的实现方案是启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),
  6. * 然后用acceptEither() 或者 applyToEither 看是先计算完成还是先超时:
  7. * <p>
  8. * 在 java 9 引入了 orTimeout 和 completeOnTimeOut 两个方法支持 异步 timeout 机制:
  9. * <p>
  10. * public CompletableFuture orTimeout(long timeout, TimeUnit unit) : completes the CompletableFuture with a TimeoutException after the specified timeout has elapsed.
  11. * public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) : provides a default value in the case that the CompletableFuture pipeline times out.
  12. * 内部实现上跟我们上面的实现方案是一模一样的,只是现在不需要自己实现了。
  13. * <p>
  14. * 实际上hystrix等熔断的框架,其实现线程Timeout之后就关闭线程,也是基于同样的道理,所以我们可以看到hystrix中会有一个Timer Thread
  15. * 参考: https://www.cnblogs.com/luliang888/p/14440118.html
  16. *
  17. * @author xinzhang
  18. * @version 2022/5/10
  19. */
  20. public class CompletableFutureTimeout<T> {
  21. /**
  22. * Singleton delay scheduler, used only for starting and * cancelling tasks.
  23. */
  24. static final class Delayer {
  25. static ScheduledFuture<?> delay(Runnable command, long delay,
  26. TimeUnit unit) {
  27. return delayer.schedule(command, delay, unit);
  28. }
  29. static final class DaemonThreadFactory implements ThreadFactory {
  30. @Override
  31. public Thread newThread(Runnable r) {
  32. Thread t = new Thread(r);
  33. t.setDaemon(true);
  34. t.setName("CompletableFutureDelayScheduler");
  35. return t;
  36. }
  37. }
  38. static final ScheduledThreadPoolExecutor delayer;
  39. static {
  40. (delayer = new ScheduledThreadPoolExecutor(
  41. 1, new CompletableFutureTimeout.Delayer.DaemonThreadFactory())).
  42. setRemoveOnCancelPolicy(true);
  43. }
  44. }
  45. /**
  46. * timeout时间后抛出TimeoutException
  47. */
  48. private static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
  49. CompletableFuture<T> result = new CompletableFuture<T>();
  50. CompletableFutureTimeout.Delayer.delayer
  51. .schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
  52. return result;
  53. }
  54. /**
  55. * future执行超时返回默认值
  56. */
  57. public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout,
  58. TimeUnit unit) {
  59. final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
  60. return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
  61. }
  62. /**
  63. * future执行超时抛出异常
  64. */
  65. public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
  66. final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
  67. return future.applyToEither(timeoutFuture, Function.identity());
  68. }
  69. }

使用示例

  1. public void completeOnTimeout() {
  2. CompletableFuture<String> f0 = CompletableFuture.supplyAsync(() -> {
  3. try {
  4. Thread.sleep(2000);
  5. } catch (InterruptedException e) {
  6. e.printStackTrace();
  7. }
  8. return "executing 2000 ms";
  9. });
  10. CompletableFuture<String> within = CompletableFutureTimeout
  11. .completeOnTimeout("timeout default value", f0, 1, TimeUnit.SECONDS);
  12. System.out.println(within.join()); //timeout default value
  13. }

其他

  1. // 显式返回执行异常
  2. boolean completeExceptionally(Throwable ex)
  3. // 全部完成
  4. CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
  5. // 其中一个完成即返回
  6. CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

使用CompletionService批量执行异步任务

CompletionService可以用于大量独立同构任务的异步批量执行, 可以submit提交任务, 通过take/poll获取任务Future结果
接口概览:

  1. Future<V> submit(Callable<V> task);
  2. Future<V> submit(Runnable task, V result);
  3. Future<V> take() throws InterruptedException;
  4. Future<V> poll();
  5. Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
  • 提交的任务互相独立执行, 谁先完成先返回
  • take()、poll() 都是从阻塞队列中获取并移除一个元素; 它们的区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值

CompletionService接口的实现是ExecutorCompletionService, 其实现原理是其内部维护了一个阻塞队列, 该阻塞队列用来保存任务执行结果的Future对象
ExecutorCompletionService构造方法

  1. // 如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue
  2. ExecutorCompletionService(Executor executor)
  3. ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)

示例

  1. private CompletionService<String> completionService = new ExecutorCompletionService<>(
  2. Executors.newFixedThreadPool(3));
  3. public void batchExecute() throws ExecutionException, InterruptedException {
  4. completionService.submit(() -> "hello!");
  5. completionService.submit(() -> "world!");
  6. completionService.submit(() -> "nice!");
  7. for (int i = 0; i < 3; i++) {
  8. System.out.println(completionService.take().get());
  9. }
  10. }

高级主题

线程池配置

  1. /**
  2. * ThreadPoolConfig
  3. *
  4. * @author xinzhang
  5. * @version 2022/5/10
  6. */
  7. @Configuration
  8. public class ThreadPoolConfig {
  9. @Bean
  10. public ThreadPoolExecutorFactoryBean bizExecutor() {
  11. ThreadPoolExecutorFactoryBean factoryBean = new ThreadPoolExecutorFactoryBean();
  12. // 核心线程数,一直存活
  13. factoryBean.setCorePoolSize(5);
  14. // 当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。
  15. // 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
  16. factoryBean.setMaxPoolSize(10);
  17. // 任务队列容量
  18. factoryBean.setQueueCapacity(20);
  19. // 当线程空闲时间达到setKeepAliveSeconds,该线程会退出,直到线程数量等于corePoolSize。
  20. factoryBean.setKeepAliveSeconds(60);
  21. factoryBean.setThreadNamePrefix("biz-task");
  22. //(1) 默认的ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException;
  23. //(2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
  24. //(3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除;
  25. //(4) ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
  26. factoryBean.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  27. return factoryBean;
  28. }
  29. }
  • 线程数量
    • CPU密集型: 多线程本质上是提升多核 CPU 的利用率, 理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率
    • I/O密集型: 最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
  • 不建议使用 Java 并发包中的静态工厂类Executors , 原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。
  • 默认拒绝策略要慎重使用, 默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略, 在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
  • 注意异常处理, 如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。最稳妥和简单的方案还是捕获所有异常并按需处理, 如下示例代码 ```java

try { //业务逻辑 } catch (RuntimeException x) { //按需处理 } catch (Throwable x) { //按需处理 }

  1. 使用示例
  2. ```java
  3. @Autowired
  4. private ExecutorService bizExecutor;
  5. public void runWithExecutor() {
  6. for (int i = 0; i < 10; i++) {
  7. CompletableFuture.runAsync(() -> {
  8. System.out.println(Thread.currentThread().getName()+"===========>hello!world!");
  9. }, bizExecutor);
  10. }
  11. }

上下文

TransmittableThreadLocal

建议阅读: https://github.com/alibaba/transmittable-thread-local

TransmittableThreadLocal(TTL):在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题

原理浅析: https://cloud.tencent.com/developer/article/1484420

使用方式同传统的ThreadLocal, 父子线程传值示例:

  1. public void transferCtx() {
  2. TransmittableThreadLocal<String> ctx = new TransmittableThreadLocal<>();
  3. ctx.set("hello!world!");
  4. new Thread(() -> System.out.println(ctx.get())).run(); // hello!world!
  5. }

具体在CompletableFuture使用场景上:

方案一: 使用Java Agent修饰JDK线程池实现类

在Java的启动参数加上:-javaagent:path/to/transmittable-thread-local-2.x.y.jar

方案二(推荐): CompletableFuture使用自定义线程池, 并使用TtlExecutors修饰
  1. @Configuration
  2. public class ThreadPoolConfig {
  3. @Bean
  4. public ThreadPoolExecutorFactoryBean bizExecutor() {
  5. ThreadPoolExecutorFactoryBean factoryBean = new ThreadPoolExecutorFactoryBean();
  6. // 核心线程数,一直存活
  7. factoryBean.setCorePoolSize(5);
  8. // 当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。
  9. // 如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。
  10. factoryBean.setMaxPoolSize(10);
  11. // 任务队列容量
  12. factoryBean.setQueueCapacity(20);
  13. // 当线程空闲时间达到setKeepAliveSeconds,该线程会退出,直到线程数量等于corePoolSize。
  14. factoryBean.setKeepAliveSeconds(60);
  15. factoryBean.setThreadNamePrefix("biz-task");
  16. //(1) 默认的ThreadPoolExecutor.AbortPolicy 处理程序遭到拒绝将抛出运行时RejectedExecutionException;
  17. //(2) ThreadPoolExecutor.CallerRunsPolicy 线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
  18. //(3) ThreadPoolExecutor.DiscardPolicy 不能执行的任务将被删除;
  19. //(4) ThreadPoolExecutor.DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
  20. factoryBean.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  21. return factoryBean;
  22. }
  23. @Bean
  24. public ExecutorService ttlExecutor(ExecutorService bizExecutor) {
  25. return TtlExecutors.getTtlExecutorService(bizExecutor);
  26. }
  27. }

使用示例

  1. @Autowired
  2. private ExecutorService ttlExecutor;
  3. public String decorateExecutor() {
  4. TransmittableThreadLocal<Long> ctx = new TransmittableThreadLocal<>();
  5. try {
  6. Thread.sleep(100);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. ctx.set(System.currentTimeMillis());
  11. return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + "=====>" + ctx.get(),
  12. ttlExecutor).join();
  13. }

测试

  1. @Test
  2. public void testDecorateExecutor() {
  3. for (int i = 0; i < 10; i++) {
  4. System.out.println(demo.decorateExecutor());
  5. }
  6. }
  7. ==========================
  8. 15:32:55.649 DEBUG [main] o.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@6f1d799 size = 1, maxSize = 32, parentContextCount = 0, hitCount = 2, missCount = 1]
  9. biz-task1=====>1652167976179
  10. biz-task2=====>1652167976279
  11. biz-task3=====>1652167976379
  12. biz-task4=====>1652167976489
  13. biz-task5=====>1652167976599
  14. biz-task1=====>1652167976699
  15. biz-task2=====>1652167976809
  16. biz-task3=====>1652167976919
  17. biz-task4=====>1652167977019
  18. biz-task5=====>1652167977129

Logback的MDC

方案一: logback-mdc-ttl

建议阅读: https://github.com/ofpay/logback-mdc-ttl

实现上集成使用了Transmittable ThreadLocal(TTL) :在使用线程池等会缓存线程的组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。支持JDK 9/8/7/6。

方案二(推荐): 实现MDCAdaper接口, 使用TransmittableThreadLocal替换默认InheritableThreadLocal实现
  • TtlMDCAdapter仅将2.4.0版本logback的BasicMDCAdapter中的InheritableThreadLocal替换为TransmittableThreadLocal ```java package org.slf4j;

import com.alibaba.ttl.TransmittableThreadLocal; import java.util.HashMap; import java.util.Map; import java.util.Set; import org.slf4j.spi.MDCAdapter;

/**

  • TtlMDCAdapter *
  • @author xinzhang
  • @version 2022/5/10 */ public class TtlMDCAdapter implements MDCAdapter {

    private TransmittableThreadLocal> transmittableThreadLocal = new TransmittableThreadLocal>() {

    1. @Override
    2. protected Map<String, String> childValue(Map<String, String> parentValue) {
    3. return parentValue == null ? null : new HashMap<>(parentValue);
    4. }

    };

    public TtlMDCAdapter() { }

    @Override public void put(String key, String val) {

    1. if (key == null) {
    2. throw new IllegalArgumentException("key cannot be null");
    3. } else {
    4. Map<String, String> map = this.transmittableThreadLocal.get();
    5. if (map == null) {
    6. map = new HashMap<>();
    7. this.transmittableThreadLocal.set(map);
    8. }
    9. map.put(key, val);
    10. }

    }

    @Override public String get(String key) {

    1. Map<String, String> map = this.transmittableThreadLocal.get();
    2. return map != null && key != null ? map.get(key) : null;

    }

    @Override public void remove(String key) {

    1. Map<String, String> map = this.transmittableThreadLocal.get();
    2. if (map != null) {
    3. map.remove(key);
    4. }

    }

    @Override public void clear() {

    1. Map<String, String> map = this.transmittableThreadLocal.get();
    2. if (map != null) {
    3. map.clear();
    4. this.transmittableThreadLocal.remove();
    5. }

    }

    public Set getKeys() {

    1. Map<String, String> map = this.transmittableThreadLocal.get();
    2. return map != null ? map.keySet() : null;

    }

    @Override public Map getCopyOfContextMap() {

    1. Map<String, String> oldMap = this.transmittableThreadLocal.get();
    2. return oldMap != null ? new HashMap<>(oldMap) : null;

    }

    @Override public void setContextMap(Map contextMap) {

    1. this.transmittableThreadLocal.set(new HashMap<>(contextMap));

    } }

  1. - 实例化, **注意包结构固定为org.slf4j.impl**, 通过MDC#bwCompatibleGetMDCAdapterFromBinder()实例化
  2. ```java
  3. package org.slf4j.impl;
  4. import org.slf4j.TtlMDCAdapter;
  5. import org.slf4j.spi.MDCAdapter;
  6. /**
  7. * StaticMDCBinder
  8. *
  9. * @author xinzhang
  10. * @version 2022/5/10
  11. */
  12. public class StaticMDCBinder {
  13. public static final StaticMDCBinder SINGLETON = new StaticMDCBinder();
  14. private StaticMDCBinder() {
  15. }
  16. public MDCAdapter getMDCA() {
  17. return new TtlMDCAdapter();
  18. }
  19. public String getMDCAdapterClassStr() {
  20. return TtlMDCAdapter.class.getName();
  21. }
  22. }

使用示例

  1. @Autowired
  2. private ExecutorService ttlExecutor;
  3. public String transferMDC() {
  4. try {
  5. Thread.sleep(100);
  6. } catch (InterruptedException e) {
  7. e.printStackTrace();
  8. }
  9. MDC.put("time", String.valueOf(System.currentTimeMillis()));
  10. return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName() + "=====>" + MDC.get("time")
  11. ,ttlExecutor).join();
  12. }

测试

  1. @Test
  2. public void testTransferMDC() {
  3. for (int i = 0; i < 10; i++) {
  4. System.out.println(demo.transferMDC());
  5. }
  6. }
  7. =========================
  8. biz-task1=====>1652172234438
  9. biz-task2=====>1652172234548
  10. biz-task3=====>1652172234658
  11. biz-task4=====>1652172234758
  12. biz-task5=====>1652172234858
  13. biz-task1=====>1652172234968
  14. biz-task2=====>1652172235078
  15. biz-task3=====>1652172235178
  16. biz-task4=====>1652172235288
  17. biz-task5=====>1652172235398

Hystrix线程池隔离模式

参考: https://zhuanlan.zhihu.com/p/273292662
hystrix默认为线程池隔离模式, 会复用线程, 导致上下文传递出现问题
示例
serviceA—>serviceB

  1. 1.feign集成了hystrix, 在配置中打开开关即可, 将核心线程数限制为3
  2. # hystrix
  3. feign.hystrix.enabled=true
  4. hystrix.threadpool.default.coreSize=3
  5. hystrix.threadpool.default.maxQueueSize=50
  6. # 没达到maxQueueSize, 但达到queueSizeRejectionThreshold值, 请求也会被拒绝, 默认为5
  7. hystrix.threadpool.default.queueSizeRejectionThreshold=20
  8. 2.serviceA调用serviceB
  9. (TtlContextTransmittableThreadLocal实现的上下文)
  10. @GetMapping
  11. public void accessServiceB() {
  12. long now = System.currentTimeMillis();
  13. System.out.println(Thread.currentThread().getName() + "设置上下文=======>" + now);
  14. // 设置上下文
  15. TtlContext.put(now);
  16. // 业务操作, 调用serviceB
  17. serviceBFeignClient.hello();
  18. // 请求结束, 清除上下文
  19. TtlContext.clear();
  20. }
  21. 3.serviceBfeignClient设置拦截器, 获取上下文
  22. @Component
  23. public class FeignInterceptor implements RequestInterceptor {
  24. @Override
  25. public void apply(RequestTemplate requestTemplate) {
  26. System.out.println(Thread.currentThread().getName() + "获取上下文=======>" + TtlContext.get());
  27. }
  28. }
  29. 3.jmeter10个并发一次访问serviceA, 使hystrix线程复用, 日志如下:
  30. ===========================可以看到对应不上, hystrix中复用的线程上下文也并未清除
  31. http-nio-9001-exec-2设置上下文=======>1652180438894
  32. http-nio-9001-exec-1设置上下文=======>1652180438894
  33. http-nio-9001-exec-3设置上下文=======>1652180438894
  34. http-nio-9001-exec-4设置上下文=======>1652180438964
  35. http-nio-9001-exec-5设置上下文=======>1652180439064
  36. hystrix-service-b-2获取上下文=======>1652180438894
  37. hystrix-service-b-1获取上下文=======>1652180439064
  38. hystrix-service-b-3获取上下文=======>1652180438964
  39. http-nio-9001-exec-6设置上下文=======>1652180439154
  40. http-nio-9001-exec-7设置上下文=======>1652180439254
  41. http-nio-9001-exec-8设置上下文=======>1652180439354
  42. http-nio-9001-exec-9设置上下文=======>1652180439454
  43. http-nio-9001-exec-10设置上下文=======>1652180439564
  44. hystrix-service-b-1获取上下文=======>1652180439064
  45. hystrix-service-b-2获取上下文=======>1652180438894
  46. hystrix-service-b-3获取上下文=======>1652180438964
  47. hystrix-service-b-2获取上下文=======>1652180438894
  48. hystrix-service-b-3获取上下文=======>1652180438964
  49. 4.再发起单次请求, 可以观察的更明显, hystrix获取的上下文为之前请求遗留的数据
  50. http-nio-9001-exec-1设置上下文=======>1652180593488
  51. hystrix-service-b-1获取上下文=======>1652180439064

方案一(推荐): 使用Hystrix插件机制, 用TtlCallable包装线程
  • 此方案的本质是针对HystrixCommand的run()方法(也就是加了@HystrixCommand注解的业务方法)拦截处理, 但它可能会超时或失败,那么就会去执行fallback方法,如果在 fallback方法中也想共享相关上下文信息,这时就无法覆盖到这种场景了

    1. @Slf4j
    2. @Configuration
    3. public class HystrixPluginConfiguration {
    4. @PostConstruct
    5. public void initHystrixPlugins() {
    6. try {
    7. HystrixConcurrencyStrategy target = new TtlHystrixConcurrencyStrategy();
    8. HystrixConcurrencyStrategy strategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    9. if (strategy instanceof TtlHystrixConcurrencyStrategy) {
    10. return;
    11. }
    12. HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
    13. .getInstance().getCommandExecutionHook();
    14. HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
    15. .getEventNotifier();
    16. HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
    17. .getMetricsPublisher();
    18. HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
    19. .getPropertiesStrategy();
    20. if (log.isDebugEnabled()) {
    21. log.debug("Current Hystrix plugins configuration is ["
    22. + "concurrencyStrategy [" + target + "]," + "eventNotifier ["
    23. + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
    24. + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
    25. log.debug("Registering Ttl Hystrix Concurrency Strategy.");
    26. }
    27. HystrixPlugins.reset();
    28. HystrixPlugins.getInstance().registerConcurrencyStrategy(target);
    29. HystrixPlugins.getInstance()
    30. .registerCommandExecutionHook(commandExecutionHook);
    31. HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
    32. HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
    33. HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
    34. } catch (Exception e) {
    35. log.error("Failed to register Ttl Hystrix Concurrency Strategy", e);
    36. }
    37. }
    38. /**
    39. * 使用TransmittableThreadLocal修饰Callable, 以实现线程池中上下文的正确传递
    40. */
    41. public static class TtlHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    42. @Override
    43. public <T> Callable<T> wrapCallable(Callable<T> callable) {
    44. return TtlCallable.get(callable);
    45. }
    46. }
    47. }

    测试: 同样以10个并发一次访问serviceA, 日志如下, 上下文一一对应 ```java http-nio-9001-exec-3设置上下文=======>1652180858093 http-nio-9001-exec-1设置上下文=======>1652180858093 http-nio-9001-exec-2设置上下文=======>1652180858173 http-nio-9001-exec-4设置上下文=======>1652180858273 hystrix-service-b-1获取上下文=======>1652180858093 hystrix-service-b-3获取上下文=======>1652180858093 hystrix-service-b-2获取上下文=======>1652180858173 http-nio-9001-exec-5设置上下文=======>1652180858376 http-nio-9001-exec-6设置上下文=======>1652180858472 http-nio-9001-exec-8设置上下文=======>1652180858566 http-nio-9001-exec-9设置上下文=======>1652180858673 http-nio-9001-exec-7设置上下文=======>1652180858773 hystrix-service-b-2获取上下文=======>1652180858376 hystrix-service-b-1获取上下文=======>1652180858472 hystrix-service-b-3获取上下文=======>1652180858273 hystrix-service-b-2获取上下文=======>1652180858673 hystrix-service-b-1获取上下文=======>1652180858566 hystrix-service-b-3获取上下文=======>1652180858773 http-nio-9001-exec-10设置上下文=======>1652180858883 hystrix-service-b-3获取上下文=======>1652180858883

再次单次请求, 观察得更清晰 http-nio-9001-exec-1设置上下文=======>1652180929774 hystrix-service-b-1获取上下文=======>1652180929774

  1. <a name="AG8IA"></a>
  2. ##### 方案二: 使用HystrixRequestContext上下文
  3. 参考: [https://www.freesion.com/article/9286656341/](https://www.freesion.com/article/9286656341/)<br />此方案只有使用HystrixContextRunnable或HystrixContextCallable创建线程才能在线程间传递数据, 在这里不过多介绍
  4. <a name="se3sK"></a>
  5. ## 业务案例
  6. <a name="kTwK3"></a>
  7. ### 编排异步任务实现并行召回
  8. 搜索业务有如下的并行召回流程, 下面我们用CompletableFuture模拟实现, 其中关键的mm, es, sm的并行召回<br />![search召回.png](https://cdn.nlark.com/yuque/0/2022/png/281275/1652258714747-af8799ed-eb0b-4347-b198-840600bec769.png#clientId=ua3bb9012-2db7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=401&id=ue1133160&margin=%5Bobject%20Object%5D&name=search%E5%8F%AC%E5%9B%9E.png&originHeight=401&originWidth=771&originalType=binary&ratio=1&rotation=0&showTitle=false&size=22702&status=done&style=none&taskId=udcc1ca54-298f-4b39-bd1a-83a2690ddaa&title=&width=771)
  9. ```java
  10. /**
  11. * Search
  12. *
  13. * @author xinzhang
  14. * @version 2022/5/9
  15. */
  16. @Slf4j
  17. @Component
  18. public class Search {
  19. @Autowired
  20. private ExecutorService ttlExecutor;
  21. public List<Article> recall(String query) {
  22. long start = System.currentTimeMillis();
  23. // 设置UUID作为traceId
  24. TransmittableThreadLocal<String> ctx = new TransmittableThreadLocal<>();
  25. String traceId = UUID.fastUUID().toString();
  26. System.out.println("=====================" + traceId + "===================");
  27. ctx.set(traceId);
  28. CompletableFuture<List<Article>> smRecall = CompletableFuture.supplyAsync(() -> {
  29. // 子线程打印traceId
  30. System.out.println(Thread.currentThread().getName()
  31. + "=====>smRecall executing, timeout<1s, ==>traceId: " + ctx.get());
  32. randomSleepWithIn5Seconds(ctx.get());
  33. return ArticleFactory.randomGenerate(query, "sm");
  34. }, ttlExecutor);
  35. // 设置超时以及异常处理
  36. CompletableFuture<List<Article>> smRecallTimeOut = CompletableFutureTimeout
  37. .orTimeout(smRecall, 1, TimeUnit.SECONDS)
  38. .exceptionally(e -> {
  39. System.out.println(String.format("smRecall failed, e: %s", e.getMessage()));
  40. return Collections.emptyList();
  41. });
  42. CompletableFuture<List<Article>> mmRecall = CompletableFuture.supplyAsync(() -> {
  43. System.out.println(Thread.currentThread().getName()
  44. + "=====> mmRecall executing, timeout<2s, ==>traceId: " + ctx.get());
  45. randomSleepWithIn5Seconds(ctx.get());
  46. return ArticleFactory.randomGenerate(query, "mm");
  47. }, ttlExecutor);
  48. CompletableFuture<List<Article>> mmRecallTimeOut = CompletableFutureTimeout
  49. .orTimeout(mmRecall, 2, TimeUnit.SECONDS)
  50. .exceptionally(e -> {
  51. System.out.println(String.format("mmRecall failed, e: %s", e.getMessage()));
  52. return Collections.emptyList();
  53. });
  54. CompletableFuture<List<Article>> esRecall = CompletableFuture.supplyAsync(() -> {
  55. System.out.println(Thread.currentThread().getName()
  56. + "=====>esRecall executing, timeout<3s, ==>traceId: " + ctx.get());
  57. randomSleepWithIn5Seconds(ctx.get());
  58. return ArticleFactory.randomGenerate(query, "es");
  59. }, ttlExecutor);
  60. CompletableFuture<List<Article>> esRecallTimeOut = CompletableFutureTimeout
  61. .orTimeout(esRecall, 3, TimeUnit.SECONDS)
  62. .exceptionally(e -> {
  63. System.out.println(String.format("esRecall failed, e: %s", e.getMessage()));
  64. return Collections.emptyList();
  65. });
  66. CompletableFuture<List<Article>> allDone = CompletableFuture
  67. .allOf(mmRecallTimeOut, esRecallTimeOut, smRecallTimeOut).thenApply(v -> {
  68. List<Article> all = new ArrayList<>();
  69. all.addAll(mmRecallTimeOut.join());
  70. all.addAll(esRecallTimeOut.join());
  71. all.addAll(smRecallTimeOut.join());
  72. return all;
  73. });
  74. List<Article> list = allDone.join();
  75. // 打印总耗时, 因为es, sm, mm超时时间最长为3s, 因此总耗时一定<=3s
  76. System.out.println("recall total: " + (System.currentTimeMillis() - start) / 1000 + "s");
  77. System.out.println(list);
  78. return list;
  79. }
  80. /**
  81. * 随机睡眠0~5s
  82. */
  83. public void randomSleepWithIn5Seconds(String traceId) {
  84. Random random = new Random();
  85. int time = random.nextInt(5000) - 1;
  86. try {
  87. System.out.println(Thread.currentThread().getName() + "睡眠" + time / 1000 + "s, ==>traceId: " + traceId);
  88. Thread.sleep(time);
  89. } catch (InterruptedException e) {
  90. e.printStackTrace();
  91. }
  92. }
  93. }

可以看到, 超时处理以及异常处理等API的调用较为固定, 可以抽象出一个并行处理器来简化代码

  1. package com.pingan.lcloud.cf;
  2. import com.pingan.lcloud.jdk.CompletableFutureTimeout;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.concurrent.CompletableFuture;
  6. import java.util.concurrent.ExecutorService;
  7. import java.util.concurrent.TimeUnit;
  8. import java.util.function.Function;
  9. import java.util.function.Supplier;
  10. import java.util.stream.Collectors;
  11. /**
  12. * ParallelExecutor
  13. * 并行执行器, 通过submit(), 提交任务, 可设置超时(超时会抛出TimeoutException)以及异常处理器,
  14. * 通过execute()并行执行并获取结果
  15. *
  16. * @author xinzhang
  17. * @version 2022/5/11
  18. */
  19. public class ParallelExecutor<T> {
  20. private final List<CompletableFuture<T>> tasks = new ArrayList<>();
  21. private ExecutorService executorService;
  22. public ParallelExecutor(ExecutorService executorService) {
  23. this.executorService = executorService;
  24. }
  25. /**
  26. * 任务提交, 超时或异常返回null
  27. */
  28. public void submit(Supplier<T> task, long timeout, TimeUnit unit) {
  29. CompletableFuture<T> future = CompletableFuture.supplyAsync(task, executorService);
  30. CompletableFuture<T> timeoutFuture = CompletableFutureTimeout.orTimeout(future, timeout, unit)
  31. .exceptionally(e -> null);
  32. tasks.add(timeoutFuture);
  33. }
  34. /**
  35. * 任务提交
  36. *
  37. * @param task 需要执行的任务
  38. * @param timeout 超时时间
  39. * @param unit 超时单位
  40. * @param errorHandler 异常处理器
  41. */
  42. public void submit(Supplier<T> task, long timeout, TimeUnit unit, Function<Throwable, ? extends T> errorHandler) {
  43. CompletableFuture<T> future = CompletableFuture.supplyAsync(task, executorService);
  44. CompletableFuture<T> timeoutFuture = CompletableFutureTimeout.orTimeout(future, timeout, unit)
  45. .exceptionally(errorHandler);
  46. tasks.add(timeoutFuture);
  47. }
  48. /**
  49. * 任务执行
  50. *
  51. * @return 结果
  52. */
  53. public List<T> execute() {
  54. return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[]{}))
  55. .thenApply(v -> tasks.stream().map(CompletableFuture::join).collect(Collectors.toList())).join();
  56. }
  57. }

使用

  1. /**
  2. * Search
  3. *
  4. * @author xinzhang
  5. * @version 2022/5/9
  6. */
  7. @Slf4j
  8. @Component
  9. public class SearchV2 {
  10. @Autowired
  11. private ExecutorService ttlExecutor;
  12. /**
  13. * 召回错误处理器
  14. */
  15. static class RecallErrorHandler implements Function<Throwable, List<Article>> {
  16. @Override
  17. public List<Article> apply(Throwable t) {
  18. System.out.println(String.format("recall failed, e: %s", t.getMessage()));
  19. return Collections.emptyList();
  20. }
  21. }
  22. public List<Article> recall(String query) {
  23. long start = System.currentTimeMillis();
  24. // 设置UUID作为traceId
  25. TransmittableThreadLocal<String> ctx = new TransmittableThreadLocal<>();
  26. String traceId = UUID.fastUUID().toString();
  27. System.out.println("=====================" + traceId + "===================");
  28. ctx.set(traceId);
  29. // 定义错误处理器
  30. RecallErrorHandler errorHandler = new RecallErrorHandler();
  31. // 定义并行执行器
  32. ParallelExecutor<List<Article>> parallelExecutor = new ParallelExecutor<>(ttlExecutor);
  33. parallelExecutor.submit(() -> {
  34. // 子线程打印traceId
  35. System.out.println(Thread.currentThread().getName()
  36. + "=====>smRecall executing, timeout<1s, ==>traceId: " + ctx.get());
  37. randomSleepWithIn5Seconds(ctx.get());
  38. return ArticleFactory.randomGenerate(query, "sm");
  39. }, 1, TimeUnit.SECONDS, errorHandler);
  40. parallelExecutor.submit(() -> {
  41. // 子线程打印traceId
  42. System.out.println(Thread.currentThread().getName()
  43. + "=====>mmRecall executing, timeout<2s, ==>traceId: " + ctx.get());
  44. randomSleepWithIn5Seconds(ctx.get());
  45. return ArticleFactory.randomGenerate(query, "mm");
  46. }, 2, TimeUnit.SECONDS, errorHandler);
  47. parallelExecutor.submit(() -> {
  48. // 子线程打印traceId
  49. System.out.println(Thread.currentThread().getName()
  50. + "=====>esRecall executing, timeout<3s, ==>traceId: " + ctx.get());
  51. randomSleepWithIn5Seconds(ctx.get());
  52. return ArticleFactory.randomGenerate(query, "es");
  53. }, 3, TimeUnit.SECONDS, errorHandler);
  54. List<List<Article>> res = parallelExecutor.execute();
  55. // 将list合并
  56. List<Article> list = res.stream().flatMap(Collection::stream).collect(Collectors.toList());
  57. // 打印总耗时, 因为es, sm, mm超时时间最长为3s, 因此总耗时一定<=3s
  58. System.out.println("recall total: " + (System.currentTimeMillis() - start) / 1000 + "s");
  59. System.out.println(list);
  60. return list;
  61. }
  62. /**
  63. * 随机睡眠0~5s
  64. */
  65. public void randomSleepWithIn5Seconds(String traceId) {
  66. Random random = new Random();
  67. int time = random.nextInt(5000) - 1;
  68. try {
  69. System.out.println(Thread.currentThread().getName() + "睡眠" + time / 1000 + "s, ==>traceId: " + traceId);
  70. Thread.sleep(time);
  71. } catch (InterruptedException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. }