ThreadLocal的作用

ThreadLocal 顾名思义线程本地,其意义就在于定义了ThreadLocal变量,每个线程都有一份自己的独立的变量副本,多线程的中对此变量的存取都是在操作各自线程中的变量副本,所以不会影响其他线程,同样也不会收到其他线程影响,达到线程隔离的作用。
通常情况下, ThreadLocal 都是定义为 private static 类型,这样ThreadLocal变量就是从属于类对象,它在类加载的时候就初始化完成,整个内存只存一份,所有实例对象共享。

ThreadLocal变量如何做到线程私有

ThreadLocal变量是如何做到每个线程中保存一份副本以达到线程私有的,它原理很简单,因为每个线程 类Thread 中保存有一个 threadLocals 的属性,此属性是一个 ThreadLocal.ThreadLocalMap 类型

  1. // Thread.class
  2. /* ThreadLocal values pertaining to this thread. This map is maintained
  3. * by the ThreadLocal class. */
  4. ThreadLocal.ThreadLocalMap threadLocals = null;

所有ThreadLocal变量的值都存在当前线程的threadlocals引用指向的ThreadLocal.ThreadLocalMap数据结构中,仅从名字看,就知道它是一个类似map的容器,保存了所有ThreadLocal变量的值,当需要存取ThreadLocal变量时,它先获取到当前线程的theadLocals指向的ThreadLocalMap,以ThreadLocal对象为key,存取到value的。

所以说,每个线程中都维护了一个ThreadLocal.ThreadLocalMap,保存了所有当前线程需要用到的ThreadLocal类型的变量副本。

源码

ThreadLocal.ThreadLocalMap数据结构

ThreadLocalMap本身就是ThreadLocal的一个静态内部类,首先看一下这个 ThreadLocalMap 内部最关键几个源码

  1. static class Entry extends WeakReference<ThreadLocal<?>> {
  2. /** The value associated with this ThreadLocal. */
  3. Object value;
  4. Entry(ThreadLocal<?> k, Object v) {
  5. super(k);
  6. value = v;
  7. }
  8. }
  9. private static final int INITIAL_CAPACITY = 16;
  10. private Entry[] table;
  11. private int threshold; // Default to 0
  12. private void setThreshold(int len) {
  13. threshold = len * 2 / 3;
  14. }
  15. private static int nextIndex(int i, int len) {
  16. return ((i + 1 < len) ? i + 1 : 0);
  17. }
  18. private static int prevIndex(int i, int len) {
  19. return ((i - 1 >= 0) ? i - 1 : len - 1);
  20. }
  • ThreadLocalMap 内部又有一个 Entry 类,它继承了弱引用 WeekReference ,Entry内部维护了两个变量,一个是ThreadLocal,ThreadLocal即为代码中定义的私有静态的ThreadLocal类型的变量(强引用),唯一的区别是它是一个弱引用;另一个是value,也就是我们使用threadLocal维护的值,它是一个强引用

弱引用的作用:弱引用指向的对象(没有其他强引用或软引用指向它)在gc时会被立即回收

  • 同时维护了一个Entry类型的数组,这个数组的长度必须是2的N次幂,初始容量为16(2^4),最重要的是这个数组不会存在下边越界,它首位相接,这点从 nextIndexpreIndex 可以看出来

源码中凡是用到数组数据结构时,其长度必然都是2的n次幂,这样做是为了存放数据时计算索引时减少hash碰撞

  • 这个数组可以动态扩容,加载因子为数组长度的2/3,当数组容量达到时将触发扩容,每次扩容后的容量将是原来的2倍(2^n ->2^(n+1))

Entry[]数据结构
链接584724-20170501020337211-761293878.png

源码

set(T value)

分析源码,就从代码中用到的方法 set(T value) 走起

  1. public void set(T value) {
  2. //先找到本线程,线程是java程序运行时基本单元,所有代码都是在线程中运行的
  3. Thread t = Thread.currentThread();
  4. /* 根据当前线程Thread t找到它的ThreadLocalMap数据结构,
  5. * 就是前文中介绍的ThreadLocal类的threadLocals属性
  6. * ThreadLocalMap getMap(Thread t) {
  7. * return t.threadLocals;
  8. * }
  9. */
  10. ThreadLocalMap map = getMap(t);
  11. if (map != null)
  12. //ThreadLocalMap如果已经初始化过,调用它的set方法,将this(ThreadLocal)和value传递进去
  13. map.set(this, value);
  14. else
  15. /* ThreadLocalMap如果还没初始化过,初始化Map
  16. * void createMap(Thread t, T firstValue) {
  17. * t.threadLocals = new ThreadLocalMap(this, firstValue);
  18. * }
  19. */
  20. createMap(t, value);
  21. }

初始化ThreadLocalMap

  1. ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
  2. //申请容量为16的Entry[]
  3. table = new Entry[INITIAL_CAPACITY];
  4. //根据ThreadLocal变量的hash值跟table.length-1做&操作,计算firstValue需要存放的索引
  5. int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
  6. //数组的索引i指向新创建的Entry实例
  7. table[i] = new Entry(firstKey, firstValue);
  8. //数组容量为1
  9. size = 1;
  10. //设置扩容阈值,table.length * 2/3
  11. setThreshold(INITIAL_CAPACITY);
  12. }

真正的set方法

ThreadLocalMap.set(ThreadLocal<?>keyl, Object value)

  1. private void set(ThreadLocal<?> key, Object value) {
  2. Entry[] tab = table;
  3. int len = tab.length;
  4. int i = key.threadLocalHashCode & (len-1);
  5. //使用for循环查找,是因为根据ThreadLocal的Hash定位存放索引时可能会碰到hash碰撞,导致存放位置后移
  6. for (Entry e = tab[i];
  7. e != null;
  8. e = tab[i = nextIndex(i, len)]) {
  9. ThreadLocal<?> k = e.get();
  10. //如果当前索引处的存放的Entry引用是ThreadLocal key,直接替换,并返回
  11. if (k == key) {
  12. e.value = value;
  13. return;
  14. }
  15. //如果当前索引或后移索引处(发生hash碰撞)没有找到传递的ThreadLocal的key相通的Entry
  16. //反而先发现有Entry的弱引用不存在(remove()或gc),
  17. //将调用replaceStaleEntry方法用传递的value替换掉这个key == null的Entry,
  18. //并清理一段连续的序列中过期的entry,“一段”:标识两个null中间的连续序列,
  19. //传递的value已经存放完毕,直接返回
  20. if (k == null) {
  21. replaceStaleEntry(key, value, i);
  22. return;
  23. }
  24. }
  25. //如果根据ThreadLocal的hash计算的索引处为空,或不为空但发生hash碰撞,
  26. //且后移索引没有找到此ThreadLocal存放过
  27. //则在当前索引或找到后移索引的第一个空槽,将value存放在此处
  28. tab[i] = new Entry(key, value);
  29. //容量+1
  30. int sz = ++size;
  31. //根据传入的索引i清除i+1及之后的过期的槽位(key= null),清理的次数跟传递的sz参数有关
  32. //传递的参数为存放value时的索引,和数组此时的容量
  33. if (!cleanSomeSlots(i, sz) && sz >= threshold)
  34. rehash();
  35. }

set方法总结:
  1. 根据ThreadLocal变量的hash计算存放的索引处已经存放过此变量的值,二话不说,直接替换,结束流程
  2. 如果当前索引处为空,同样二话不说,把ThreadLocal和value构造成Entry直接存放此处,然后,清理一次过期的Entry,如果没有过期Entry,并且数组达到阈值,则扩容。
  3. 如果发生hash碰撞,且发现找到存放value的索引处存在过期的Entry,则替换掉过期的Entry,并清理一段过期的Entry

    先看replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)方法

    ```java private void replaceStaleEntry(ThreadLocal<?> key, Object value,

    1. int staleSlot) {
    2. Entry[] tab = table;
    3. int len = tab.length;
    4. Entry e;
    5. //A run is a sequence of entries between two null slots.
    6. //前文说了这个方法要清理"一段"(a "run")中过期的Entry,一段即两个空slot之间的所有位置
    7. //所以第一个for循环就是往前找需要清理的Entry的索引,直到碰到null
    8. int slotToExpunge = staleSlot;
    9. for (int i = prevIndex(staleSlot, len);
    10. (e = tab[i]) != null;
    11. i = prevIndex(i, len))
    12. if (e.get() == null)
    13. slotToExpunge = i;
    14. // Find either the key or trailing null slot of run, whichever
    15. // occurs first
    16. //将索引后移,找到第一个null slot 或者当前ThreadLocal存放位置任意一个后停止查找,存放key和value,
    17. //然后开始先清理一段expungeStaleEntry,再清理一些cleanSomeSlots
    18. for (int i = nextIndex(staleSlot, len);
    19. (e = tab[i]) != null;
    20. i = nextIndex(i, len)) {
    21. ThreadLocal<?> k = e.get();
    22. //根据传递进来的存放过期Entry的索引,往后查找,找是否有当前ThreadLocal存放过的Entry
    23. //如果找到了,则替换为最新的value,并把和传递的staleSlot处的过期Entry交换位置
    24. //
    25. if (k == key) {
    26. e.value = value;
    27. tab[i] = tab[staleSlot];
    28. tab[staleSlot] = e;
    29. //如果往前没有找到过期的Entry,则把交换后过期的Entry处的索引赋给slotToExpunge
    30. if (slotToExpunge == staleSlot)
    31. slotToExpunge = i;
    32. //先调用expungeStaleEntry清理一段(a run)过期的Entry,
    33. //expungeStaleEntry参数slotToExpunge为清理开始位置的索引,清理到下一个null slot
    34. cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    35. return;
    36. }
    37. // If we didn't find stale entry on backward scan, the
    38. // first stale entry seen while scanning for key is the
    39. // first still present in the run.
    40. //先前往前扫描没有发现过期Entry,且在往后扫描过程中第一次发现过期的Entry时,
    41. //将此时索引付给slotToExpunge,标识从此处开始清理
    42. if (k == null && slotToExpunge == staleSlot)
    43. slotToExpunge = i;
    44. }
    45. // If key not found, put new entry in stale slot
    46. //可以看到replaceStaleEntry方法调用时,当前的slot就是需要存放key和value的位置
    47. tab[staleSlot].value = null;
    48. tab[staleSlot] = new Entry(key, value);
    49. // If there are any other stale entries in run, expunge them
    50. //slotToExpunge如果不等于传递进来的staleSlot,
    51. //则表示除了当前staleSlot处(已存放key value,不再是被清理对象),还有其他entry需要清理
    52. //清理过程同上
    53. if (slotToExpunge != staleSlot)
    54. cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    55. }
  1. <a name="82mgj"></a>
  2. ##### replaceStaleEntry总结:
  3. 1. 当调用 `replaceStaleEntry` 方法时,说明set Value的索引处是过期的Entry,需要干掉它,放置新的value
  4. 1. 与此同时,要扫描Entry[]的一个段,看是否有过期的Entry,如果有,则调用清理方法 `expungeStaleEntry` 和 `cleanSomeSlots`
  5. <a name="QsAIW"></a>
  6. #### 清除一段过期数据方法expungeStaleEntry(int staleSlot)
  7. 此方法清除staleSlot处的和下一个null slot之前所有的过期的entry,并返回下一个null slot的索引
  8. ```java
  9. private int expungeStaleEntry(int staleSlot) {
  10. Entry[] tab = table;
  11. int len = tab.length;
  12. // expunge entry at staleSlot
  13. //清除当前stalSlot处的entry
  14. tab[staleSlot].value = null;
  15. tab[staleSlot] = null;
  16. size--;
  17. // Rehash until we encounter null
  18. //扫描下一个null slot之前的所有过期的entry,并清除
  19. Entry e;
  20. int i;
  21. for (i = nextIndex(staleSlot, len);
  22. (e = tab[i]) != null;
  23. i = nextIndex(i, len)) {
  24. ThreadLocal<?> k = e.get();
  25. if (k == null) {//过期的entry 清除
  26. e.value = null;
  27. tab[i] = null;
  28. size--;
  29. } else {
  30. int h = k.threadLocalHashCode & (len - 1);
  31. //没有过期的entry,重新rehash,如果当前的entry没有处在正确位置,
  32. //则交换到其真正的位置
  33. if (h != i) {
  34. tab[i] = null;
  35. // Unlike Knuth 6.4 Algorithm R, we must scan until
  36. // null because multiple entries could have been stale.
  37. //如果真正位置处已有entry,则往后扫描到null slot处,挪动之
  38. while (tab[h] != null)
  39. h = nextIndex(h, len);
  40. tab[h] = e;
  41. }
  42. }
  43. }
  44. //返回stalSlot后第一个null slot的索引
  45. return i;
  46. }

清除过期entry的另一个方法cleanSomeSlots(int i, int n)

此方法在添加一个新元素的时候(table[i] = new Entry(key,value))或者清理过期entry(repalceStaleEntry)的时候调用
参数n使用来控制扫描的次数

  1. 当insert新元素(Entry)时,n为数组中元素个数
  2. 当replaceStaleEntry的中调用时,传递table的长度

扫描log2(n)次数,这个扫描控制源码中有说可以更改

  1. private boolean cleanSomeSlots(int i, int n) {
  2. //标识 :是否清除了entry
  3. boolean removed = false;
  4. Entry[] tab = table;
  5. int len = tab.length;
  6. do {
  7. //取下一个索引
  8. i = nextIndex(i, len);
  9. Entry e = tab[i];
  10. if (e != null && e.get() == null) {
  11. //一旦有可以清除的entry,n就变成table.length
  12. n = len;
  13. removed = true;
  14. //真正清除entry还是调用了expungeStaleEntry,清除i到下一个null slot之间的过期entry
  15. //并返回 null slot的索引
  16. i = expungeStaleEntry(i);
  17. }
  18. } while ( (n >>>= 1) != 0);//如果logN次都没有扫描到过期的就结束扫描
  19. //当null slot后连续logN的索引处至少有一个过期entry,那么将清除整个table中的过期entry
  20. return removed;
  21. }

再回到set方法的最后两句代码,当调用cleanSomeSlots没有清除到过期entry,并且table数组元素个数达到阈值时,将调用rehash()

  1. if (!cleanSomeSlots(i, sz) && sz >= threshold)
  2. rehash();

rehash()

  1. private void rehash() {
  2. //清除所有过期entry
  3. expungeStaleEntries();
  4. // Use lower threshold for doubling to avoid hysteresis
  5. //使用较低的加倍阈值以避免滞后
  6. //当元素个数大于等于table.length/2,调用resize进行扩容
  7. if (size >= threshold - threshold / 4)
  8. resize();
  9. }

真正清理所有过期entry的方法expungeStaleEntries()

其实真正的清理方法还是expungeStaleEntry,功臣啊

  1. private void expungeStaleEntries() {
  2. Entry[] tab = table;
  3. int len = tab.length;
  4. for (int j = 0; j < len; j++) {
  5. Entry e = tab[j];
  6. if (e != null && e.get() == null)
  7. //是你,是你,还是你
  8. //如果使用返回的索引,可以减少循环的次数,
  9. //因为expungeStaleEntry清理了a "run"的所有stale entry
  10. expungeStaleEntry(j);
  11. }
  12. }

resize()扩容

  1. private void resize() {
  2. Entry[] oldTab = table;
  3. int oldLen = oldTab.length;
  4. //扩容原来的2倍
  5. int newLen = oldLen * 2;
  6. Entry[] newTab = new Entry[newLen];
  7. int count = 0;
  8. //挪动旧table中的entry到新的entry中
  9. for (int j = 0; j < oldLen; ++j) {
  10. Entry e = oldTab[j];
  11. if (e != null) {
  12. ThreadLocal<?> k = e.get();
  13. if (k == null) {
  14. e.value = null; // Help the GC
  15. } else {
  16. //重新hash定位,存储
  17. int h = k.threadLocalHashCode & (newLen - 1);
  18. while (newTab[h] != null)
  19. h = nextIndex(h, newLen);
  20. newTab[h] = e;
  21. count++;
  22. }
  23. }
  24. }
  25. setThreshold(newLen);
  26. size = count;
  27. table = newTab;
  28. }

至此ThreadLocal.set(T value)已经分析完毕,下面看另外连个方法get()和remove(),这两个方法就比较简单了

  1. public T get() {
  2. Thread t = Thread.currentThread();
  3. ThreadLocalMap map = getMap(t);
  4. //当获取ThreadLocal变量的值时,如果ThreadLocalMap已经初始化过,则直接从map中获取
  5. if (map != null) {
  6. //使用ThreadLocalMap.getEntry(ThreadLocal t)获取
  7. ThreadLocalMap.Entry e = map.getEntry(this);
  8. if (e != null) {
  9. @SuppressWarnings("unchecked")
  10. T result = (T)e.value;
  11. return result;
  12. }
  13. }
  14. //如果ThreadLocalMap不存在,则初始化map,返回,使用初始化的值,并返回
  15. return setInitialValue();
  16. }

getEntry(ThreadLocal t)

  1. private Entry getEntry(ThreadLocal<?> key) {
  2. int i = key.threadLocalHashCode & (table.length - 1);
  3. Entry e = table[i];
  4. //计算的索引处刚好是当前ThreadLocal(没有碰撞),直接返回
  5. if (e != null && e.get() == key)
  6. return e;
  7. else//有碰撞,则调用getEntryAfterMiss
  8. return getEntryAfterMiss(key, i, e);
  9. }

getEntryAfterMiss()

向后查找直到下一个null slot处,如果都没有找到,返回null,如果过程中有过期的entry,则调用expungeStaleEntry(i)清除

  1. private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
  2. Entry[] tab = table;
  3. int len = tab.length;
  4. while (e != null) {
  5. ThreadLocal<?> k = e.get();
  6. if (k == key)
  7. return e;
  8. if (k == null)
  9. expungeStaleEntry(i);
  10. else
  11. i = nextIndex(i, len);
  12. e = tab[i];
  13. }
  14. return null;
  15. }

remove()方法

这个方法很简单,计算索引,索引处刚好是,则清除,查找区间 索引处至null slot之前

  1. private void remove(ThreadLocal<?> key) {
  2. Entry[] tab = table;
  3. int len = tab.length;
  4. int i = key.threadLocalHashCode & (len-1);
  5. for (Entry e = tab[i];
  6. e != null;
  7. e = tab[i = nextIndex(i, len)]) {
  8. if (e.get() == key) {
  9. //手动清除弱引用,前文中都是弱引用为null时,才调expungeStaleEntry清理
  10. e.clear();
  11. expungeStaleEntry(i);
  12. return;
  13. }
  14. }
  15. }

注:
每一个ThreadLocal实例都有一个自己的hash值,它使用到了一个模数0x61c88647
通过理论与实践,当我们用0x61c88647作为魔数累加为每个ThreadLocal分配各自的ID也就是threadLocalHashCode再与2的幂取模,得到的结果分布很均匀。ThreadLocalMap使用的是线性探测法,均匀分布的好处在于很快就能探测到下一个临近的可用slot,从而保证效率。这就是为什么Entry[]数组大小要为2的幂的问题,为了优化效率。

ThreadLocal源码分析完毕,至此ThreadLocal已经没什么秘密了。