ThreadLocal的作用
ThreadLocal
顾名思义线程本地,其意义就在于定义了ThreadLocal变量,每个线程都有一份自己的独立的变量副本,多线程的中对此变量的存取都是在操作各自线程中的变量副本,所以不会影响其他线程,同样也不会收到其他线程影响,达到线程隔离的作用。
通常情况下, ThreadLocal
都是定义为 private static
类型,这样ThreadLocal变量就是从属于类对象,它在类加载的时候就初始化完成,整个内存只存一份,所有实例对象共享。
ThreadLocal变量如何做到线程私有
ThreadLocal变量是如何做到每个线程中保存一份副本以达到线程私有的,它原理很简单,因为每个线程 类Thread
中保存有一个 threadLocals
的属性,此属性是一个 ThreadLocal.ThreadLocalMap
类型
// Thread.class
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
所有ThreadLocal变量的值都存在当前线程的threadlocals引用指向的ThreadLocal.ThreadLocalMap数据结构中,仅从名字看,就知道它是一个类似map的容器,保存了所有ThreadLocal变量的值,当需要存取ThreadLocal变量时,它先获取到当前线程的theadLocals指向的ThreadLocalMap,以ThreadLocal对象为key,存取到value的。
所以说,每个线程中都维护了一个ThreadLocal.ThreadLocalMap,保存了所有当前线程需要用到的ThreadLocal类型的变量副本。
源码
ThreadLocal.ThreadLocalMap数据结构
ThreadLocalMap本身就是ThreadLocal的一个静态内部类,首先看一下这个 ThreadLocalMap
内部最关键几个源码
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private static final int INITIAL_CAPACITY = 16;
private Entry[] table;
private int threshold; // Default to 0
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
ThreadLocalMap
内部又有一个Entry
类,它继承了弱引用WeekReference
,Entry内部维护了两个变量,一个是ThreadLocal,ThreadLocal即为代码中定义的私有静态的ThreadLocal类型的变量(强引用),唯一的区别是它是一个弱引用;另一个是value,也就是我们使用threadLocal维护的值,它是一个强引用
弱引用的作用:弱引用指向的对象(没有其他强引用或软引用指向它)在gc时会被立即回收
- 同时维护了一个Entry类型的数组,这个数组的长度必须是2的N次幂,初始容量为16(2^4),最重要的是这个数组不会存在下边越界,它首位相接,这点从
nextIndex
和preIndex
可以看出来
源码中凡是用到数组数据结构时,其长度必然都是2的n次幂,这样做是为了存放数据时计算索引时减少hash碰撞
- 这个数组可以动态扩容,加载因子为数组长度的2/3,当数组容量达到时将触发扩容,每次扩容后的容量将是原来的2倍(2^n ->2^(n+1))
Entry[]数据结构
链接
源码
set(T value)
分析源码,就从代码中用到的方法 set(T value)
走起
public void set(T value) {
//先找到本线程,线程是java程序运行时基本单元,所有代码都是在线程中运行的
Thread t = Thread.currentThread();
/* 根据当前线程Thread t找到它的ThreadLocalMap数据结构,
* 就是前文中介绍的ThreadLocal类的threadLocals属性
* ThreadLocalMap getMap(Thread t) {
* return t.threadLocals;
* }
*/
ThreadLocalMap map = getMap(t);
if (map != null)
//ThreadLocalMap如果已经初始化过,调用它的set方法,将this(ThreadLocal)和value传递进去
map.set(this, value);
else
/* ThreadLocalMap如果还没初始化过,初始化Map
* void createMap(Thread t, T firstValue) {
* t.threadLocals = new ThreadLocalMap(this, firstValue);
* }
*/
createMap(t, value);
}
初始化ThreadLocalMap
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//申请容量为16的Entry[]
table = new Entry[INITIAL_CAPACITY];
//根据ThreadLocal变量的hash值跟table.length-1做&操作,计算firstValue需要存放的索引
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
//数组的索引i指向新创建的Entry实例
table[i] = new Entry(firstKey, firstValue);
//数组容量为1
size = 1;
//设置扩容阈值,table.length * 2/3
setThreshold(INITIAL_CAPACITY);
}
真正的set方法
ThreadLocalMap.set(ThreadLocal<?>keyl, Object value)
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
//使用for循环查找,是因为根据ThreadLocal的Hash定位存放索引时可能会碰到hash碰撞,导致存放位置后移
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果当前索引处的存放的Entry引用是ThreadLocal key,直接替换,并返回
if (k == key) {
e.value = value;
return;
}
//如果当前索引或后移索引处(发生hash碰撞)没有找到传递的ThreadLocal的key相通的Entry
//反而先发现有Entry的弱引用不存在(remove()或gc),
//将调用replaceStaleEntry方法用传递的value替换掉这个key == null的Entry,
//并清理一段连续的序列中过期的entry,“一段”:标识两个null中间的连续序列,
//传递的value已经存放完毕,直接返回
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
//如果根据ThreadLocal的hash计算的索引处为空,或不为空但发生hash碰撞,
//且后移索引没有找到此ThreadLocal存放过
//则在当前索引或找到后移索引的第一个空槽,将value存放在此处
tab[i] = new Entry(key, value);
//容量+1
int sz = ++size;
//根据传入的索引i清除i+1及之后的过期的槽位(key= null),清理的次数跟传递的sz参数有关
//传递的参数为存放value时的索引,和数组此时的容量
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
set方法总结:
- 根据ThreadLocal变量的hash计算存放的索引处已经存放过此变量的值,二话不说,直接替换,结束流程
- 如果当前索引处为空,同样二话不说,把ThreadLocal和value构造成Entry直接存放此处,然后,清理一次过期的Entry,如果没有过期Entry,并且数组达到阈值,则扩容。
如果发生hash碰撞,且发现找到存放value的索引处存在过期的Entry,则替换掉过期的Entry,并清理一段过期的Entry
先看replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)方法
```java private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
//A run is a sequence of entries between two null slots.
//前文说了这个方法要清理"一段"(a "run")中过期的Entry,一段即两个空slot之间的所有位置
//所以第一个for循环就是往前找需要清理的Entry的索引,直到碰到null
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever
// occurs first
//将索引后移,找到第一个null slot 或者当前ThreadLocal存放位置任意一个后停止查找,存放key和value,
//然后开始先清理一段expungeStaleEntry,再清理一些cleanSomeSlots
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//根据传递进来的存放过期Entry的索引,往后查找,找是否有当前ThreadLocal存放过的Entry
//如果找到了,则替换为最新的value,并把和传递的staleSlot处的过期Entry交换位置
//
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
//如果往前没有找到过期的Entry,则把交换后过期的Entry处的索引赋给slotToExpunge
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//先调用expungeStaleEntry清理一段(a run)过期的Entry,
//expungeStaleEntry参数slotToExpunge为清理开始位置的索引,清理到下一个null slot
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
//先前往前扫描没有发现过期Entry,且在往后扫描过程中第一次发现过期的Entry时,
//将此时索引付给slotToExpunge,标识从此处开始清理
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
//可以看到replaceStaleEntry方法调用时,当前的slot就是需要存放key和value的位置
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
//slotToExpunge如果不等于传递进来的staleSlot,
//则表示除了当前staleSlot处(已存放key value,不再是被清理对象),还有其他entry需要清理
//清理过程同上
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
<a name="82mgj"></a>
##### replaceStaleEntry总结:
1. 当调用 `replaceStaleEntry` 方法时,说明set Value的索引处是过期的Entry,需要干掉它,放置新的value
1. 与此同时,要扫描Entry[]的一个段,看是否有过期的Entry,如果有,则调用清理方法 `expungeStaleEntry` 和 `cleanSomeSlots`
<a name="QsAIW"></a>
#### 清除一段过期数据方法expungeStaleEntry(int staleSlot)
此方法清除staleSlot处的和下一个null slot之前所有的过期的entry,并返回下一个null slot的索引
```java
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
//清除当前stalSlot处的entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
//扫描下一个null slot之前的所有过期的entry,并清除
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {//过期的entry 清除
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
//没有过期的entry,重新rehash,如果当前的entry没有处在正确位置,
//则交换到其真正的位置
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
//如果真正位置处已有entry,则往后扫描到null slot处,挪动之
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
//返回stalSlot后第一个null slot的索引
return i;
}
清除过期entry的另一个方法cleanSomeSlots(int i, int n)
此方法在添加一个新元素的时候(table[i] = new Entry(key,value))或者清理过期entry(repalceStaleEntry)的时候调用
参数n使用来控制扫描的次数
- 当insert新元素(Entry)时,n为数组中元素个数
- 当replaceStaleEntry的中调用时,传递table的长度
扫描log2(n)次数,这个扫描控制源码中有说可以更改
private boolean cleanSomeSlots(int i, int n) {
//标识 :是否清除了entry
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
//取下一个索引
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
//一旦有可以清除的entry,n就变成table.length
n = len;
removed = true;
//真正清除entry还是调用了expungeStaleEntry,清除i到下一个null slot之间的过期entry
//并返回 null slot的索引
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);//如果logN次都没有扫描到过期的就结束扫描
//当null slot后连续logN的索引处至少有一个过期entry,那么将清除整个table中的过期entry
return removed;
}
再回到set方法的最后两句代码,当调用cleanSomeSlots没有清除到过期entry,并且table数组元素个数达到阈值时,将调用rehash()
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
rehash()
private void rehash() {
//清除所有过期entry
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
//使用较低的加倍阈值以避免滞后
//当元素个数大于等于table.length/2,调用resize进行扩容
if (size >= threshold - threshold / 4)
resize();
}
真正清理所有过期entry的方法expungeStaleEntries()
其实真正的清理方法还是expungeStaleEntry,功臣啊
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
//是你,是你,还是你
//如果使用返回的索引,可以减少循环的次数,
//因为expungeStaleEntry清理了a "run"的所有stale entry
expungeStaleEntry(j);
}
}
resize()扩容
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
//扩容原来的2倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
//挪动旧table中的entry到新的entry中
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
//重新hash定位,存储
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
至此ThreadLocal.set(T value)已经分析完毕,下面看另外连个方法get()和remove(),这两个方法就比较简单了
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
//当获取ThreadLocal变量的值时,如果ThreadLocalMap已经初始化过,则直接从map中获取
if (map != null) {
//使用ThreadLocalMap.getEntry(ThreadLocal t)获取
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//如果ThreadLocalMap不存在,则初始化map,返回,使用初始化的值,并返回
return setInitialValue();
}
getEntry(ThreadLocal t)
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
//计算的索引处刚好是当前ThreadLocal(没有碰撞),直接返回
if (e != null && e.get() == key)
return e;
else//有碰撞,则调用getEntryAfterMiss
return getEntryAfterMiss(key, i, e);
}
getEntryAfterMiss()
向后查找直到下一个null slot处,如果都没有找到,返回null,如果过程中有过期的entry,则调用expungeStaleEntry(i)清除
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
remove()方法
这个方法很简单,计算索引,索引处刚好是,则清除,查找区间 索引处至null slot之前
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
//手动清除弱引用,前文中都是弱引用为null时,才调expungeStaleEntry清理
e.clear();
expungeStaleEntry(i);
return;
}
}
}
注:
每一个ThreadLocal实例都有一个自己的hash值,它使用到了一个模数0x61c88647
通过理论与实践,当我们用0x61c88647作为魔数累加为每个ThreadLocal分配各自的ID也就是threadLocalHashCode再与2的幂取模,得到的结果分布很均匀。ThreadLocalMap使用的是线性探测法,均匀分布的好处在于很快就能探测到下一个临近的可用slot,从而保证效率。这就是为什么Entry[]数组大小要为2的幂的问题,为了优化效率。
ThreadLocal源码分析完毕,至此ThreadLocal已经没什么秘密了。