一、基本介绍
1.1 简介
TransmittableThreadLocal 是Alibaba开源的、用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展。若希望 TransmittableThreadLocal 在线程池与主线程间传递,需配合 TtlRunnable 和 TtlCallable 使用
github地址:https://github.com/alibaba/transmittable-thread-local
1.2 功能
TransmittableThreadLocal(TTL):在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。一个Java标准库本应为框架/中间件设施开发提供的标配能力,本库功能聚焦 & 0依赖
JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时 的ThreadLocal值传递到 任务执行时。
[
](https://github.com/alibaba/transmittable-thread-local#-user-guide)
1.3 需求场景
ThreadLocal 的需求场景即 TransmittableThreadLocal 的潜在需求场景,如果你的业务需要『在使用线程池等会池化复用线程的执行组件情况下传递 ThreadLocal 值』则是 TransmittableThreadLocal 目标场景。
下面是几个典型场景例子。
- 分布式跟踪系统 或 全链路压测(即链路打标)
- 日志收集记录系统上下文
- Session级Cache
- 应用容器或上层框架跨应用代码给下层SDK传递信息
各个场景的展开说明参见子文档 需求场景
1.4 用户指南
使用类TransmittableThreadLocal来保存值,并跨线程池传递。
TransmittableThreadLocal继承InheritableThreadLocal,使用方式也类似。相比InheritableThreadLocal,添加了
- copy方法
用于定制 任务提交给线程池时 的ThreadLocal值传递到 任务执行时 的拷贝行为,缺省传递的是引用。
注意:如果跨线程传递了对象引用因为不再有线程封闭,与InheritableThreadLocal.childValue一样,使用者/业务逻辑要注意传递对象的线程 - protected的beforeExecute/afterExecute方法
执行任务(Runnable/Callable)的前/后的生命周期回调,缺省是空操作
二、使用教程
2.1 简单使用
pom.xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.12.4</version>
</dependency>
父线程给子线程传递值。
示例代码:
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
// =====================================================
// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();
这其实是InheritableThreadLocal的功能,应该使用InheritableThreadLocal来完成。
但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递到 任务执行时。
解决方法参见下面的这几种用法。
2.2 保证线程池中传递值
2.2.1 修饰Runnable和Callable
使用TtlRunnable和TtlCallable来修饰传入线程池的Runnable和Callable。
示例代码:
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);
// =====================================================
// Task中可以读取,值是"value-set-in-parent"
String value = context.get();
注意:
即使是同一个Runnable任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task))以抓取这次提交时的TransmittableThreadLocal上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的TtlRunnable,则提交的任务运行时会是之前修饰操作所抓取的上下文。示例代码如下:
// 第一次提交
Runnable task = new RunnableTask();
executorService.submit(TtlRunnable.get(task));
// ...业务逻辑代码,
// 并且修改了 TransmittableThreadLocal上下文 ...
// context.set("value-modified-in-parent");
// 再次提交
// 重新执行修饰,以传递修改了的 TransmittableThreadLocal上下文
executorService.submit(TtlRunnable.get(task));
上面演示了Runnable,Callable的处理类似
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Callable call = new CallableTask();
// 额外的处理,生成修饰了的对象ttlCallable
Callable ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);
// =====================================================
// Call中可以读取,值是"value-set-in-parent"
String value = context.get();
2.2.2 修饰线程池
省去每次Runnable和Callable传入线程池时的修饰,这个逻辑可以在线程池中完成。
通过工具类com.alibaba.ttl.threadpool.TtlExecutors完成,有下面的方法:
- getTtlExecutor():修饰接口Executor
- getTtlExecutorService():修饰接口ExecutorService
- getTtlScheduledExecutorService():修饰接口ScheduledExecutorService
示例代码:
ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// =====================================================
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();
完整可运行的Demo代码参见TtlExecutorWrapperDemo.kt
三、原理解析
3.1 Demo演示
Demo
// 初始化一个TransmittableThreadLocal,这个是继承了InheritableThreadLocal的
TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>();
// 初始化一个长度为1的线程池
ExecutorService poolExecutor = Executors.newFixedThreadPool(1);
@Test
public void test2() throws ExecutionException, InterruptedException {
// 设置初始值
local.set("天王老子");
//!!!! 注意:这个地方的Task是使用了TtlRunnable包装的
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));
future.get();
Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任务2")));
future2.get();
System.out.println("父线程的值:"+local.get());
poolExecutor.shutdown();
}
class Task implements Runnable{
String str;
Task(String str){
this.str = str;
}
@Override
public void run() {
// 获取值
System.out.println(Thread.currentThread().getName()+":"+local.get());
// 重新设置一波
local.set(str);
System.out.println(Thread.currentThread().getName()+":"+local.get());
}
}
打印结果:
pool-1-thread-1:天王老子
pool-1-thread-1:任务1
pool-1-thread-1:天王老子
pool-1-thread-1:任务2
父线程的值:天王老子
3.2 源码解析
TransmittableThreadLocal
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
// 1. 此处的holder是他的主要设计点,后续在构建TtlRunnable
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
@SuppressWarnings("unchecked")
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
@Override
public final T get() {
T value = super.get();
if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
return value;
}
/**
* see {@link InheritableThreadLocal#set}
*/
@Override
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
/**
* see {@link InheritableThreadLocal#remove()}
*/
@Override
public final void remove() {
removeThisFromHolder();
super.remove();
}
private void superRemove() {
super.remove();
}
}
重点查看 holder 对象:
- 在代码中,作者构建了一个holder对象,这个对象是一个InheritableThreadLocal , 里面的类型是一个弱引用的WeakHashMap , 这个map的value就是TransmittableThreadLocal , 至于value永远都是空的
- holder里面存储的是这个应用里面,所有关于TransmittableThreadLocal的引用
- 从上面可以看到,每次get, set , remove都会操作holder对象,这样做的目的是为了保持TransmittableThreadLocal所有的这个引用都在holder里面存一份
TtlRunnable
回到上面的代码
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));
发现调用了TtlRunnable 对象的get方法
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
// 重点在这里
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
看上面的代码,细节上我们不看,我们看大致的思路, 这个地方主要就是根据传入的runnable构建了一个TtlRunnable对象
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
//重点在这里
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
下面这行代码,运行到这里的时候,还是在主线程里面,调用了capture方法
this.capturedRef = new AtomicReference<Object>(capture());
Capture
public static Object capture() {
// 构建一个临时对象,主要看captureTtlValues方法
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
// 构建一个WeakHashMap方法,
WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
// 在主线程里面,调用holder变量,循环获取里面所有的key和value
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
// 返回出去
return ttl2Value;
}
步骤说明:
1.调用静态变量holder, 循环获取里面所有的key和value, value的获取就比较巧妙一点。
private T copyValue() {
// 这里的get方法,调用的是父类的方法,可以在父类里面最终获取到当前TransmittableThreadLocal所对应的value
return copy(get());
}
2.组装好一个WeakHashMap出去,最终就会到了我们上面的构造方法里面,针对capturedRef 的赋值操作。
run
@Override
public void run() {
//1. 获取到刚刚构造TtlRunnable对象的时候初始化的capturedRef对象。包含了从submit丢任务进来的时候父线程的数据
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 清除不在captured里面的key,同时在这个子线程中,对所有的ThreadLocal进行重新设置值
Object backup = replay(captured);
try {
// 执行实际的线程方法
runnable.run();
} finally {
// 做好还原工作,根据backup
restore(backup);
}
}
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 做好当前线程的local备份
backup.put(threadLocal, threadLocal.get());
// 清除数据,不在captured里面的。
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 这里就是把值设置到当前线程的TransmittableThreadLocal里面。
setTtlValuesTo(captured);
// 一个钩子
doExecuteCallback(true);
return backup;
}
3.3 总结
- 通过继承InheritableThreadLocal,新成立一个TransmittableThreadLocal类, 该类中有一个hodel变量,用来维护所有的TransmittableThreadLocal引用。
- 在实际submit任务到线程池的时候,我们是需要调用TtlRunnable.get方法,构建一个任务的包装类。这里使用装饰者模式,对runnable线程对象进行装饰包装,在初始化这个包装对象的时候,会获取主线程里面所有的TransmittableThreadLocal引用,以及里面所有的值,这个值其实就是当前父线程里面的(跟你当时创建这个线程的父线程没有任何关系,注意,这里讲的是线程池的场景)。
- 对数据做规整,根据收集到的 captured (这个对象里面存储的都是主线程里面能够获取到TransmittableThreadLocal以及对应的值) 做规整,去掉当前线程里面不需要的(当前线程TheadLocalMap中不存在的),同时将剩余的key和value ,更新到当前线程的ThreadLocal里面。这样就达到了在池化技术里面父子线程传值的安全性
四、使用TTL的好处和必要性
好处:透明且自动完成所有异步执行上下文的可定制、规范化的捕捉与传递。
这个好处也是TransmittableThreadLocal的目标。
必要性:随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。
使用ThreadLocal作为业务上下文传递的经典技术手段在中间件、技术与业务框架中广泛大量使用。而对于生产应用,几乎一定会使用线程池等异步执行组件,以高效支撑线上大流量。但使用ThreadLocal及其set/remove的上下文传递模式,在使用线程池等异步执行组件时,存在多方面的问题:
从业务使用者角度来看
- 繁琐
- 业务逻辑要知道:有哪些上下文;各个上下文是如何获取的。
- 并需要业务逻辑去一个一个地捕捉与传递。
- 依赖
- 需要直接依赖不同ThreadLocal上下文各自的获取的逻辑或类。
- 像RPC的上下文(如Dubbo的RpcContext)、全链路跟踪的上下文(如SkyWalking的ContextManager)、不同业务模块中的业务流程上下文,等等。
- 静态(易漏)
- 因为要 事先 知道有哪些上下文,如果系统出现了一个新的上下文,业务逻辑就要修改添加上新上下文传递的几行代码。也就是说因 系统的 上下文新增,业务的 逻辑就跟进要修改。
- 而对于业务来说,不关心系统的上下文,即往往就可能遗漏,会是线上故障了。
- 随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。
- 定制性
- 因为需要业务逻辑来完成捕捉与传递,业务要关注『上下文的传递方式』:直接传引用?还是拷贝传值?拷贝是深拷贝还是浅拷贝?在不同的上下文会需要不同的做法。
- 『上下文的传递方式』往往是 上下文的提供者(或说是业务逻辑的框架部分)才能决策处理好的;而 上下文的使用者(或说是业务逻辑的应用部分)往往不(期望)知道上下文的传递方式。这也可以理解成是 依赖,即业务逻辑 依赖/关注/实现了 系统/架构的『上下文的传递方式』。
从整体流程实现角度来看
关注的是 上下文传递流程的规范化。上下文传递到了子线程要做好 清理(或更准确地说是要 恢复 成之前的上下文),需要业务逻辑去处理好。如果业务逻辑对清理的处理不正确,比如:
- 如果清理操作漏了:
- 下一次执行可能是上次的,即『上下文的 污染/串号』,会导致业务逻辑错误。
- 『上下文的 泄漏』,会导致内存泄漏问题。
- 如果清理操作做多了,会出现上下文 丢失。
上面的问题,在业务开发中引发的Bug真是屡见不鲜 !本质原因是:ThreadLocal的set/remove的上下文传递模式 在使用线程池等异步执行组件的情况下不再是有效的。常见的典型例子:
- 当线程池满了且线程池的RejectedExecutionHandler使用的是CallerRunsPolicy时,提交到线程池的任务会在提交线程中直接执行,ThreadLocal.remove操作清理提交线程的上下文导致上下文丢失。
- 类似的,使用ForkJoinPool(包含并行执行Stream与CompletableFuture,底层使用ForkJoinPool)的场景,展开的ForkJoinTask会在任务提交线程中直接执行。同样导致上下文丢失。