1、SuppliedThreadLocal
ThreadLocal 的子类,用于实现 Java 8 之后的函数式编程,它从指定的 Supplier 获取其初始值。
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
// 结果的提供者
private final Supplier<? extends T> supplier;
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
return supplier.get();
}
}
2、ThreadLocalMap
ThreadLocalMap 是一个定制的散列映射,它仅供 ThreadLocal 内部使用。不会在 ThreadLocal 类之外导出任何操作。该类是包私有的,以允许在类 Thread 中声明字段。为了帮助清理非常大且长时间存在的元素,散列表 entries[] 使用 WeakReferences 作为键。但是,由于不使用引用队列,只有当 entries[] 的空间耗尽时,才会删除旧的元素。
2.1、内部类 Entity
从类图可知,Entry 是 ThreadLocalMap 里自定义的节点,它继承了 WeakReference 类。源码如下,Entry 内部定义了一个类型为 Object 的 value 字段,用于存放保在 ThreadLocal 里的值。请注意,空键(即 entry.get() == null)意味着不再引用该键,因此可以从表中将其删除。
static class Entry extends WeakReference<ThreadLocal<?>> {
// 与当前 ThreadLocal 关联的值,保存实际保存的值
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
2.2、属性
ThreadLocalMap 内部维护一个 Entity 数组,并且要求数组的大小必须为 2 的幂,同时记录数组里 entity 的个数以及下一次扩容的阈值。
// table[] 的初始容量,必须是 2 的幂次
private static final int INITIAL_CAPACITY = 16;
// Entity 表,根据需要调整大小
// 该数组的长度必须是 2 的幂次
private Entry[] table;
// table 数组中 entry 的数量
private int size = 0;
// 重新分配表大小的阈值,默认为 0
private int threshold; // Default to 0
从下面的代码可以看出,ThreadLocalMap 需要维持一个最坏 2/3 的负载因子,还有两个方法用于获取上一个和下一个索引,这里实际上是环形数组的上一个和下一个索引。
// 设置扩容后的阈值,以在最坏情况下保持 2/3 的负载因子。
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
// table 数组的下一个索引
// 索引加 1,对数组长度取模
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
// table 数组的前一个索引
// 索引减 1,对数组长度取模
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
ThreadLocalMap 使用的是线性探测法来解决散列冲突,所以实际上 table 数组在程序逻辑上是一个环形的 Hash 数组。其内部存储结构如图所示,ThreadLocalMap 的 key 实际上是指向该 ThreadLocal 对象的弱引用,value 是线程往该 ThreadLocal 中实际设置的值。
2.3、构造函数
2.3.1、ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue)
构造一个最初只包含 (firstKey, firstValue)的新 map。ThreadLocalMap 是惰性构造的,所以只有当至少有一个 Entity 需要保存时才创建它。该构造函数在 set 和 get 的时候都可能会被间接调用,以初始化线程的 ThreadLocalMap。
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化 table 数组
table = new Entry[INITIAL_CAPACITY];
// 获取 firstKey 在数组中的索引 i
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 将 firstValue 保存在 table[] 的 i 位置
// 初始化时,i 位置肯定是空的,直接插入即可
table[i] = new Entry(firstKey, firstValue);
// table[] 中元素的计数
size = 1;
// 设定需要扩容的阈值
setThreshold(INITIAL_CAPACITY);
}
2.3.2、ThreadLocalMap(ThreadLocalMap parentMap)
该方法主要就是以父线程的 inheritableThreadLocals 为数据源,过滤出有效的 entry,初始化到自己的 threadLocals 中。其中 childValue 可以被重写。
需要注意的地方是 InheritableThreadLocal 只是在子线程创建的时候会去拷一份父线程的 inheritableThreadLocals。如果父线程是在子线程创建后再 set 某个 InheritableThreadLocal 对象的值,对子线程是不可见的。
private ThreadLocalMap(ThreadLocalMap parentMap) {
// 获取 parentMap 的 table 数组
Entry[] parentTable = parentMap.table;
// 获取数组长度
int len = parentTable.length;
// 设定需要扩容的阈值
setThreshold(len);
// 初始化当前 ThreadLocalMap 的 table
table = new Entry[len];
// 遍历 parentMap 的 table,将其放入新的 ThreadLocalMap 的 table 中
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
// 先判断 j 位置的元素是否为空
if (e != null) {
@SuppressWarnings("unchecked")
// 获取该节点的 key
// 调用 Reference.get 方法获取 referent
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
// 该方法在 InheritableThreadLocal 中默认实现为返回本身值,可以被重写
Object value = key.childValue(e.value);
// 根据 key 和 value 构建新的 Entity c
Entry c = new Entry(key, value);
// 获取 key 在数组中的索引 h
int h = key.threadLocalHashCode & (len - 1);
// 插入 table 之前,先判断 h 位置是否被占用
// 如果被占用,则使用线性探测法来解决散列冲突
while (table[h] != null)
// 向后遍历,直到找到为空的位置结束循环
h = nextIndex(h, len);
// 将新建的 c 存入 table 数组的 h 处
table[h] = c;
// table[] 中元素的计数加 1
size++;
}
}
}
}
2.4、方法
2.4.1、getEntry
该方法会被 ThreadLocal 的 get 方法直接调用,用于获取 map 中某个 ThreadLocal 存放的数据。此方法本身仅处理快速路径:先获取 key 对应 table 的位置 i;获取 i 位置的元素 e;如果满足 e != null && e.get() == key 则直接返回。否则调用 getEntryAfterMiss 获取。 该方法旨在最大限度地提高直接命中的性能,部分原因是使该方法易于内联。
private Entry getEntry(ThreadLocal<?> key) {
// 获取 key 对应 table 的位置 i
int i = key.threadLocalHashCode & (table.length - 1);
// 直接获取 i 位置的元素 e
Entry e = table[i];
// 判断 e 的引用和 key 是否相等
if (e != null && e.get() == key)
return e;
else
// 继续线性探测寻找 key 对应的元素
return getEntryAfterMiss(key, i, e);
}
2.4.2、getEntryAfterMiss
当调用 getEntry 方法未直接命中的时候,调用此方法线性探测寻找 key
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
// 获取当前 ThreadLocalMap 的 table
Entry[] tab = table;
// 获取 table 的长度
int len = tab.length;
// 继续线性探测法,不断向后探测
// 前提条件是 e 不为 null
while (e != null) {
// 获取 e 的 ThreadLocal 弱引用 k
ThreadLocal<?> k = e.get();
// 判断 k 是否等于 key
// 如果相等,则表示找到目标,直接返回 e
if (k == key)
return e;
// 如果 k 为空,则表示 e 对应的 ThreadLocal 已经被回收了
if (k == null)
// 调用 expungeStaleEntry 清理无效的 entry,从 i 位置
expungeStaleEntry(i);
else
// 获取接下来要访问的索引值
i = nextIndex(i, len);
// e 记录 i 位置的元素
e = tab[i];
}
// 如果没有找到,则表示 table 中没有 key 对应的元素
return null;
}
2.4.3、expungeStaleEntry
该方法是 ThreadLocal 中核心清理方法,它主要从 staleSlot 开始遍历直到遇到 entry 为 null 为止,清除 table[] 中无效的 entry,即弱引用指向的对象被回收。清除操作,其实就是将对应 entry 中的 value 置为 null,并将 table[staleSlot] 置为 null,直到遇到空 entry。另外,在过程中还会对非空的 entry 作 rehash。
这里采用的是线性探测法,删除的元素很可能是多个冲突元素中的一个,所以需要对后面的元素做处理,可以简单理解就是让后面的元素往前面移动。为什么要这样做?主要是开放地址寻找元素的时候,遇到 null 就停止寻找了;前面当 k == null 时,就会设置 table[i] = null,后面数据移动的话,后面冲突的元素就永远都访问不到。
private int expungeStaleEntry(int staleSlot) {
// 获取当前 ThreadLocalMap 的 table
Entry[] tab = table;
// 获取 table 的长度
int len = tab.length;
// expunge entry at staleSlot
// 删除 table 中 staleSlot 位置的 entry
// value 设为 null,显式断开强引用
tab[staleSlot].value = null;
// 显式设置该 entry 为 null,以便垃圾回收
tab[staleSlot] = null;
// 计数减 1
size--;
// rehash、清理 直到 table[i] == null 结束
Entry e;
int i;
// 从 staleSlot 开始线性探测,直到 tab[i] 为 null 为止
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
// 获取下一个要访问的索引位置
i = nextIndex(i, len)) {
// 获取 e 对应的 ThreadLocal 弱引用
ThreadLocal<?> k = e.get();
// 如果 k 为空,则表示 e 对应的 ThreadLocal 已经被回收了,
// 即 e 为无效 entry,需要被清除
if (k == null) {
// 清除 e,计数减 1
e.value = null;
tab[i] = null;
size--;
} else {
// 重新计数 k 对应 table 的索引值
int h = k.threadLocalHashCode & (len - 1);
// 如果 h != i 则说明 k 之前因为 hash 冲突,通过线性探测找到的位置
// 所以这里需要重新寻找 k 的位置 h
if (h != i) {
// 将原来 i 位置的 entry 清除
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
// 必须扫描 tab 直到 tab[h] 为空结束
while (tab[h] != null)
h = nextIndex(h, len);
// 将 e 放到新位置
tab[h] = e;
}
}
}
return i;
}
该方法源码注释中有提到 Knuth 的著作《计算机程序设计艺术》的 6.4 章节中的 R 算法。R 算法描述了如何从使用线性探测的散列表中删除一个元素。R 算法维护了一个上次删除元素的 index,当在非空连续段中扫到某个 entry 的哈希值取模后的索引还没遍历到时,会将该 entry 挪到 index 的位置,并更新当前位置为新的 index,继续向后扫描直到遇到空的 entry。
ThreadLocalMap 因为使用了弱引用,所以其实每个 slot 的状态有三种即有效(value 未回收)、无效(value 已回收)、空(entry == null)。正是因为 ThreadLocalMap 的 entry 有三种状态,所以不能完全套 Knuth 的 R 算法。因为 expungeStaleEntry 方法在扫描过程中还会对无效 slot 清理将其转为空 slot,如果直接套用 R 算法,可能会出现具有相同哈希值的 entry 之间断开。
2.4.4、set
该方法不像 get 方法那样使用快速路径,因为使用 set 方法时,创建新的 entry 和替换现有的 entry 一样常见。在这种情况下,快速路径通常会失败。set 方法一般步骤如下
- 探测过程中 key 都是有效的,并且找到 key == k 的 i,直接替换该位置的 value 即可
- 探测过程中找到无效 slot,即 k == null,则调用 replaceStaleEntry。replaceStaleEntry 方法会把 key 和 value 存入 table[] 中,并且会尽可能清理 table[] 中无效的 Entry。
- 探测没有发现 key,则在连续段末尾的后一个空位置保存该 key value,这也是线性探测的一部分。保存成功后,会做一次启发式清理,如果没清理成功,且当前 table[] 的长度已经达到需要扩容的阈值 threshold,则调用 rehash。
- rehash 方法会先调用 expungeStaleEntries 做一次全量清理,如果清理完,table[] 的长度仍然超过四分之三的 threshold 则会进行两倍扩容
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 计算 key 在 table 数组中的下标 i
int i = key.threadLocalHashCode & (len-1);
// 从 i 位置开始线性探测,找到第一个 tab[i] 为 null 的位置 i
// 这里遍历的逻辑是,先通过 hash 找到数组下标,然后寻找相等的 ThreadLocal 对象
// 找不到就往下一个 index 找,有两种可能会退出这个循环
// 1. 找到了相同 ThreadLocal 对象
// 2. 一直往数组下一个下标查询,直到下一个下标对应的是 null 跳出
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 如果 key 相等,覆盖 value
if (k == key) {
e.value = value;
return;
}
// 如果 key 为 null,用新 key、value 覆盖
// 这里表示 table[i] 之前有元素,但是 key 已经无效了
// 这里直接用新的 key、value 覆盖替换即可
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 运行到这里表示,表示找到 i 位置,该位置没有存放过任何元素
// 将 key value,存入 table 数组的 i 处
tab[i] = new Entry(key, value);
// table 数组中 entry 的数量加 1
int sz = ++size;
// 清理一遍无效老数据,并判断是否需要扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 清理失败,并且 sz 大于等于要扩容的阈值
// 调用 rehash 扩容,重新计算 hash
rehash();
}
2.4.5、replaceStaleEntry
使用 key-value 替换 staleSlot 位置的元素。作为副作用,此方法将会清除包含陈旧 Entry 的运行中的所有陈旧 Entry。 其中运行是两个空槽之间的 entry 连续区间。
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// 这里采用的是从 staleSlot 的位置向前遍历 i--
// 是为了把前面所有已经被垃圾回收的也一起清除
// 注意:这里只是 key 被回收,value 还没被回收,entry 更加没回收,所以需要让他们回收
// 同时也避免这样存在很多过期的对象的占用,
// 导致这个时候刚好来了一个新的元素达到阈值而触发一次新的 rehash
int slotToExpunge = staleSlot;
// 向前遍历 table,直到 tab[i] 为 null 为止
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
// slotToExpunge 记录 staleSlot 左手边第一个空 Entry
// 到 staleSlot 之间 key 过期最小的 index
if (e.get() == null)
slotToExpunge = i;
// 从数组 staleSlot 开始往后遍历,i++,跟上面相反
// 这两个遍历就是为了在左边遇到的第一个空的 entry
// 到右边遇到的第一空的 entry 之间,查询所有无效的对象
// 注意:在右边如果找到需要设置值的 key 相同的时候就开始清理
// 然后返回,不再继续遍历下去
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 找到 key,将其与无效的 slot 交换
if (k == key) {
// 更新对应 slot 的 value 值
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 这里的意思是第一个 for 循环向前查找的时候没有找到无效的 entry
// 只有 staleSlot 这个过期,由于前面无效的对象已经通过交换位置的方式放到 i 上
// 所以需要清理的位置时 i,而不是传过来的 staleSlot
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 从 slotToExpunge 开始连续段清理+启发式清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// slotToExpunge == staleSlot 表示在第一个向前遍历的循环中没有找到无效对象
// k == null 表示 i 处为无效对象的位置
// 两个条件同时满足时,将 slotToExpunge 设置为 i 这里是为了保证
// slotToExpunge 的值等于向后遍历的第一个无效对象的位置 i
// 如果数组中存在要设置的 key,那么上面也会通过交换位置的时候把有效值迁移到 staleSlot
// staleSlot 位置上不管怎么样,存放的都是有效值,所以不需要清理
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 如果 key 在 table 中不存在,则直接新建一个新的存进去即可
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// 在探测过程中,如果发现任何无效 slot,则做一次清理
if (slotToExpunge != staleSlot)
// 连续段清理+启发式清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
2.4.6、cleanSomeSlots
扫描数组中部分槽位,清理无效 entry,通过 n 控制扫描次数。正常情况下,log(len) 次扫描没有发现无效 entry,扫描操作就会结束;但是如果发现了无效 entry 则重置 n 为 table[] 的长度 len,调用 expungeStaleEntry 做一次连续段的清理,再从下一个空的槽位开始继续扫描。该方法是尝试查找部分无效 entry 并进行清理操作,这里为了降低扫描次数,并不一一进行检查操作。
该方法仅在 set 方法执行的时候可能会被调用,可能调用的地方,一个是插入的时候可能会被调用,另一个是在替换无效槽位的时候可能会被调用。两者的区别是,前者传入的 n 为元素的个数,后者为 table[] 的长度。
private boolean cleanSomeSlots(int i, int n) {
// 标志清理状态,true 表示成功s
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// i 是已知不会持有无效 entry 的位置,所以扫描从 i 之后的元素开始
i = nextIndex(i, len);
Entry e = tab[i];
// 判断 e 是否是无效 entry
if (e != null && e.get() == null) {
// 条件成立,表示 e 是无效 entry,需要清理
// 重置 n 为 table[] 长度
n = len;
// 设置清理状态为 true
removed = true;
// 清理一个连续段
// 从 i 处开始清理,直到遇到 table[i] 为空为止
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
2.4.7、rehash
对整个 table 哈希槽进行 rehash 操作,同时判断是否需要进行扩容操作。
private void rehash() {
// 首先执行一次全量清理
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
// 因为执行了一次全量清理,所以 size 很可能会变小
// 重新判断是否需要扩容
if (size >= threshold - threshold / 4)
// 调用扩容逻辑
resize();
}
2.4.8、expungeStaleEntries
通过 expungeStaleEntries 清理 table[] 中所有无效的 entry。
private void expungeStaleEntries() {
// 记录当前 table[]
Entry[] tab = table;
int len = tab.length;
// 遍历 table[]
for (int j = 0; j < len; j++) {
Entry e = tab[j];
// 校验 table[j] 处的 entry 是否无效
if (e != null && e.get() == null)
// e 为无效 entry 时,则需要调用 expungeStaleEntry 删除 e
// 该方法会删除 [table[j], table[jj] == null] 区间中所有无效 entry
expungeStaleEntry(j);
}
}
2.4.9、resize
扩容,因为需要保证 table 的容量 len 为 2 的幂,所以会每次按其长度的 2 倍扩容。
private void resize() {
// 获取当前 ThreadLocalMap 的 table
Entry[] oldTab = table;
// 原数组的长度
int oldLen = oldTab.length;
// 新数组的长度,即扩容后的长度
int newLen = oldLen * 2;
// 创建长度为 newLen 的新数组
Entry[] newTab = new Entry[newLen];
// 记录新数组中的元素的数量
int count = 0;
// 遍历原数组 oldTab,将元素迁移到扩容后的 newTab
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
// 判断当前位置是否有 entry
if (e != null) {
ThreadLocal<?> k = e.get();
// 如果无效的话,直接舍弃,并将其 value 设置为 null,不需要迁移,
// 这里设置 value 为 null,是为帮助 GC
if (k == null) {
e.value = null; // Help the GC
} else {
// 计算 k 的索引,并使用线性探测解决 hash 冲突
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 设置需要扩容的阈值
setThreshold(newLen);
// 设置数组中元素的数量
size = count;
// 将 table 更新为 newTab
table = newTab;
}
2.4.10、remove
该方法相对于 getEntry 和 set 方法比较简单,直接在 table[] 中寻找 key,如果找到了,将其移除并调用 expungeStaleEntry 做一次段清理。
private void remove(ThreadLocal<?> key) {
// 获取当前 ThreadLocalMap 的 table
Entry[] tab = table;
// 获取 table 的长度
int len = tab.length;
// 计算 key 在 table[] 中对应的索引
int i = key.threadLocalHashCode & (len-1);
// 从 i 处开始向后线性探测,直到找到空为止
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 调用 Reference.get 方法获取 e 的引用
// 判断 e 对应的弱引用是否等于 key
if (e.get() == key) {
// 调用 Reference.clear 方法清除 e 的引用
e.clear();
// 调用 expungeStaleEntry 清理无效的 entry,从 i 位置
expungeStaleEntry(i);
return;
}
}
}
// Reference 中的方法,e.get() e.clear()
public void clear() {
this.referent = null;
}
public T get() {
return this.referent;
}