创建时间:
2020-7-17 01:30:59更新时间:2020-7-17 02:03:23
多线程与一致性
为了提高我们程序的性能,很多时候我们都会使用多线程以解决各种场景,但随之而来的是多线程带来的数据一致性问题该如何解决。
如何解决一致性问题?
- 排队:如果多个线程操作‘同一份数据’,那就排个队吧,一个一个来,这样后面一个线程总能得到最新的修改值,例如操作系统中的锁,管程,屏障等都是这种排队机制。缺点是:性能低。
- 投票:投票的机制就是多个人同时决策一件事,这个就涉及到了算法,往往会产生很多其他问题,比如欺诈
- 避免:直观意思就是避免多个线程之间产生一致性问题,那该如何去做呢?例如git,ThreadLocal正是采用的这种避免的方式来完成多线程的执行
ThreadLocal定义:
定义:ThreadLocal提供了线程局部变量,一个线程局部变量在多个线程中分别由独立的值(副本)。
提问:既然是每个线程独有的,为什么不直接在调用线程的时候,在相应的线程方法里声明这个局部变量呢?
答:同一个线程可能会调用到很多不同的类和方法,这样就要在不同的地方用到这个变量,自己去实现,代价太大,用ThreadLocal更加方便,且线程安全。
线程模型

对应每个线程来说都有自己的独占数据,这些数据是进程来分配的,每个线程都有一个ThreadLocalMap对象,它本身是一个hash表,里面会放一些线程的局部变量,而ThreadLocal的核心也是这个ThreadLocalMap。
4种核心应用场景
1.资源持有:
例如有三个不同的类,在一次web请求中调用这三个类,但是用户是一个,那么用户数据就可以保存在一个线程里。如图:

2.线程一致:
例如JDBC事务,我们每次对数据库操作都会走getConnection,jdbc保证只要你是同一个线程过来的请求,不管是哪一个part,都返回的是同一个连接,就是使用ThreadLocal来做的,达到维护一致性的目的。Mybatis使用SqlSessionManager保证了我们同一个线程取出来的连接总是同一个。它是如何做到的呢?其实很简单,就是内部使用了一个ThreadLocal。

3.线程安全:
如果一个线程的调用链路比较长,中间出现异常,那我们可以把出错信息放在ThreadLocal里,然后在后续的链路中使用这个值,可以达到多线程在处理这个场景的时候保证线程安全。

4.并发计算:
例如一个大的任务,拆分成多个小任务,分别计算,最后再进行结果汇总,那么我们可以把每个线程的计算结果放进ThreadLocal中,最后进行汇总计算。实现案例:比如需要统计一段时间内某个接口的调用量

线程不安全实现:
@RestController@RequestMapping("orders")public class OrderController {private Integer count = 0;@GetMapping("/visit")public Integer visit() throws InterruptedException {count++;Thread.sleep(100);return 0;}@GetMapping("/stat")public Integer stat() {return count;}}
count++操作,首先我们是从内存里面读取原来的值,放在了线程本地内存里。然后进行 +1 操作,再写回到内存里。这个时候如果多个线程操作的话,有可能线程A这边还没来得及写,线程B那边读取的是原来的值。这样子的话就会造成数据不一致的问题。结果就会比预期的小。结果明显是count的值与我们所期望的值不一致
如何解决?
当然方法很多,比如加锁,但今天我们要用ThreadLocal实现
@RestController@RequestMapping("orders")public class OrderController {private static final ThreadLocal<Integer> TL = ThreadLocal.withInitial(() -> 0);@GetMapping("/visit")public Integer visit() throws InterruptedException {Thread.sleep(100);TL.set(TL.get() + 1);return 0;}@GetMapping("/stat")public Integer stat() {return TL.get();}}
这样即可达到我们的计数目的。
还有很多方法可以实现,比如我们经常用的原子类Automatic或者synchronized等,他们的实现思想不同,加锁和原子类使用的是【排队】思想,而ThreadLocal使用的是【避免】思想,效率更高。
ThreadLocal分析
ThreadLocal类保证了线程内部的变量在多线程环境下相对于其他线程是不可见的。
ThreadLocal数据结构

上述图片为threadLocal的数据结构,每一个线程都维护一个threadLocalMap,key为线程中子线程构造的threadLocal。线程中对threadLocal的set、get、remove操作其实就是对threadLocalMap的操作。
ThreadLocal内部属性
//hash值,底层调用了nextHashCode方法,即每次新增一个threadLocal//就会使原来的hash值加上HASH_INCREMENTprivate final int threadLocalHashCode = nextHashCode();//原子操作private static AtomicInteger nextHashCode =new AtomicInteger();//这是一个黄金分割值,让一个原子类不断加这个数,目的是减少hash冲突private static final int HASH_INCREMENT = 0x61c88647;//cas 当前hash+0x61c88647private static int nextHashCode() {return nextHashCode.getAndAdd(HASH_INCREMENT);}
内部属性和我们认知中的HashMap还是有所不同的,hashMap中key的hash值的确定是根据key的高低位进行与运算求出来的,而ThreadLocaMap中的hash值是不断累加
HASH_INCREMENT这个数而求得hash值。主要原因还是为了解决Hash冲突,而threadLocalMap中采用的方法是数组探测法。
ThreadLocal常用方法
get方法
//获取当前线程的值public T get() {//获取当前线程Thread t = Thread.currentThread();//获取threadLocalMap对象ThreadLocalMap map = getMap(t);//map不为空if (map != null) {//去map中取出存储信息ThreadLocalMap.Entry e = map.getEntry(this);if (e != null) {//线程存放在map中的值T result = (T) e.value;return result;}}//map为空,表明,当前线程没有threadMap对象,需要初始化return setInitialValue();}//初始化线程的threadLocalMapprivate T setInitialValue() {//获取初始化的值T value = initialValue();Thread t = Thread.currentThread();//再次获取mapThreadLocalMap map = getMap(t);if (map != null)//map存在,则设置entry key-threadLocal对象 value-值map.set(this, value); else//否则创建map,当前线程entry为第一个结点createMap(t, value);//返回设置的值return value;}
set方法
//设置值public void set(T value) {//当前线程Thread t = Thread.currentThread();//获取mapThreadLocalMap map = getMap(t);if (map != null)//map如果存在,则直接setmap.set(this, value); else//map不存在,则直接创建,当前线程entry为第一个结点createMap(t, value);}
remove方法
//移除操作public void remove() {//获取当前线程的threadLocalMapThreadLocalMap m = getMap(Thread.currentThread());if (m != null)//移除m.remove(this);}
上述方法的过程都比较简单,但是对threadLocal的操作实际上是在对threadLocalMap进行操作,所以我们需要搞懂ThreadLocalMap即可。
ThreadLocalMap内部类
//Entry为弱引用static class Entry extends WeakReference<java.lang.ThreadLocal<?>> {//value值Object value;//key为threadLocalEntry(java.lang.ThreadLocal<?> k, Object v) {super(k);value = v;}}
可以看出threadLocalMap中的entry为弱引用,即当内存空间不足发生gc时,会把弱引用回收掉❗。
//初始容量private static final int INITIAL_CAPACITY = 16;//table数组private Entry[] table;//元素个数private int size = 0;//扩容临界值private int threshold;//临界值 = table长度 * 2/3private void setThreshold(int len) {threshold = len * 2 / 3;}//下一个数组下标private static int nextIndex(int i, int len) {return ((i + 1 < len) ? i + 1 : 0);}//上一个数组下标private static int prevIndex(int i, int len) {return ((i - 1 >= 0) ? i - 1 : len - 1);}
ThreadLocalMap构造函数
//构造函数 指定key和valueThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {//使用默认初始容量16table = new Entry[INITIAL_CAPACITY];//第一个key为0x61c88647 & 15int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);table[i] = new Entry(firstKey, firstValue);size = 1;//设置临界值setThreshold(INITIAL_CAPACITY);}
指定key和value的构造函数主要被用于threadLocal的set方法中。该构造函数主要做了这些事情,构建了一个默认初始容量16的数组,根据key的hash值和数组下标最大值进行与运算求出数组下标(因为刚构造出来,所以这里的hash值肯定为0,后续如果有新的key那么hash值为0+0x61c88647),将key和value构建成entry放到数组第一个结点中,最后设置size值和扩容临界值。
//指定map的构造函数private ThreadLocalMap(ThreadLocalMap parentMap) {Entry[] parentTable = parentMap.table;int len = parentTable.length;//根据指定的数组大小 设置临界值setThreshold(len);//构建一个新的maptable = new Entry[len];//循环放入值到map中for (int j = 0; j < len; j++) {//指定map中的entryEntry e = parentTable[j];if (e != null) {//获取keyjava.lang.ThreadLocal<Object> key = (java.lang.ThreadLocal<Object>) e.get();if (key != null) {//计算子线程的valueObject value = key.childValue(e.value);//根据指定entry中的数据构建新的entryEntry c = new Entry(key, value);//根据hash值和数组下标最大值 求出下标int h = key.threadLocalHashCode & (len - 1);//如果当前table中存在了entry,则放到table[h+1]中while (table[h] != null)h = nextIndex(h, len);table[h] = c;size++;}}}}
上述为指定map的构造函数,主要过程为需要依次将指定map中的元素放入到新构建出来的数组中。放置的顺序为数组形式放置。
ThreadLocal的get方法底层实现原理
//获取当前线程的值public T get() {//获取当前线程Thread t = Thread.currentThread();//获取threadLocalMap对象ThreadLocalMap map = getMap(t);//map不为空if (map != null) {//去map中取出存储信息ThreadLocalMap.Entry e = map.getEntry(this);if (e != null) {//线程存放在map中的值T result = (T) e.value;return result;}}//map为空,表明,当前线程没有threadMap对象,需要初始化return setInitialValue();}//根据key获取entryprivate Entry getEntry(java.lang.ThreadLocal<?> key) {//求出下标int i = key.threadLocalHashCode & (table.length - 1);Entry e = table[i];//key存在 则直接返回if (e != null && e.get() == key)return e; else//不存在,继续查找return getEntryAfterMiss(key, i, e);}private Entry getEntryAfterMiss(java.lang.ThreadLocal<?> key, int i, Entry e) {Entry[] tab = table;int len = tab.length;while (e != null) {//获取entry 的keyjava.lang.ThreadLocal<?> k = e.get();//key一致,直接返回if (k == key)return e;//key为空,处理过期的数据,因为弱引用,需要删除已经为null的引用if (k == null)expungeStaleEntry(i); else//获取下一个数组下标i = nextIndex(i, len);//下一个数组下标对应的entrye = tab[i];}return null;}//删除对应位置过期数据private int expungeStaleEntry(int staleSlot) {Entry[] tab = table;int len = tab.length;//删除指定位置上的数据tab[staleSlot].value = null;tab[staleSlot] = null;size--;Entry e;int i;//循环当前位置——>数组最大位置for (i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {java.lang.ThreadLocal<?> k = e.get();//若当前位置引用为空,则全部置为nullif (k == null) {e.value = null;tab[i] = null;size--;} else {//计算 下标int h = k.threadLocalHashCode & (len - 1);if (h != i) {tab[i] = null;//根据重新计算的下标继续遍历到最后//因为还可能存在多个过期的实体while (tab[h] != null)h = nextIndex(h, len);tab[h] = e;}}}return i;}//初始化线程的threadLocalMapprivate T setInitialValue() {//获取初始化的值T value = initialValue();Thread t = Thread.currentThread();//再次获取mapThreadLocalMap map = getMap(t);if (map != null)//map存在,则设置entry key-threadLocal对象 value-值map.set(this, value); else//否则创建map,当前线程entry为第一个结点createMap(t, value);//返回设置的值return value;}
get方法主要为以下几个步骤: (1)获取当前线程的ThreadId,根据threadId获取threadLocalMap。 (2)threadLocalMap不存在,则进行初始化操作,即创建threadLocalMap。 (3)threadLocalMap若存在,则进行查询entry操作。 (4)求出数组下标,获取数组下标对应的entry,若entry不为空并且key为当前的threadLocal对象,则为需要查询的数据,直接返回。 (5)若不存在,则还需要继续往下一个数组中进行查找。 (6)遍历数组,如果查询到key并且相等,则直接返回。 (7)否则需要判断当前遍历到的key是否为null,如果为空,说明该弱引用被回收,需要删除已经为null的引用。 (8)key不为空,进行下一次循环查找。 (9)直到循环结束或者查找到对应key则返回。 (10)这里的清数据操作会清楚所有key被回收掉了的数据。
ThreadLocal的set方法底层实现原理
public void set(T value) {//当前线程Thread t = Thread.currentThread();//获取mapThreadLocalMap map = getMap(t);if (map != null)//map如果存在,则直接setmap.set(this, value); else//map不存在,则直接创建,当前线程entry为第一个结点createMap(t, value);}private void set(ThreadLocal<?> key, Object value) {Entry[] tab = table;int len = tab.length;//计算下标int i = key.threadLocalHashCode & (len - 1);//从下标对应的数组循环for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {java.lang.ThreadLocal<?> k = e.get();//如果存在当前的key,则直接返回if (k == key) {e.value = value;return;}//如果当前位置无keyif (k == null) {//存放新值replaceStaleEntry(key, value, i);return;}}//map中 当位置key不相同并且不为null 直接放置到当前位置中tab[i] = new Entry(key, value);int sz = ++size;//清理数据if (!cleanSomeSlots(i, sz) && sz >= threshold)//扩容 - 如果没有要清理的数据并且容量超过了临界值rehash();}private void replaceStaleEntry(java.lang.ThreadLocal<?> key, Object value,int staleSlot) {Entry[] tab = table;int len = tab.length;Entry e;//往前找 找到第一个已经被清理的下标int slotToExpunge = staleSlot;for (int i = prevIndex(staleSlot, len);(e = tab[i]) != null;i = prevIndex(i, len))if (e.get() == null)slotToExpunge = i;//往后找进行遍历for (int i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {java.lang.ThreadLocal<?> k = e.get();//如果当前遍历的key是要插入的keyif (k == key) {//进行替换e.value = value;tab[i] = tab[staleSlot];tab[staleSlot] = e;if (slotToExpunge == staleSlot)slotToExpunge = i;//清理过期的数据cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);return;}//如果没有往后找到过期实例if (k == null && slotToExpunge == staleSlot)slotToExpunge = i;}//key没有找到,设置新的entry实例tab[staleSlot].value = null;tab[staleSlot] = new Entry(key, value);//清除过期实例if (slotToExpunge != staleSlot)cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);}
set方法主要为以下几个步骤: (1)获取当前线程threadId,获取threadLocalMap,如果map不存在,则创建map。map中的entry就为需要set的值。 (2)map如果存在调用threadLocalMap的set方法。 (3)在set方法中会先根据hash值和数组大小计算下标。 (4)从下标开始进行循环,如果循环过程中存在当前的key则直接返回。 (5)如果map数组下标中对应的key是null,则调用replaceStaleEntry存放新值。 (6)replaceStaleEntry中的操作,主要为清除过期数据和设置entry值。 (7)如果循环查找不存在key,并且位置不为null,则直接将需要set的值放入到计算的下标中。 (8)清理过去数据,判断是否需要扩容。 (9)扩容操作。
扩容操作如下:
//扩容 1、还会删除过期entry实例 2、进行扩容private void rehash() {expungeStaleEntries();//当前容量 >= 阈值的3/4if (size >= threshold - threshold / 4)resize();}private void resize() {Entry[] oldTab = table;int oldLen = oldTab.length;//新容量为原来的2倍int newLen = oldLen * 2;//构建新的数组Entry[] newTab = new Entry[newLen];int count = 0;//对老的数组进行遍历 、 复制到新数组中for (int j = 0; j < oldLen; ++j) {Entry e = oldTab[j];if (e != null) {java.lang.ThreadLocal<?> k = e.get();if (k == null) {//如果key为null,value也置为null,帮助gce.value = null;} else {//重新计算hash值int h = k.threadLocalHashCode & (newLen - 1);//放在数组中下一个空闲位置while (newTab[h] != null)h = nextIndex(h, newLen);newTab[h] = e;count++;}}}//重新设置临界值setThreshold(newLen);size = count;table = newTab;}
ThreadLocal的remove方法底层实现原理
//移除操作public void remove() {//获取当前线程的threadLocalMapThreadLocalMap m = getMap(Thread.currentThread());if (m != null)//移除m.remove(this);}//threadLocalMap - 根据key移除private void remove(java.lang.ThreadLocal<?> key) {//map中的数组Entry[] tab = table;int len = tab.length;int i = key.threadLocalHashCode & (len - 1);for (Entry e = tab[i];e != null;e = tab[i = nextIndex(i, len)]) {if (e.get() == key) {e.clear();//删除对应位置过期数据expungeStaleEntry(i);return;}}}
remove操作比较简单,主要分为以下几个步骤: (1)根据key的hash值和数组下标计算key存放的下标。 (2)循环数组,如果key存在数组中则进行清除操作。
二、ThreadLocal总结
ThreadLocal类内部维护了一个threadLocalMap,该map和hashMap一样可以进行简单的set、get、remove、扩容等基本操作,不过hashMap对于hash冲突采用的是拉链法+红黑树,而threadLocalMap中采用的是线性探测法。并且threadLocalMap中的key为子线程构造出来的threadLocal对象,是一个弱引用,因此会在一定时机被gc回收,因此在对threadLocal类进行操作的时候内部会有清楚过期entry的操作。
ThreadLocal内部的引用关系
内部中存在thread—>threadLocalMap—>entry—>threadLocal强引用关系,而entry中的key是弱引用,因此即使发生了垃圾回收,key会被置为null,但是entry中存在强引用关系,无法被回收。久而久之容易造成内存泄漏。
虽然在set、get操作时会进行清理过期数据的操作,但是尽量还是保证在使用threadLocal类后进行remove操作,减少内存泄漏的风险 —— 及时清理不必要的值。
ThreadLocal为什么会内存泄漏

当把threadlocal变量置为null以后,没有任何强引用指向threadlocal实例,所以threadlocal将会被gc回收。这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value,而这块value永远不会被访问到了,所以存在着内存泄露。
只有当前thread结束以后,current thread就不会存在栈中,强引用断开,Current Thread、Map value将全部被GC回收。最好的做法是不在需要使用ThreadLocal变量后,都调用它的remove()方法,清除数据。
ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用来引用它,那么系统 GC 的时候,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话,这些key为null的Entry的value就会一直存在一条强引用链:Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value永远无法回收,造成内存泄漏。
其实,ThreadLocalMap的设计中已经考虑到这种情况,也加上了一些防护措施:在ThreadLocal的get(),set(),remove()的时候都会清除线程ThreadLocalMap里所有key为null的value。
但是这些被动的预防措施并不能保证不会内存泄漏:
使用static的ThreadLocal,延长了ThreadLocal的生命周期,可能导致的内存泄漏。
分配使用了ThreadLocal又不再调用get(),set(),remove()方法,那么就会导致内存泄漏。
为什么使用弱引用
从表面上看内存泄漏的根源在于使用了弱引用。网上的文章大多着重分析ThreadLocal使用了弱引用会导致内存泄漏,但是另一个问题也同样值得思考:为什么使用弱引用而不是强引用?
我们先来看看官方文档的说法:
To help deal with very large and long-lived usages, the hash table entries use WeakReferences for keys.
为了应对非常大和长时间的用途,哈希表使用弱引用的 key。
两种情况分析:
key 使用强引用:引用的ThreadLocal的对象被回收了,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏。
key 使用弱引用:引用的ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。
比较两种情况,我们可以发现:由于ThreadLocalMap的生命周期跟Thread一样长,如果都没有手动删除对应key,都会导致内存泄漏,但是使用弱引用可以多一层保障:弱引用ThreadLocal不会内存泄漏,对应的value在下一次ThreadLocalMap调用set,get,remove的时候会被清除。
因此,ThreadLocal内存泄漏的根源是:由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏,而不是因为弱引用。
