一、基本介绍

1.1 简介

TransmittableThreadLocal 是Alibaba开源的、用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展。若希望 TransmittableThreadLocal 在线程池与主线程间传递,需配合 TtlRunnableTtlCallable 使用
github地址:https://github.com/alibaba/transmittable-thread-local

1.2 功能

TransmittableThreadLocal(TTL):在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。一个Java标准库本应为框架/中间件设施开发提供的标配能力,本库功能聚焦 & 0依赖

JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时 的ThreadLocal值传递到 任务执行时
[

](https://github.com/alibaba/transmittable-thread-local#-user-guide)

1.3 需求场景

ThreadLocal 的需求场景即 TransmittableThreadLocal 的潜在需求场景,如果你的业务需要『在使用线程池等会池化复用线程的执行组件情况下传递 ThreadLocal 值』则是 TransmittableThreadLocal 目标场景。
下面是几个典型场景例子。

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. Session级Cache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息

各个场景的展开说明参见子文档 需求场景

1.4 用户指南

使用类TransmittableThreadLocal来保存值,并跨线程池传递。
TransmittableThreadLocal继承InheritableThreadLocal,使用方式也类似。相比InheritableThreadLocal,添加了

  1. copy方法
    用于定制 任务提交给线程池时 的ThreadLocal值传递到 任务执行时 的拷贝行为,缺省传递的是引用。
    注意:如果跨线程传递了对象引用因为不再有线程封闭,与InheritableThreadLocal.childValue一样,使用者/业务逻辑要注意传递对象的线程
  2. protected的beforeExecute/afterExecute方法
    执行任务(Runnable/Callable)的前/后的生命周期回调,缺省是空操作

二、使用教程

2.1 简单使用

pom.xml

  1. <dependency>
  2. <groupId>com.alibaba</groupId>
  3. <artifactId>transmittable-thread-local</artifactId>
  4. <version>2.12.4</version>
  5. </dependency>

父线程给子线程传递值。
示例代码:

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

// =====================================================

// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();

这其实是InheritableThreadLocal的功能,应该使用InheritableThreadLocal来完成。
但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递到 任务执行时
解决方法参见下面的这几种用法。

2.2 保证线程池中传递值

2.2.1 修饰Runnable和Callable

使用TtlRunnableTtlCallable来修饰传入线程池的Runnable和Callable。
示例代码:

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);

// =====================================================

// Task中可以读取,值是"value-set-in-parent"
String value = context.get();

注意
即使是同一个Runnable任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task))以抓取这次提交时的TransmittableThreadLocal上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的TtlRunnable,则提交的任务运行时会是之前修饰操作所抓取的上下文。示例代码如下:

// 第一次提交
Runnable task = new RunnableTask();
executorService.submit(TtlRunnable.get(task));

// ...业务逻辑代码,
// 并且修改了 TransmittableThreadLocal上下文 ...
// context.set("value-modified-in-parent");

// 再次提交
// 重新执行修饰,以传递修改了的 TransmittableThreadLocal上下文
executorService.submit(TtlRunnable.get(task));

上面演示了Runnable,Callable的处理类似

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Callable call = new CallableTask();
// 额外的处理,生成修饰了的对象ttlCallable
Callable ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);

// =====================================================

// Call中可以读取,值是"value-set-in-parent"
String value = context.get();

整个过程的完整时序图
TransmittableThreadLocal - 图1

2.2.2 修饰线程池

省去每次Runnable和Callable传入线程池时的修饰,这个逻辑可以在线程池中完成。
通过工具类com.alibaba.ttl.threadpool.TtlExecutors完成,有下面的方法:

  • getTtlExecutor():修饰接口Executor
  • getTtlExecutorService():修饰接口ExecutorService
  • getTtlScheduledExecutorService():修饰接口ScheduledExecutorService

示例代码:

ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);

// =====================================================

// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();

完整可运行的Demo代码参见TtlExecutorWrapperDemo.kt

三、原理解析

3.1 Demo演示

Demo

// 初始化一个TransmittableThreadLocal,这个是继承了InheritableThreadLocal的
TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>();

// 初始化一个长度为1的线程池
ExecutorService poolExecutor = Executors.newFixedThreadPool(1);

@Test
public void test2() throws ExecutionException, InterruptedException {
    // 设置初始值
    local.set("天王老子");
    //!!!! 注意:这个地方的Task是使用了TtlRunnable包装的
    Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));
    future.get();

    Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任务2")));
    future2.get();

    System.out.println("父线程的值:"+local.get());
    poolExecutor.shutdown();
}

class Task implements Runnable{

    String str;
    Task(String str){
        this.str = str;
    }
    @Override
    public void run() {
        // 获取值
        System.out.println(Thread.currentThread().getName()+":"+local.get());
        // 重新设置一波
        local.set(str);
        System.out.println(Thread.currentThread().getName()+":"+local.get());
    }
}

打印结果:

pool-1-thread-1:天王老子
pool-1-thread-1:任务1
pool-1-thread-1:天王老子
pool-1-thread-1:任务2
父线程的值:天王老子

3.2 源码解析

TransmittableThreadLocal

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {

  // 1. 此处的holder是他的主要设计点,后续在构建TtlRunnable
  private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
            new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
                }

                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
                }
            };

    @SuppressWarnings("unchecked")
    private void addThisToHolder() {
        if (!holder.get().containsKey(this)) {
            holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
        }
    }

  @Override
    public final T get() {
        T value = super.get();
        if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
        return value;
    }
    /**
     * see {@link InheritableThreadLocal#set}
     */
    @Override
    public final void set(T value) {
        if (!disableIgnoreNullValueSemantics && null == value) {
            // may set null to remove value
            remove();
        } else {
            super.set(value);
            addThisToHolder();
        }
    }
    /**
     * see {@link InheritableThreadLocal#remove()}
     */
    @Override
    public final void remove() {
        removeThisFromHolder();
        super.remove();
    }

    private void superRemove() {
        super.remove();
    }
}

重点查看 holder 对象:

  • 在代码中,作者构建了一个holder对象,这个对象是一个InheritableThreadLocal , 里面的类型是一个弱引用的WeakHashMap , 这个map的value就是TransmittableThreadLocal , 至于value永远都是空的
  • holder里面存储的是这个应用里面,所有关于TransmittableThreadLocal的引用
  • 从上面可以看到,每次get, set , remove都会操作holder对象,这样做的目的是为了保持TransmittableThreadLocal所有的这个引用都在holder里面存一份

TtlRunnable

回到上面的代码

Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));

发现调用了TtlRunnable 对象的get方法

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
    if (null == runnable) return null;

    if (runnable instanceof TtlEnhanced) {
        // avoid redundant decoration, and ensure idempotency
        if (idempotent) return (TtlRunnable) runnable;
        else throw new IllegalStateException("Already TtlRunnable!");
    }
    // 重点在这里
    return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}

看上面的代码,细节上我们不看,我们看大致的思路, 这个地方主要就是根据传入的runnable构建了一个TtlRunnable对象

private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
              //重点在这里
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}

下面这行代码,运行到这里的时候,还是在主线程里面,调用了capture方法

this.capturedRef = new AtomicReference<Object>(capture());

Capture

public static Object capture() {
    // 构建一个临时对象,主要看captureTtlValues方法
    return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
  // 构建一个WeakHashMap方法,
  WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
  // 在主线程里面,调用holder变量,循环获取里面所有的key和value
  for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
    ttl2Value.put(threadLocal, threadLocal.copyValue());
  }
  // 返回出去
  return ttl2Value;
}


步骤说明:
1.调用静态变量holder, 循环获取里面所有的key和value, value的获取就比较巧妙一点。

private T copyValue() {
      // 这里的get方法,调用的是父类的方法,可以在父类里面最终获取到当前TransmittableThreadLocal所对应的value
      return copy(get());
}

2.组装好一个WeakHashMap出去,最终就会到了我们上面的构造方法里面,针对capturedRef 的赋值操作。

run

@Override
public void run() {
  //1. 获取到刚刚构造TtlRunnable对象的时候初始化的capturedRef对象。包含了从submit丢任务进来的时候父线程的数据
  Object captured = capturedRef.get();
  if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
    throw new IllegalStateException("TTL value reference is released after run!");
  }
    // 清除不在captured里面的key,同时在这个子线程中,对所有的ThreadLocal进行重新设置值
  Object backup = replay(captured);
  try {
    // 执行实际的线程方法
    runnable.run();
  } finally {
    // 做好还原工作,根据backup
    restore(backup);
  }
}

private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
            WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();

    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
      TransmittableThreadLocal<Object> threadLocal = iterator.next();

      // 做好当前线程的local备份
      backup.put(threadLocal, threadLocal.get());

      // 清除数据,不在captured里面的。
      if (!captured.containsKey(threadLocal)) {
        iterator.remove();
        threadLocal.superRemove();
      }
    }

    // 这里就是把值设置到当前线程的TransmittableThreadLocal里面。
    setTtlValuesTo(captured);

    // 一个钩子
    doExecuteCallback(true);

    return backup;
}

3.3 总结

  1. 通过继承InheritableThreadLocal,新成立一个TransmittableThreadLocal类, 该类中有一个hodel变量,用来维护所有的TransmittableThreadLocal引用。
  2. 在实际submit任务到线程池的时候,我们是需要调用TtlRunnable.get方法,构建一个任务的包装类。这里使用装饰者模式,对runnable线程对象进行装饰包装,在初始化这个包装对象的时候,会获取主线程里面所有的TransmittableThreadLocal引用,以及里面所有的值,这个值其实就是当前父线程里面的(跟你当时创建这个线程的父线程没有任何关系,注意,这里讲的是线程池的场景)。
  3. 对数据做规整,根据收集到的 captured (这个对象里面存储的都是主线程里面能够获取到TransmittableThreadLocal以及对应的值) 做规整,去掉当前线程里面不需要的(当前线程TheadLocalMap中不存在的),同时将剩余的key和value ,更新到当前线程的ThreadLocal里面。这样就达到了在池化技术里面父子线程传值的安全性

四、使用TTL的好处和必要性

好处:透明且自动完成所有异步执行上下文的可定制、规范化的捕捉与传递。
这个好处也是TransmittableThreadLocal的目标。

必要性:随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。

使用ThreadLocal作为业务上下文传递的经典技术手段在中间件、技术与业务框架中广泛大量使用。而对于生产应用,几乎一定会使用线程池等异步执行组件,以高效支撑线上大流量。但使用ThreadLocal及其set/remove的上下文传递模式,在使用线程池等异步执行组件时,存在多方面的问题:
从业务使用者角度来看

  1. 繁琐
    • 业务逻辑要知道:有哪些上下文;各个上下文是如何获取的。
    • 并需要业务逻辑去一个一个地捕捉与传递。
  2. 依赖
    • 需要直接依赖不同ThreadLocal上下文各自的获取的逻辑或类。
    • 像RPC的上下文(如Dubbo的RpcContext)、全链路跟踪的上下文(如SkyWalking的ContextManager)、不同业务模块中的业务流程上下文,等等。
  3. 静态(易漏)
    • 因为要 事先 知道有哪些上下文,如果系统出现了一个新的上下文,业务逻辑就要修改添加上新上下文传递的几行代码。也就是说因 系统的 上下文新增,业务的 逻辑就跟进要修改。
    • 而对于业务来说,不关心系统的上下文,即往往就可能遗漏,会是线上故障了。
    • 随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。
  4. 定制性
    • 因为需要业务逻辑来完成捕捉与传递,业务要关注『上下文的传递方式』:直接传引用?还是拷贝传值?拷贝是深拷贝还是浅拷贝?在不同的上下文会需要不同的做法。
    • 『上下文的传递方式』往往是 上下文的提供者(或说是业务逻辑的框架部分)才能决策处理好的;而 上下文的使用者(或说是业务逻辑的应用部分)往往不(期望)知道上下文的传递方式。这也可以理解成是 依赖,即业务逻辑 依赖/关注/实现了 系统/架构的『上下文的传递方式』。

从整体流程实现角度来看
关注的是 上下文传递流程的规范化。上下文传递到了子线程要做好 清理(或更准确地说是要 恢复 成之前的上下文),需要业务逻辑去处理好。如果业务逻辑对清理的处理不正确,比如:

  • 如果清理操作漏了:
    • 下一次执行可能是上次的,即『上下文的 污染/串号』,会导致业务逻辑错误。
    • 『上下文的 泄漏』,会导致内存泄漏问题。
  • 如果清理操作做多了,会出现上下文 丢失

上面的问题,在业务开发中引发的Bug真是屡见不鲜 !本质原因是:ThreadLocal的set/remove的上下文传递模式 在使用线程池等异步执行组件的情况下不再是有效的。常见的典型例子:

  • 当线程池满了且线程池的RejectedExecutionHandler使用的是CallerRunsPolicy时,提交到线程池的任务会在提交线程中直接执行,ThreadLocal.remove操作清理提交线程的上下文导致上下文丢失
  • 类似的,使用ForkJoinPool(包含并行执行Stream与CompletableFuture,底层使用ForkJoinPool)的场景,展开的ForkJoinTask会在任务提交线程中直接执行。同样导致上下文丢失