原址 : https://xcz.yuque.com/docs/share/d71f13bc-81ae-4c3a-bc0e-66bcd295c2e0?# 《@Async 和 TransmittableThreadLocal 使用记录》

线上问题

image.png
lALPDhJzxoMlDqbNAxzNB3w_1916_796.png
lALPDgtYyTmSXjLNA7TNB3I_1906_948.png
lALPDg7mR95Y-L7NAnzNCZg_2456_636.png

问题猜想

  1. 线上复现发送异常, 查询到对应时间日志, 消息配置为空, 现实是消息配置没有问题
  2. 线上发送偶尔有问题, 偶尔正常, pre 则完全正常
  3. 查看日志合理怀疑, 异步时, 获取 groupId 异常, F6static 使用了 TransmittableThreadLocal 获取
  4. 按正常道理来说, 使用 TransmittableThreadLocal 应该不存在这个问题, 查看 TransmittableThreadLocal 文档, 使用线程池对象需要使用 TtlExecutors.getTtlExecutorService 修饰
  5. 怀疑 @Async 注解, 有默认线程池对象, 当时写代码, 没有添加 async 单独使用的线程池, 所以更没有将 异步线程池使用 TtlExecutors 修饰

验证猜想

验证线程池修饰问题

未修饰程池, 导致 ttl 获取上下文异常

添加测试文件, 再两个主线程内, 添加普通线程池异步, 线程池对象没有使用 TtlExecutors.getTtlExecutorService 修饰

  1. public class NormalExecutorsTest {
  2. private static ExecutorService executorService = Executors.newFixedThreadPool(2);
  3. private static ThreadLocal tl = new TransmittableThreadLocal<>(); //这里采用TTL的实现
  4. public static void main(String[] args) {
  5. new Thread(() -> {
  6. String mainThreadName = "main_01";
  7. tl.set(1);
  8. executorService.execute(() -> {
  9. sleep(1L);
  10. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  11. });
  12. executorService.execute(() -> {
  13. sleep(1L);
  14. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  15. });
  16. executorService.execute(() -> {
  17. sleep(1L);
  18. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  19. });
  20. sleep(1L); //确保上面的会在tl.set执行之前执行
  21. tl.set(2); // 等上面的线程池第一次启用完了,父线程再给自己赋值
  22. executorService.execute(() -> {
  23. sleep(1L);
  24. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  25. });
  26. executorService.execute(() -> {
  27. sleep(1L);
  28. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  29. });
  30. executorService.execute(() -> {
  31. sleep(1L);
  32. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  33. });
  34. System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
  35. }).start();
  36. new Thread(() -> {
  37. String mainThreadName = "main_02";
  38. tl.set(3);
  39. executorService.execute(() -> {
  40. sleep(1L);
  41. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  42. });
  43. executorService.execute(() -> {
  44. sleep(1L);
  45. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  46. });
  47. executorService.execute(() -> {
  48. sleep(1L);
  49. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  50. });
  51. sleep(1L); //确保上面的会在tl.set执行之前执行
  52. tl.set(4); // 等上面的线程池第一次启用完了,父线程再给自己赋值
  53. executorService.execute(() -> {
  54. sleep(1L);
  55. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  56. });
  57. executorService.execute(() -> {
  58. sleep(1L);
  59. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  60. });
  61. executorService.execute(() -> {
  62. sleep(1L);
  63. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  64. });
  65. System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
  66. }).start();
  67. }
  68. private static void sleep(long time) {
  69. try {
  70. Thread.sleep(time);
  71. } catch (InterruptedException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. }

image.png

从上述测试发现, 两个主线程里都使用线程池异步,而且值在主线程里还发生过改变,测试结果显示确实获取上下文异常,发生错乱

修饰线程池, 获取上下文信息正常

添加测试文件, 区别只有线程池使用了 TtlExecutors.getTtlExecutorService 修饰

  1. public class TtlExecutorsTest {
  2. // 需要注意的是,使用TTL的时候,要想传递的值不出问题,线程池必须得用TTL加一层代理(下面会讲这样做的目的)
  3. private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));
  4. private static ThreadLocal tl = new TransmittableThreadLocal<>(); //这里采用TTL的实现
  5. public static void main(String[] args) {
  6. new Thread(() -> {
  7. String mainThreadName = "main_01";
  8. tl.set(1);
  9. executorService.execute(() -> {
  10. sleep(1L);
  11. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  12. });
  13. executorService.execute(() -> {
  14. sleep(1L);
  15. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  16. });
  17. executorService.execute(() -> {
  18. sleep(1L);
  19. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  20. });
  21. sleep(1L); //确保上面的会在tl.set执行之前执行
  22. tl.set(2); // 等上面的线程池第一次启用完了,父线程再给自己赋值
  23. executorService.execute(() -> {
  24. sleep(1L);
  25. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  26. });
  27. executorService.execute(() -> {
  28. sleep(1L);
  29. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  30. });
  31. executorService.execute(() -> {
  32. sleep(1L);
  33. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  34. });
  35. System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
  36. }).start();
  37. new Thread(() -> {
  38. String mainThreadName = "main_02";
  39. tl.set(3);
  40. executorService.execute(() -> {
  41. sleep(1L);
  42. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  43. });
  44. executorService.execute(() -> {
  45. sleep(1L);
  46. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  47. });
  48. executorService.execute(() -> {
  49. sleep(1L);
  50. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  51. });
  52. sleep(1L); //确保上面的会在tl.set执行之前执行
  53. tl.set(4); // 等上面的线程池第一次启用完了,父线程再给自己赋值
  54. executorService.execute(() -> {
  55. sleep(1L);
  56. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  57. });
  58. executorService.execute(() -> {
  59. sleep(1L);
  60. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  61. });
  62. executorService.execute(() -> {
  63. sleep(1L);
  64. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  65. });
  66. System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
  67. }).start();
  68. }
  69. private static void sleep(long time) {
  70. try {
  71. Thread.sleep(time);
  72. } catch (InterruptedException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. }

image.png

不难发现,两个主线程里都使用线程池异步,而且值在主线程里还发生过改变,测试结果展示一切正常,由此可以知道 TTL 在使用线程池的情况下,只要使用 TtlExecutors 对应方法修饰线程池, 也可以很好的完成传递,而且不会发生错乱。

普通异步请求不使用线程池, 正常获取上下文信息

  1. public class NormalAsyncTest {
  2. private static ThreadLocal tl = new TransmittableThreadLocal<>(); //这里采用TTL的实现
  3. public static void main(String[] args) {
  4. new Thread(() -> {
  5. String mainThreadName = "main_01";
  6. tl.set(1);
  7. new Thread(() -> {
  8. sleep(1L);
  9. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  10. }).start();
  11. new Thread(() -> {
  12. sleep(1L);
  13. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  14. }).start();
  15. new Thread(() -> {
  16. sleep(1L);
  17. System.out.println(String.format("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  18. }).start();
  19. sleep(1L); //确保上面的会在tl.set执行之前执行
  20. tl.set(2); // 等上面的线程池第一次启用完了,父线程再给自己赋值
  21. new Thread(() -> {
  22. sleep(1L);
  23. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  24. }).start();
  25. new Thread(() -> {
  26. sleep(1L);
  27. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  28. }).start();
  29. new Thread(() -> {
  30. sleep(1L);
  31. System.out.println(String.format("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  32. }).start();
  33. System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
  34. }).start();
  35. new Thread(() -> {
  36. String mainThreadName = "main_02";
  37. tl.set(3);
  38. new Thread(() -> {
  39. sleep(1L);
  40. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  41. }).start();
  42. new Thread(() -> {
  43. sleep(1L);
  44. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  45. }).start();
  46. new Thread(() -> {
  47. sleep(1L);
  48. System.out.println(String.format("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  49. }).start();
  50. sleep(1L); //确保上面的会在tl.set执行之前执行
  51. tl.set(4); // 等上面的线程池第一次启用完了,父线程再给自己赋值
  52. new Thread(() -> {
  53. sleep(1L);
  54. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  55. }).start();
  56. new Thread(() -> {
  57. sleep(1L);
  58. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  59. }).start();
  60. new Thread(() -> {
  61. sleep(1L);
  62. System.out.println(String.format("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
  63. }).start();
  64. System.out.println(String.format("线程名称-%s, 变量值=%s", Thread.currentThread().getName(), tl.get()));
  65. }).start();
  66. }
  67. private static void sleep(long time) {
  68. try {
  69. Thread.sleep(time);
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. }


image.png

测试结果显示和使用了线程池修饰的测试保持一致, 普通线程异步直接获取 ttl 上下文信息没有问题

验证 @Async 使用了默认线程池

断点异步方法, 查看线程池相关

@Async 和 TransmittableThreadLocal 使用记录 - 图8

很明显可以看到默认线程池配置

源码解析

TransmittableThreadLocal

image.png

接入文档显示的很清楚, 使用线程池需要 TtlExecutors 对应修饰

先来看 TTL 里面的几个重要属性及方法
TTL 定义:

  1. public class TransmittableThreadLocal extends InheritableThreadLocal

可以看到,TTL 继承了 ITL,意味着 TTL 首先具备 ITL 的功能。
再来看看一个重要属性 holder:

  1. /**
  2. * 这是一个ITL类型的对象,持有一个全局的WeakMap(weakMap的key是弱引用,同TL一样,也是为了解决内存泄漏的问题),里面存放了TTL对象
  3. * 并且重写了initialValue和childValue方法,尤其是childValue,可以看到在即将异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象(TTL)的
  4. */
  5. private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
  6. new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
  7. @Override
  8. protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
  9. return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
  10. }
  11. @Override
  12. protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
  13. return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
  14. }
  15. };

再来看下 set 和 get:

  1. //下面的方法均属于TTL类
  2. @Override
  3. public final void set(T value) {
  4. super.set(value);
  5. if (null == value) removeValue();
  6. else addValue();
  7. }
  8. @Override
  9. public final T get() {
  10. T value = super.get();
  11. if (null != value) addValue();
  12. return value;
  13. }
  14. private void removeValue() {
  15. holder.get().remove(this); //从holder持有的map对象中移除
  16. }
  17. private void addValue() {
  18. if (!holder.get().containsKey(this)) {
  19. holder.get().put(this, null); //从holder持有的map对象中添加
  20. }
  21. }

TTL 里先了解上述的几个方法及对象,可以看出,单纯的使用 TTL 是达不到支持线程池本地变量的传递的,通过第一部分的例子,可以发现,除了要启用 TTL,还需要通过 TtlExecutors.getTtlExecutorService 包装一下线程池才可以,那么,下面就来看看在程序即将通过线程池异步的时候,TTL 帮我们做了哪些操作(这一部分是 TTL 支持线程池传递的核心部分):
首先打开包装类,看下 execute 方法在执行时做了些什么。

  1. // 此方法属于线程池包装类ExecutorTtlWrapper
  2. @Override
  3. public void execute(@Nonnull Runnable command) {
  4. executor.execute(TtlRunnable.get(command)); //这里会把Rannable包装一层,这是关键,有些逻辑处理,需要在run之前执行
  5. }
  6. // 对应上面的get方法,返回一个TtlRunnable对象,属于TtLRannable包装类
  7. @Nullable
  8. public static TtlRunnable get(@Nullable Runnable runnable) {
  9. return get(runnable, false, false);
  10. }
  11. // 对应上面的get方法
  12. @Nullable
  13. public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
  14. if (null == runnable) return null;
  15. if (runnable instanceof TtlEnhanced) { // 若发现已经是目标类型了(说明已经被包装过了)直接返回
  16. // avoid redundant decoration, and ensure idempotency
  17. if (idempotent) return (TtlRunnable) runnable;
  18. else throw new IllegalStateException("Already TtlRunnable!");
  19. }
  20. return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); //最终初始化
  21. }
  22. // 对应上面的TtlRunnable方法
  23. private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
  24. this.capturedRef = new AtomicReference<Object>(capture()); //这里将捕获后的父线程本地变量存储在当前对象的capturedRef里
  25. this.runnable = runnable;
  26. this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
  27. }
  28. // 对应上面的capture方法,用于捕获当前线程(父线程)里的本地变量,此方法属于TTL的静态内部类Transmitter
  29. @Nonnull
  30. public static Object capture() {
  31. Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
  32. for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // holder里目前存放的k-v里的key,就是需要传给子线程的TTL对象
  33. captured.put(threadLocal, threadLocal.copyValue());
  34. }
  35. return captured; // 这里返回的这个对象,就是当前将要使用线程池异步出来的子线程,所继承的本地变量合集
  36. }
  37. // 对应上面的copyValue,简单的将TTL对象里的值返回(结合之前的源码可以知道get方法其实就是获取当前线程(父线程)里的值,调用super.get方法)
  38. private T copyValue() {
  39. return copy(get());
  40. }
  41. protected T copy(T parentValue) {
  42. return parentValue;
  43. }

结合上述代码,大致知道了在线程池异步之前需要做的事情,其实就是把当前父线程里的本地变量取出来,然后赋值给 Rannable 包装类里的 capturedRef 属性,到此为止,下面会发生什么,我们大致上可以猜出来了,接下来大概率会在 run 方法里,将这些捕获到的值赋给子线程的 holder 赋对应的 TTL 值,那么我们继续往下看 Rannable 包装类里的 run 方法是怎么实现的:

  1. //run方法属于Rannable的包装类TtlRunnable
  2. @Override
  3. public void run() {
  4. Object captured = capturedRef.get(); // 获取由之前捕获到的父线程变量集
  5. if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
  6. throw new IllegalStateException("TTL value reference is released after run!");
  7. }
  8. /**
  9. * 重点方法replay,此方法用来给当前子线程赋本地变量,返回的backup是此子线程原来就有的本地变量值(原生本地变量,下面会详细讲),
  10. * backup用于恢复数据(如果任务执行完毕,意味着该子线程会归还线程池,那么需要将其原生本地变量属性恢复)
  11. */
  12. Object backup = replay(captured);
  13. try {
  14. runnable.run(); // 执行异步逻辑
  15. } finally {
  16. restore(backup); // 结合上面对于replay的解释,不难理解,这个方法就是用来恢复原有值的
  17. }
  18. }

根据上述代码,我们看到了 TTL 在异步任务执行前,会先进行赋值操作(就是拿着异步发生时捕获到的父线程的本地变量,赋给自己),当任务执行完,就恢复原生的自己本身的线程变量值。
下面来具体看这俩方法:

  1. //下面的方法均属于TTL的静态内部类Transmittable
  2. @Nonnull
  3. public static Object replay(@Nonnull Object captured) {
  4. @SuppressWarnings("unchecked")
  5. Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; //使用此线程异步时捕获到的父线程里的本地变量值
  6. Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); //当前线程原生的本地变量,用于使用完线程后恢复用
  7. //注意:这里循环的是当前子线程原生的本地变量集合,与本方法相反,restore方法里循环这个holder是指:该线程运行期间产生的变量+父线程继承来的变量
  8. for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
  9. iterator.hasNext(); ) {
  10. Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
  11. TransmittableThreadLocal<?> threadLocal = next.getKey();
  12. backup.put(threadLocal, threadLocal.get()); // 所有原生的本地变量都暂时存储在backup里,用于之后恢复用
  13. /**
  14. * 检查,如果捕获到的线程变量里,不包含当前原生变量值,则从当前原生变量里清除掉,对应的线程本地变量也清掉
  15. * 这就是为什么会将原生变量保存在backup里的原因,为了恢复原生值使用
  16. * 那么,为什么这里要清除掉呢?因为从使用这个子线程做异步那里,捕获到的本地变量并不包含原生的变量,当前线程
  17. * 在做任务时的首要目标,是将父线程里的变量完全传递给任务,如果不清除这个子线程原生的本地变量,
  18. * 意味着很可能会影响到任务里取值的准确性。
  19. *
  20. * 打个比方,有ttl对象tl,这个tl在线程池的某个子线程里存在对应的值2,当某个主线程使用该子线程做异步任务时
  21. * tl这个对象在当前主线程里没有值,那么如果不进行下面这一步的操作,那么在使用该子线程做的任务里就可以通过
  22. * 该tl对象取到值2,不符合预期
  23. */
  24. if (!capturedMap.containsKey(threadLocal)) {
  25. iterator.remove();
  26. threadLocal.superRemove();
  27. }
  28. }
  29. // 这一步就是直接把父线程本地变量赋值给当前线程了(这一步起就刷新了holder里的值了,具体往下看该方法,在异步线程运行期间,还可能产生别的本地变量,比如在真正的run方法内的业务代码,再用一个tl对象设置一个值)
  30. setTtlValuesTo(capturedMap);
  31. // 这个方法属于扩展方法,ttl本身支持重写异步任务执行前后的操作,这里不再具体赘述
  32. doExecuteCallback(true);
  33. return backup;
  34. }
  35. // 结合之前Rannable包装类的run方法来看,这个方法就是使用上面replay记录下的原生线程变量做恢复用的
  36. public static void restore(@Nonnull Object backup) {
  37. @SuppressWarnings("unchecked")
  38. Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
  39. // call afterExecute callback
  40. doExecuteCallback(false);
  41. // 注意,这里的holder取出来的,实际上是replay方法设置进去的关于父线程里的所有变量(结合上面来看,就是:该线程运行期间产生的变量+父线程继承来的变量)
  42. for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
  43. iterator.hasNext(); ) {
  44. Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
  45. TransmittableThreadLocal<?> threadLocal = next.getKey();
  46. /**
  47. * 同样的,如果子线程原生变量不包含某个父线程传来的对象,那么就删除,可以思考下,这里的清除跟上面replay里的有什么不同?
  48. * 这里会把不属于原生变量的对象给删除掉(这里被删除掉的可能是父线程继承下来的,也可能是异步任务在执行时产生的新值)
  49. */
  50. if (!backupMap.containsKey(threadLocal)) {
  51. iterator.remove();
  52. threadLocal.superRemove();
  53. }
  54. }
  55. // 同样调用这个方法,进行值的恢复
  56. setTtlValuesTo(backupMap);
  57. }
  58. // 真正给当前子线程赋值的方法,对应上面的setTtlValuesTo方法
  59. private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
  60. for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
  61. @SuppressWarnings("unchecked")
  62. TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
  63. threadLocal.set(entry.getValue()); //赋值,注意,从这里开始,子线程的holder里的值会被重新赋值刷新,可以参照上面ttl的set方法的实现
  64. }
  65. }

ok,到这里基本上把 TTL 比较核心的代码看完了,下面整理下整个流程,这是官方给出的时序图:
@Async 和 TransmittableThreadLocal 使用记录 - 图10

@Async 注解

这个类的 value 参数简直太友好了:
@Async 和 TransmittableThreadLocal 使用记录 - 图11
五处调用的地方,其中四处都是注释。
有效的调用就这一个地方,直接先打上断点再说:

org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor#getExecutorQualifier

发起调用之后,果然跑到了断点这个地方:
@Async 和 TransmittableThreadLocal 使用记录 - 图12
顺着断点往下调试,就会来到这个地方:

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

@Async 和 TransmittableThreadLocal 使用记录 - 图13
这个代码结构非常的清晰。
编号为 ① 的地方,是获取对应方法上的 @Async 注解的 value 值。这个值其实就是 bean 名称,如果不为空则从 Spring 容器中获取对应的 bean。
如果 value 是没有值的,也就是我们 Demo 的这种情况,会走到编号为 ② 的地方。
这个地方就是我要找的默认的线程池。
最后,不论是默认的线程池还是 Spring 容器中我们自定义的线程池。
都会以方法为维度,在 map 中维护方法和线程池的映射关系。
也就是编号为 ③ 的这一步,代码中的 executors 就是一个 map:
@Async 和 TransmittableThreadLocal 使用记录 - 图14
所以,我要找的东西,就是编号为 ② 的这个地方的逻辑。
这里面主要是一个 defaultExecutor 对象:
@Async 和 TransmittableThreadLocal 使用记录 - 图15
这个玩意是一个函数式编程,所以如果你不知道这个玩意是干什么的,调试起来可能有点懵逼:
我建议你去恶补一下, 10 分钟就能入门。
@Async 和 TransmittableThreadLocal 使用记录 - 图16
最终你会调试到这个地方来:

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#getDefaultExecutor

@Async 和 TransmittableThreadLocal 使用记录 - 图17
这个代码就有点意思了,就是从 BeanFactory 里面获取一个默认的线程池相关的 Bean 出来。流程很简单,日志也打印的很清楚,就不赘述了。

最终结果

添加异步线程池, 使用 TtlExecutors.getTtlExecutor 修饰, 其余不变, 上线验证

  1. @Component
  2. public class AsyncThreadPoolConfiguration implements AsyncConfigurer {
  3. @Override
  4. public Executor getAsyncExecutor() {
  5. ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
  6. threadPool.setCorePoolSize(5);
  7. threadPool.setMaxPoolSize(20);
  8. threadPool.setQueueCapacity(200);
  9. threadPool.setKeepAliveSeconds(60);
  10. threadPool.setThreadNamePrefix("AsyncThread: ");
  11. threadPool.initialize();
  12. return TtlExecutors.getTtlExecutor(threadPool);
  13. }
  14. @Override
  15. public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
  16. return new SimpleAsyncUncaughtExceptionHandler();
  17. }
  18. }

参考文档

TransmittableThreadLocal github : https://github.com/alibaba/transmittable-thread-local
ThreadLocal 系列(三)-TransmittableThreadLocal 的使用及原理解析 : https://www.cnblogs.com/hama1993/p/10409740.html
别问了,我真的不喜欢这个注解 : https://www.toutiao.com/i7004671249742021150/?tt_from=weixin&utm_campaign=client_share&wxshare_count=1&timestamp=1632363548&app=news_article&utm_source=weixin&utm_medium=toutiao_android&use_new_style=1&req_id=202109231019080101351600185028DD1E&share_token=de46e460-2f18-4459-87af-beb96b8955fc&group_id=7004671249742021150