一、线程
1.1 线程简介
现在操作系统在运行一个程序时,会为其创建一个进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。
1.2 线程的状态
Java线程在运行的生命周期中可能处于6中不同的状态。
状态名称 | 说明 |
---|---|
NEW | 初始状态,线程被构建,但是还没有调用start()方法 |
RUNNABLE | 运行状态,Java线程将OS中的就绪和运行两种状态统称为“运行中” |
BLOCKED | 阻塞状态,表示线程阻塞于锁 |
WAITING | 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断) |
TIME_WAITING | 超时等待状态,该状态不同于WAITING,它是可以在指定时间自行返回的 |
TERMINATED | 终止状态,表示当前线程已经执行完毕 |
1.3 创建线程
有三种使用线程的方法:
- 实现 Runnable 接口;
- 实现 Callable 接口;
- 继承 Thread 类。
实现 Runnable 和 Callable 接口的类只能当做一个可以在线程中运行的任务,不是真正意义上的线程,因此最后还需要通过 Thread 来调用。可以理解为任务是通过线程驱动从而执行的。
先来看看Thread类的声明结构:public class Thread implements Runnable
,发现Thread类实现了Runnable接口。
使用继承Thread类的方式创建新线程时,最大的局限时不支持多继承
1.实现Runnable接口
需要实现接口中的 run() 方法。
public class MyRunnable implements Runnable {
@Override
public void run() {
// ...
}
}
使用 Runnable 实例再创建一个 Thread 实例,然后调用 Thread 实例的start()
方法来启动线程。
如果调用代码
thread.run();
而不是thread.start();
就不是异步执行了,而是同步执行,那么此线程对象并不交给线程规划器来进行处理,而是由main主线程来调用run()方法
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start();
}
2. 实现Callable接口
与 Runnable 相比,Callable 可以有返回值,返回值通过 FutureTask
进行封装。
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
3. 继承Thread类
同样也是需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口。
当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法。
public class MyThread extends Thread {
public void run() {
// ...
}
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
1.4 停止线程
停止一个线程意味着在线程处理完任务之前停止正在做的操作。
停止一个线程可以使用Thread.stop()
方法,但不推荐使用,因为这个方法是不安全的。在大多数情况下,停止一个线程使用Thread.interrupt()
但这个方法不会终止一个正在运行的线程,还需要加入一个判断才可以完成线程的停止。
Java中有3中方法可以使正在运行的线程终止运行:
- 使用退出标志使线程正常退出
- 使用stop()方法强制终止线程,但不推荐使用
- 使用interrupt()方法中断线程
interrupt()
interrupt()方法的使用不是马上停止循环,调用interrupt()方法仅仅是在当前线程中做了一个停止的标记,并不是真正的停止线程。
Thread类中提供了两个判断方法:
public static boolean interrupted()
:测试currentThread()是否已经中断。方法具有清除状态的功能。public boolean this.isInterrupted()
:测试this关键字所在类的对象是否已经中断,未具有清楚状态的功能。1.5 其他线程方法
- suspend()方法暂停线程。
- resume()方法恢复线程的执行。
但是使用不当容易造成公共同步对象被独占,其他线程无法访问公共同步对象。不建议使用的原因主要是:在调用后,线程不会释放已经占有的资源(比如锁),而是占着资源进入睡眠状态,容易引发死锁问题
- yield()方法的作用是放弃当前的CPU资源,让其他任务去占用CPU执行时间,放弃的时间不确定,有可能刚刚放弃,又马上获得时间片。
1.6 守护线程
Java中有两种线程:一种是用户线程,也成为非守护线程;另一种是守护线程。守护线程是一种特殊的线程,当进程中不存在非守护线程了,则守护线程自动销毁。典型的守护线程是垃圾回收线程。在线程启动之前使用 setDaemon() 方法可以将一个线程设置为守护线程。二、线程间的通信
2.1 synchronized关键字
关键字synchronized
可用来保证原子性、可见性和有序性。Java中的每一个对象都可以作为锁,具体表现为以下3种形式:
- 对于普通同步方法,锁的是当前实例对象
- 对于静态同步方法,锁的是当前类的Class对象
- 对于同步方法块,锁的是Synchonized括号里配置的对象。
当一个线程试图访问同步代码块时,必须首先得到锁,退出或抛出异常时必须释放锁。那么锁到底存在哪里呢?锁里面存储什么信息?
- JVM基于进入和退出Monitor对象来实现方法同步和代码块同步。
- 代码块同步是使用monitorenter和monitorexit指令实现的
- 方法同步是使用另一种方式实现的,细节在JVM规范中没有详细说明,但是方法的同步同样可以使用这两个指令实现。
monitorenter
指令是在编译后插入到同步代码块的开始位置,而monitorexit
是插入到方法结束处和异常处,JVM要保证每个monitorenter
必须有对应的monitorexit
与之配对。任何对象都有一个monitor
与之关联,当且一个monitor
被持有后,它处于锁定状态。线程执行到monitorenter
指令时,将会尝试获取对象所对应的monitor
的所有权,即尝试获得对象的锁。
在方法中使用synchronized关键字实现同步的原因是使用了flag标记ACC_SYNCHRONIZED,当调用方法时,调用指令会检查方法的ACC_SYNCHRONIZED访问标志是否设置,如果设置了,执行线程先持有同步锁,然后执行方法,最后在方法完成时释放锁。
2.2 Java对象头
synchronized用的锁是存在Java对象头中的。
如果对象是数组类型,则虚拟机用3个字宽(32位虚拟机中,1字宽=4字节=32bit)存储对象头。
- 如果对象是非数组类型,则用2字宽存储对象头。 | 长度 | 内容 | 说明 | | —- | —- | —- | | 32/64bit | Mark Word | 存储对象的hashCode或锁信息 | | 32/64bit | Class Metadata Address | 存储到对象类型数据的指针 | | 32/64bit | Array length | 数组的长度(如果当前对象是数组) |
Java对象头里的Mark Word里默认存储对象的HashCode、分代年龄和锁标记位。32位JVM的Mark Word的默认存储结构如表
锁状态 | 25bit | 4bit | 1bit是否是偏向锁 | 2bit锁标志位 |
---|---|---|---|---|
无锁状态 | 对象的hashCode | 对象分代年龄 | 0 | 01 |
在运行期间,Mark Word里存储的数据会随着锁标志位的变化而变化。
2.3 volatile关键字
Java 语言提供了一种稍弱的同步机制,即 volatile
变量,用来确保将变量的更新操作通知到其他线程。volatile
变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取 volatile
类型的变量时总会返回最新写入的值。volatile
是轻量级的synchronized
,它在多处理器开发中保证了共享变量的“可见性”。它不会引起线程上下文的切换和调度。
1. volatile的定义与实现原理
Java语言规范第三版中对
volatile
的定义如下:Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量。如果一个字段被声明成volatile
,Java线程内存模型确保所有线程看到这个变量的值是一致的。
volatile是如何来保证可见性的呢?
通过JIT编译器生成的汇编指令来查看对volatile来进行写操作时,会多出一个lock前缀的指令。Lock前缀的指令在多核处理器下会引发两件事情:
- 将当前处理器缓存行的数据写回到系统内存。
- 这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效。
为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存(L1,L2或其他)后再进行操作,但操作完不知道何时会写到内存。
如果对声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。并且在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态。
当处理器对这个数据进行修改操作的时候,就会重新从系统内存中把数据读到处理器缓存里。
2. volatile的使用优化
著名的Java并发大师Doug lea在JDK7的并发包里新增了一个队列集合类LinkedTransferQueue
,它在使用volatile
变量时,用一种追加字节的方式来优化队列出队和入队的性能。
// 队列中的头部节点
private transient final PaddedAtomicReference<QNode> head;
// 队列中的尾部节点
private transient final PaddedAtomicReference<QNode> tail;
static final class PaddedAtomicReference<T> extends AtomicReference<T>{
// 使用很多4个字节的引用追加到64字节
Object p0,p1,p2p,p3,p4,p5,p6,p7,p8,p9,pa,pb,pc,pd,pe;
PaddedAtomicReference(T r){
super(r);
}
}
public class AtomicReference<V> implements java.io.Serializable {
private volatile V value;
// ...
}
让我们先来看看LinkedTransferQueue这个类,它使用一个内部类类型来定义队列的头节点(head)和尾节点(tail),而这个内部类PaddedAtomicReference相对于父类AtomicReferenece只做了一件事就是将共享变量追加到64字节(一个对象的引用占4字节,追加了15个变量+父类的value变量)。
为什么追加64字节能够提高并发编程的效率?
因为对于一些厂家的处理器的L1、L2或L3缓存的高速缓存行是64个字节宽,不支持部分填充缓冲行,这意味着,如果队列的头节点和尾节点都不足64字节的话,处理器会将他们都读到同一个高速缓存行中,在多处理器下每个处理器都会缓存同样的头、尾节点,当一个处理器试图修改头节点时,会将整个缓存行锁定,那么在缓存一致性机制的作用下,会导致其他处理器不能访问自己高速缓存中的尾节点,而队列的入队和出队操作需要不停修改头节点和尾节点,所以在多处理器下会严重影响队列的入队和出队效率。
2.4 等待/通知机制
1.wait()
wait()
方法是Object类的方法,它的作用是使当前执行wait()
方法的线程等待,在wait()
所在的代码行出暂停执行,并释放锁,直到接到通知或被中断为止。在调用wait()
之前线程必须获得该对象的对象级别锁。
2. notify() 和 notifyAll()
notify()
方法要在同步方法或同步块中调用,即在调用前,线程必须获得锁。该方法用来通知那些可能等待该锁的其他线程,如果有多个线程等待,则按照执行wait()方法的顺序对处于wait状态的线程发出一次通知,并使该线程重新获取锁。
执行notify()方法后,当前线程不会马上释放锁,呈wait状态的线程也不并不能马上获取该对象锁,要等到执行notify()方法的线程执行完,也就是退出synchronized同步区域后,当前线程才会释放锁,而呈wait状态的线程才能获取该对象锁。
notifyAll()方法执行后,会按照执行wait()方法相反的顺序依次唤醒全部线程。
3. wait(long)
等待某一时间内是否有线程对锁进行notify()通知唤醒,如果超过这个时间则线程自动唤醒,能继续向下运行的前提是再次持有锁。
4. wait()、sleep()、interrupt()
- wait() 是 Object 的方法,而 sleep() 是 Thread 的静态方法;
- wait() 会释放锁,sleep() 不会。
在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放
5. join()
主线程创建并启动子线程,如果子线程要进行大量的耗时计算,主线程往往早于子线程结束之前结束,这时如果主线程想等待子线程执行完之后再结束,就要用到join(),方法join()的作用是等待线程对象销毁。
join()方法的作用是使所属的线程对象x正常执行run()方法中的任务,而使当前线程z进入无限期的阻塞,等待线程x销毁后再继续执行线程z后面的代码。
join()方法具有使线程排队运行的效果,但是与synchronized()的区别是join()方法在内部使用wait()进行等待,而synchronized关键字使用锁作为同步。三、ThreadLocal
ThreadLocal对象可以提供线程局部变量,每个线程Thread拥有一份自己的副本变量,多个线程互不干扰。
3.1 ThreadLocal的数据结构
Thread类有一个类型为ThreadLocal.ThreadLocalMap的实例变量threadLocals,也就是说每个线程有一个自己的ThreadLocalMap。
ThreadLocalMap有自己的独立实现,可以简单地将它的key视作ThreadLocal,value为代码中放入的值(实际上key并不是ThreadLocal本身,而是它的一个弱引用)。
每个线程在往ThreadLocal里放值的时候,都会往自己的ThreadLocalMap里存,读也是以ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。
ThreadLocalMap有点类似HashMap的结构,只是HashMap是由数组+链表实现的,而ThreadLocalMap中并没有链表结构。
我们还要注意Entry, 它的key是ThreadLocal<?> k ,继承自WeakReference, 也就是我们常说的弱引用类型。3.2 ThreadLocalMap Hash
既然是Map结构,那么ThreadLocalMap当然也要实现自己的hash算法来解决散列表数组冲突问题。
int i = key.threadLocalHashCode & (len-1);
ThreadLocalMap中hash算法很简单,这里i就是当前 key 在散列表中对应的数组下标位置。
这里最关键的就是threadLocalHashCode值的计算,ThreadLocal中有一个属性为HASH_INCREMENT = 0x61c88647public class ThreadLocal<T> {
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
static class ThreadLocalMap {
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
}
}
每当创建一个ThreadLocal对象,这个ThreadLocal.nextHashCode 这个值就会增长 0x61c88647 。
这个值很特殊,它是斐波那契数 也叫 黄金分割数。hash增量为 这个数字,带来的好处就是 hash 分布非常均匀。
3.3 ThreadLocal.set()刨析
1. 图解
ThreadLocal中的set方法原理如上图所示,很简单,主要是判断ThreadLocalMap是否存在,然后使用ThreadLocal中的set方法进行数据处理。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
往ThreadLocalMap中set数据(新增或者更新数据)分为好几种情况,针对不同的情况我们画图来说说明。
第一种情况: 通过hash计算后的槽位对应的Entry数据为空:这里直接将数据放到该槽位即可。
第二种情况: 槽位数据不为空,key值与当前ThreadLocal通过hash计算获取的key值一致:这里直接更新该槽位的数据。
第三种情况: 槽位数据不为空,往后遍历过程中,在找到Entry为null的槽位之前,没有遇到key过期的Entry:遍历散列数组,线性往后查找,如果找到Entry为null的槽位,则将数据放入该槽位中,或者往后遍历过程中,遇到了key 值相等的数据,直接更新即可。
第四种情况: 槽位数据不为空,往后遍历过程中,在找到Entry为null的槽位之前,遇到key过期的Entry,如下图,往后遍历过程中,移到了index=7的槽位数据Entry的key=null:散列数组下标为 7 位置对应的Entry数据key为null,表明此数据key值已经被垃圾回收掉了,此时就会执行replaceStaleEntry()方法,该方法含义是替换过期数据的逻辑,以index=7位起点开始遍历,进行探测式数据清理工作。
初始化探测式清理过期数据扫描的开始位置:slotToExpunge = staleSlot = 7
以当前staleSlot开始 向前迭代查找,找其他过期的数据,然后更新过期数据起始扫描下标slotToExpunge。for循环迭代,直到碰到Entry为null结束。
如果找到了过期的数据,继续向前迭代,直到遇到Entry=null的槽位才停止迭代,如下图所示,slotToExpunge 被更新为 0:
以当前节点(index=7)向前迭代,检测是否有过期的Entry数据,如果有则更新slotToExpunge值。碰到null则结束探测。以上图为例slotToExpunge被更新为 0。
上面向前迭代的操作是为了更新探测清理过期数据的起始下标slotToExpunge的值,这个值在后面会讲解,它是用来判断当前过期槽位staleSlot之前是否还有过期元素。
接着开始以staleSlot位置(index=7)向后迭代,如果找到了相同 key 值的 Entry 数据:
从当前节点staleSlot向后查找key值相等的Entry元素,找到后更新Entry的值并交换staleSlot元素的位置(staleSlot位置为过期元素),更新Entry数据,然后开始进行过期Entry的清理工作,如下图所示:
向后遍历过程中,如果没有找到相同 key 值的 Entry 数据:
从当前节点staleSlot向后查找key值相等的Entry元素,直到Entry为null则停止寻找。通过上图可知,此时table中没有key值相同的Entry。
创建新的Entry,替换table[stableSlot]位置:
替换完成后也是进行过期元素清理工作,清理工作主要是有两个方法:expungeStaleEntry()和cleanSomeSlots(),具体细节后面会讲到。
2. 源码详解
java.lang.ThreadLocal.ThreadLocalMap.set():
private void set(ThreadLocal<?> key, Object value) {
// 这里会通过key来计算在散列表中的对应位置,
// 然后以当前key对应的桶的位置向后查找,找到可以使用的桶。
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// key相同直接替换值
if (k == key) {
e.value = value;
return;
}
// key为空,执行替换过期数据
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
什么情况下桶才是可以使用的呢?
- k = key 说明是替换操作,可以使用
- 碰到一个过期的桶,执行替换逻辑,占用过期桶
- 查找过程中,碰到桶中Entry=null的情况,直接使用
接着就是执行for循环遍历,向后查找,我们先看下nextIndex()、prevIndex()方法实现:
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
接着看剩下for循环中的逻辑:
- 遍历当前key值对应的桶中Entry数据为空,这说明散列数组这里没有数据冲突,跳出for循环,直接set数据到对应的桶中
- 如果key值对应的桶中Entry数据不为空
2.1 如果k = key,说明当前set操作是一个替换操作,做替换逻辑,直接返回
2.2 如果key = null,说明当前桶位置的Entry是过期数据,执行replaceStaleEntry()方法(核心方法),然后返回
- for循环执行完毕,继续往下执行说明向后迭代的过程中遇到了entry为null的情况
3.1 在Entry为null的桶中创建一个新的Entry对象
3.2 执行++size操作
- 调用cleanSomeSlots()做一次启发式清理工作,清理散列数组中Entry的key过期的数据 4.1 如果清理工作完成后,未清理到任何数据,且size超过了阈值(数组长度的 2/3),进行rehash()操作
4.2 rehash()中会先进行一轮探测式清理,清理过期key,清理完成后如果size >= threshold - threshold / 4,就会执行真正的扩容逻辑。
接着重点看下replaceStaleEntry()方法,replaceStaleEntry()方法提供替换过期数据的功能,我们可以对应上面第四种情况的原理图来再回顾下,具体代码如下:
java.lang.ThreadLocal.ThreadLocalMap.replaceStaleEntry():
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// 探测式清理过期数据的开始下标
int slotToExpunge = staleSlot;
// 向前迭代查找,找到没有过期的数据,for循环一直碰到Entry为null才会结束
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
//找到了过期数据,更新探测清理过期数据的开始下标为 i
if (e.get() == null)
slotToExpunge = i;
// 从staleSlot向后查找,也是碰到Entry为null的桶结束
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 替换逻辑,替换新数据并且交换当前staleSlot位置
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 说明replaceStaleEntry()一开始向前查找过期数据时并未找到过期的Entry数据,
// 接着向后查找过程中也未发现过期数据,修改开始探测式清理过期数据的下标为当前循环的index
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 最后进行启发式过期数据清理
/**
* cleanSomeSlots()和expungeStaleEntry()方法.这两个是和清理相关的方法.
* 一个是过期key相关Entry的启发式清理(Heuristically scan),
* 另一个是过期key相关Entry的探测式清理。
*/
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 说明当前遍历的Entry是一个过期数据并且一开始的向前查找数据并未找到过期的Entry
if (k == null && slotToExpunge == staleSlot)
// 更新slotToExpunge 为当前位置,这个前提是前驱节点扫描时未发现过期数据。
slotToExpunge = i;
}
//结束当前的迭代操作。此时说明这里是一个添加的逻辑,
//将新的数据添加到table[staleSlot] 对应的slot中。
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// 最后判断除了staleSlot以外,还发现了其他过期的slot数据,就要开启清理数据的逻辑:
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
3.4 ThreadLocal.get() 刨析
1. 图解
第一种情况: 通过查找key值计算出散列表中slot位置,然后该slot位置中的Entry.key和查找的key一致,则直接返回:
第二种情况: slot位置中的Entry.key和要查找的key不一致:
我们以get(ThreadLocal1)为例,通过hash计算后,正确的slot位置应该是 4,而index=4的槽位已经有了数据,且key值不等于ThreadLocal1,所以需要继续往后迭代查找。
迭代到index=5的数据时,此时Entry.key=null,触发一次探测式数据回收操作,执行expungeStaleEntry()方法,执行完后,index 5,8的数据都会被回收,而index 6,7的数据都会前移,此时继续往后迭代,到index = 6的时候即找到了key值相等的Entry数据,如下图所示:
2.源码详解
java.lang.ThreadLocal.ThreadLocalMap.getEntry():
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 正好存放的key和要查找的key相同直接返回
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
// 过期数据
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
3.5 ThreadLocalMap过期key的探测式清理流程——expungeStaleEntry
上面我们有提及ThreadLocalMap的两种过期key数据清理方式:探测式清理和启发式清理。
我们先讲下探测式清理,也就是expungeStaleEntry()方法:
遍历散列数组,从开始位置向后探测清理过期数据,将过期数据的Entry设置为null,沿途中碰到未过期的数据则将此数据rehash后重新在table数组中定位,如果定位的位置已经有了数据,则会将未过期的数据放到最靠近此位置的Entry=null的桶中,使rehash后的Entry数据距离正确的桶的位置更近一些。操作逻辑如下:
如上图,set(27) 经过 hash 计算后应该落到index=4的桶中,由于index=4桶已经有了数据,所以往后迭代最终数据放入到index=7的桶中,放入后一段时间后index=5中的Entry数据key变为了null
如果再有其他数据set到map中,就会触发探测式清理操作。
如上图,执行探测式清理后,index=5的数据被清理掉,继续往后迭代,到index=7的元素时,经过rehash后发现该元素正确的index=4,而此位置已经已经有了数据,往后查找离index=4最近的Entry=null的节点(刚被探测式清理掉的数据:index=5),找到后移动index= 7的数据到index=5中,此时桶的位置离正确的位置index=4更近了。
经过一轮探测式清理后,key过期的数据会被清理掉,没过期的数据经过rehash重定位后所处的桶位置理论上更接近i= key.hashCode & (tab.len - 1)的位置。这种优化会提高整个散列表查询性能。
接着看下expungeStaleEntry()具体流程,我们还是以先原理图后源码讲解的方式来一步步梳理:
我们假设expungeStaleEntry(3) 来调用此方法,如上图所示,我们可以看到ThreadLocalMap中table的数据情况,接着执行清理操作:
第一步是清空当前staleSlot位置的数据,index=3位置的Entry变成了null。然后接着往后探测:
执行完第二步后,index=4 的元素挪到 index=3 的槽位中。
继续往后迭代检查,碰到正常数据,计算该数据位置是否偏移,如果被偏移,则重新计算slot位置,目的是让正常数据尽可能存放在正确位置或离正确位置更近的位置
在往后迭代的过程中碰到空的槽位,终止探测,这样一轮探测式清理工作就完成了,接着我们继续看看具体实现源代码:
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 将槽位的数据清空,并且设置size--
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
Entry e;
int i;
// 以staleSlot的位置向后迭代
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 遇到过期数据,则清空该槽位的数据,并设置size--
if (k == null) {
e.value = null;
tab[i] = null;
size--;
}
// key没有过期。
else {
//重新计算当前key是否产生了hash冲突。
int h = k.threadLocalHashCode & (len - 1);
// 如果不等则说明产生hash冲突
if (h != i) {
//此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry的位置。
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
这里我们还是以staleSlot=3 来做示例说明,首先是将tab[staleSlot]槽位的数据清空,然后设置size— 接着以staleSlot位置往后迭代,如果遇到k==null的过期数据,也是清空该槽位数据,然后size—
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
}
如果key没有过期,重新计算当前key的下标位置是不是当前槽位下标位置,如果不是,那么说明产生了hash冲突,此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry的位置。
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
这里是处理正常的产生Hash冲突的数据,经过迭代后,有过Hash冲突数据的Entry位置会更靠近正确位置,这样的话,查询的时候 效率才会更高。
3.6 ThreadLocalMap过期key的启发式清理流程——cleanSomeSlots
探测式清理是以当前Entry 往后清理,遇到值为null则结束清理,属于线性探测清理。
而启发式清理被作者定义为:Heuristically scan some cells looking for stale entries.
具体代码如下:
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
3.7 ThreadLocalMap扩容机制
在ThreadLocalMap.set()方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中Entry的数量已经达到了列表的扩容阈值(len*2/3),就开始执行rehash()逻辑:
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
接着看下rehash()具体实现:
private void rehash() {
// 进行探测式清理工作
expungeStaleEntries();
// 判断是否需要扩容
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
清理完成之后,table中可能有一些key为null的Entry数据被清理掉,所以此时通过判断size >= threshold - threshold / 4 也就是size >= threshold 3/4 来决定是否扩容。
我们还记得上面进行rehash()的阈值是size >= threshold,所以当面试官套路我们ThreadLocalMap扩容机制的时候 我们一定要说清楚这两个步骤:
接着看看具体的resize()方法,为了方便演示,我们以oldTab.len=8来举例:
扩容后的tab的大小为oldLen 2,然后遍历老的散列表,重新计算hash位置,然后放到新的tab数组中,如果出现hash冲突则往后寻找最近的entry为null的槽位,遍历完成之后,oldTab中所有的entry数据都已经放入到新的tab中了。重新计算tab下次扩容的阈值,具体代码如下:
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 扩容后tab的大小为之前的大小的两倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
// 遍历老的散列表。
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
}
else {
// 重新计算hash位置,然后放到新的tab数组中
int h = k.threadLocalHashCode & (newLen - 1);
// 如果出现hash冲突则往后寻找最近的entry为null的槽位
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
//重新计算tab下次扩容的阈值
setThreshold(newLen);
size = count;
table = newTab;
}
3.8 InheritableThreadLocal
我们使用ThreadLocal的时候,在异步场景下是无法给子线程共享父线程中创建的线程副本数据的。
为了解决这个问题,JDK 中还有一个InheritableThreadLocal类。
InheritableThreadLocal
类中的源代码中存在3个方法,这3个方法都是对父类ThreadLocal中的同名方法进行重写,但在源代码中没有使用@override进行标识。public calss InheritableThreadLocal<T> extends ThreadLocal<T> {
protected T childValue(T parentValue){
return parentValue;
}
ThreadLocalMap getMap(Thread t){
return t.inheritableThreadLocals;
}
void createMap(Thread t,T firstValue){
t.inheritableThreadLocals=new ThreadLocalMap(this, firstValue);
}
}
在main()方法中使用main线程执行
InheritableThreadLocal.set()
方法,其实就是调用TreadLocal类中的set()方法。接着看一下ThreadLocal类中的set()方法,执行set()方法时,有两个方法已经InheritableThreadLocal
类重写了,分别是getMap(t)
和createMap(t,value)
。public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
所以在执行这两个方法时,调用的是
InheritableThreadLocal
中的。public class InheritableThreadLocal<T> extends ThreadLocal<T>{
ThreadLocalMap getMap(Thread t){
return t.inheritableThreadLocals;
}
void createMap(Thread t,T firstValue){
t.inheritableThreadLocals=new ThreadLocalMap(this,firstValue);
}
}
通过查看
InheritableThreadLocal
类中的方法后发现,不再向Thread类中的ThreadLocal.ThreadLocalMap threadLocals
存入数据了,而是向ThreadLocal.ThreadLocalMap inheritableThreadLocals
存入数据。那么子线程如何实现从父线程中的
inheritableThreadLocals
对象继承值呢?思路就是在创建子线程ThreadA时,子线程主动引用父线程main中的inheritableThreadLocals
对象值。public class Thread implements Runnable{
private void init(ThreadGroup g,Runnable target,String name,
long stackSize,AccessControlContext acc,
boolean inheritableThreadLocals){
if(inheritableThreadLocals && parent.inheritableThreadLocals!=null){
this.inheritableThreadLocals=ThreadLocal
.createInheritedMap(parent.inheritableThreadLocals);
......
}
}
}
方法
init(ThreadGroup g,Runnable target,String name, long stackSize,AccessControlContext acc, boolean inheritableThreadLocals)
是被Thread的构造方法调用的,所以在new ThreadA()时,Thread类会自动调用init(ThreadGroup g,Runnable target,String name, long stackSize,AccessControlContext acc, boolean inheritableThreadLocals)
。在本方法中最后一个参数inheritableThreadLocals代表当前线程对象是否会从父进程继承值,而且这个值永远传入true。this.inheritableThreadLocals=ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
中的this就是当前ThreadA类的对象,执行createInheritedMap()
目的是创建一个新的ThreadLocalMap对象,然后将新的ThreadLocalMap对象赋值给ThreadA中的inheritableThreadLocals变量static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap){
return new ThreadLocalMap(parentMap);
}
在构造方法的代码中可以发现,子线程将父线程中的table对象以复制的方式赋值给子线程的table数组,这个过程是在创建Thread类对象时发生的,也就说明当子线程对象创建完毕后,子线程中的数据就是主线程中旧的数据,主线程使用新的数据时,子线程还是使用旧的数据,因为主子线程使用两个Entry[]对象数组各自存储自己的值。
private ThreadLocalMap(ThreadLocalMap parentMap){
Entry[] parentTable=parentMap.table;
int len=parentTable.length;
setThreshold(len);
//新建Entry[] 数组
table=new Entry[len];
for(int j=0;j<len;j++){
Entry e=parentTable[j];
if(e!=null){
@SuppressWarnings("unchecked")
ThreadLocal<Object> key=(ThreadLocal<Object>) e.get();
if(key!=null){
Object value=key.childValue(e.value);
// 实例化新的Entry对象
Entry c=new Entry(key,value);
int h=key.threadLocalHashCode & (len-1);
while(table[h]!=null){
h=nextIndex(h,len);
}
// 将父线程中的数据复制到新数组中
table[h]=c;
size++;
}
}
}
}