ConcurrentHashMap 是一个常用的高并发容器类,也是一种线程安全的哈希表。Java 7 以及之前版本中的 ConcurrentHashMap 使用 Segment(分段锁)技术将数据分成一段一段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。Java 8 对其内部的存储结构进行了优化,使之在性能上有进一步的提升。ConcurrentHashMap 和同步容器 HashTable 的主要区别在锁的类型和粒度上:HashTable 实现同步是利用 synchronized 关键字进行锁定的,其实是针对整张哈希表进行锁定的,即每次锁住整张表让线程独占,虽然解决了线程安全问题,但是造成了巨大的资源浪费。
HashMap 和 HashTable 的问题
首先,HashMap 是非线程安全的。在 JDK1.7 中,采取的是头插法,先插入链表的头部,那么在扩容的时候,容易引起死循环,具体见 HashMap 为什么线程不安全。在 JDK 1.8 中修改为尾插法,解决了这个问题。但是由于没有加锁,故而多线程下也不能保证所取到的就是上一步保存的数据(线程 A 先保存数据,线程 B 后取数据,有可能线程 B 先调用到)。
HashTable 是线程安全的,其实现是在方法上加上 synchronized 关键字,锁的对象是整张 hash 表,当一个线程访问其中一个方法时,另一个线程访问其他的方法都会进入阻塞或轮询状态,导致效率低下。
HashMap 和 HashTable 的实现原理是一样的,只是有一下两个区别:
- HashTable不允许key和value为null
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
addEntry(hash, key, value, index);
return null;
}
可见 HashTable 没对 key 和 value 做判空,直接抛出异常,因此两者都不能为 null。
- HashTable 使用 synchronized 来保证线程安全,包含
get()/put()
在内的所有相关需要进行同步执行的方法都加上了 synchronized 关键字,对这个 Hash 表进行锁定
JDK 1.7 中 ConcurrentHashMap 的结构
JDK 1.7 的 ConcurrentHashMap 的锁机制基于粒度更小的分段锁,分段锁也是提升多并发程序性能的重要手段之一,和 LongAdder 一样,属于热点分散型的削峰手段。
分段锁其实是一种锁的设计,并不是具体的一种锁,对于 ConcurrentHashMap 而言,分段锁技术将 Key 分成一个一个小 Segment 存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。
JDK 1.7 中的 ConcurrentHashMap 采用了 Segment 分段锁的方式实现。一个 ConcurrentHashMap 中包含一个 Segment 数组,一个 Segment 中包含一个 HashEntry 数组,每个元素是一个链表结构(一个 Hash 表的桶)。
JDK 1.7 中 ConcurrentHashMap 的实现原理
ConcurrentHashMap 类的核心源码如下:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/**
* 默认初始容量
*/
static final int DEFAULT_INITIAL_CAPACITY = 16;
/**
* 默认扩容因子
*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**
* 哈希表的默认并发级别,即分段锁的数量
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 集合最大容量
*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 每段表的最小容量。 必须是 2 的整数幂,至少是 2,避免在构造之后立即扩容
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/**
* 分段锁的最大段数;必须是小于 1 << 24 的 2 的幂。
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
/**
* 加锁前的重试次数
*/
static final int RETRIES_BEFORE_LOCK = 2;
/**
* segment的掩码。key的哈希值的高位用来选择具体的Segment
*/
final int segmentMask;
/**
* 偏移量
*/
final int segmentShift;
/**
* segment数组
*/
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;
transient Collection<V> values;
/**
* 默认构造方法,初始容量、加载因子以及并发级数都是用默认值
* DEFAULT_INITIAL_CAPACITY:16
* DEFAULT_LOAD_FACTOR:0.75
* DEFAULT_CONCURRENCY_LEVEL:16
*/
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 限制分段锁数量
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// 查找最佳匹配参数(不小于给定参数最接近的2次幂)
int sshift = 0;
// 表示并发度,也就是分段锁的数量
int ssize = 1;
// concurrencyLevel 默认值为16,while循环结束之后,sshift=4,ssize=16
// while循环是为了保证并发度为2的整数次方
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 偏移量 28
this.segmentShift = 32 - sshift;
// 掩码值 3
this.segmentMask = ssize - 1;
// 初始容量 16
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// c = 16 / 16 = 1
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity) {
++c;
}
// MIN_SEGMENT_TABLE_CAPACITY = 2
// cap表示每段表的容量,默认为2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c) {
cap <<= 1;
}
// create segments and segments[0]
// 第0个Segment的
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// segments的大小为2的整数次方
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE;
private static final int SSHIFT;
private static final long TBASE;
private static final int TSHIFT;
static {
int ss, ts;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class tc = HashEntry[].class;
Class sc = Segment[].class;
// 获取数组中第一个元素的地址
TBASE = UNSAFE.arrayBaseOffset(tc);
SBASE = UNSAFE.arrayBaseOffset(sc);
// 获取数组中一个元素占用的字节
ts = UNSAFE.arrayIndexScale(tc);
ss = UNSAFE.arrayIndexScale(sc);
} catch (Exception e) {
throw new Error(e);
}
if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
throw new Error("data type scale not a power of two");
// 返回无符号整数ss/ts的最高非0位前面的0的个数,包括符号位在内;如果ss/ts为负数,将会返回0,因为符号位为1
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
- ConcurrentHashMap 默认的初始容量为 16,加载因子为 0.75,并发度为 16。而且,容量必须为2的整数次幂,通过第一个 while 循环进行保证
- 每段表的容量默认为 MIN_SEGMENT_TABLE_CAPACITY,也就是 2。加载因子会传递到 Segment 中,当每个 Segment 的元素数量达到一定的阈值时,需要进行 rehash 。segment 的个数不能扩容,但是其内部的元素数量可以扩容
HashEntry
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
HashEntry 用来保存 K-V 键值对数据,其中 value 是一个 volatile 关键字修饰的值,表示具有可见性。还有一个 HashEntrysetNext(HashEntry<K,V> n)
是一个 CAS 操作,保证线程安全。
Segment
// 继承至 ReentrantLock
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 尝试锁定的最大次数,超过该值进入阻塞获取。在多处理器上,使用有限次数的重试来维护
* 在定位节点时获取的缓存
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* The per-segment table. 通过entryAt/setEntryAt来访问元素,提供volatile语义.
*
* table 是由 HashEntry 实例组成的数组。。如果 HashEntry 实例的哈希值发生碰撞,则会将
* 新的HashEntry 以链表的形式连接成一个链表。table 数组的成员表示 Hash 桶的一个桶,每
* 个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分。若并发级别为 16,table 守护
* ConcurrentHashMap 包含桶总数的 1/16
*/
transient volatile HashEntry<K,V>[] table;
/**
* 元素数量。仅在锁内或具备可见性(volatile)的读取操作中访问。
*
* count表示每个Segment实例管理的table数组(若干个HashEntry组成的链表)包含的HashEntry实例的个数
*/
transient int count;
/**
* table被更新的次数
*/
transient int modCount;
/**
* table中包含的HashEntry的数量超过该变量的值,触发rehash操作
*/
transient int threshold;
/**
* 加载因子
*/
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
......
}
- HashEntry 数组类型的成员 table,且因为 HashEntry 是一个单链表结构,因此 table 中的每一个元素对应一个 Hash 表中的桶
- Segment 继承了 ReentrantLock,因此具备了乐观锁的功能。每个 Segment 守护一个 table 数组中的元素(即 HashEntry 链表),每个 HashEntry 需要修改时,都要先获取到它对应的 Segment 锁
- ConcurrentHashMap 在默认并发级别会创建包含 16 个 Segment 对象的数组(见 ConcurrentHashMap 构造器),每个 Segment 大约守护整个哈希表中桶总数的 1/16,其中第 N 个哈希桶由第
N mod 16
个锁来保护。假设使用合理的哈希算法使关键字能够均匀地分部,那么这大约能使对锁的请求减少到原来的 1/16。默认并发级别的情况下,ConcurrentHashMap 支持多达 16 个并发的写入线程。
get(Object key) - 获取元素
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 计算key的hash值
int h = hash(key.hashCode());
// 根据hash值计算分段锁的索引
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// (1)获取对应的Segment对象
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
(2)
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
- 第一次 hash,函数为 ((h >>> segmentShift) & segmentMask),找到 segments 里面对应的 HashEntry 对应的下标,然后遍历该位置的链表
- 第二次 hash,函数为 ((tab.length - 1) & h),即 h 对数组长度取模,找到 Segment 里面对应的 HashEntry 的下标,然后遍历该位置的链表
可以看到,以上都是通过 Unsafe 的 getObjectVolatile() 方法来获取数组中的元素的。这是因为虽然 Segment 对象持有 HashEntry 数组的引用是 volatile 类型的,但是数组内部的元素并不是 volatile 类型,因此多线程对数组元素的修改不是线程安全的,可能会在数组中读到尚未构造完成的对象。
由于分段锁数组在构造时没进行初始化,可能读出来一个空值,因此需要先进行判断。在确定分段锁和它内部的哈希表都不为空之后,再通过哈希码读取HashEntry数组的元素,根据上面的源码可以看到,这时获得的是链表的头节点。之后再从头到尾对链表进行遍历查找,如果找到对应的值就将其返回,否则返回null。以上就是整个查找元素的过程。
另外,get() 方法不需要加锁是因为这是个只读操作,不会修改 Map 的数据结构,因此只要保证涉及到读取的数据的属性为线程可见即可,也就是使用 volatile 修饰涉及到的成员变量。
put(K key, V value) - 添加元素
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
public V putIfAbsent(K key, V value) {
Segment<K,V> s;
// 1. 保证传入的 value 不为 null,否则触发 NPE 异常
if (value == null)
throw new NullPointerException();
// 2. 计算 key 的 hashcode 值
int hash = hash(key.hashCode());
// 3. 计算出所处分段锁的下标并通过 CAS 操作尝试获取分段锁 Segment 对象
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
// 创建一个分段锁
s = ensureSegment(j);
// 4. 调用 Segment#put() 完成添加操作
return s.put(key, hash, value, true);
}
ConcurrentHashMap 有两个 put 方法,put() 与 putIfAbsent() 的区别在于:若 key-value 键值对已存在,前者会进行覆盖,后者则不会。源码可知,两者都是通过 Segment#put(K key, int hash, V value, boolean onlyIfAbsent) 方法调用完成的。整个调用顺序如下:
- 保证传入的 value 不为 null,否则触发 NPE 异常
- 计算 key 的 hashcode 值
- 根据 key 的 hashcode 值计算出所处分段锁的下标并通过 CAS 操作尝试获取分段锁 Segment 对象,若分段锁不存在,则通过 ensureSegment() 方法创建一个分段锁
- 调用 Segment#put(K key, int hash, V value, boolean onlyIfAbsent) 完成添加操作
ensureSegment() - 保证对应下标的分段锁存在
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
// 偏移量
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
该方法的功能就是判断在 segments 的偏移量为 u 的位置是否存在元素,不存在则创建一个新的 HashEntry 数组变量 tab,并通过 CAS 操作添加到 segments 数组中。
Segment#put(K , int , V , boolean) - 添加元素
在 ConcurrentHashMap 中的 put 方法中只完成了分段锁的寻址,真正的元素添加操作是在对应的分段锁中完成的。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 尝试获取锁,获取失败则进入 scanAndLockForPut() 方法,该方法中会检查 key 对应节点
// 对象是否已存在于分段锁的哈希表中,不存在会创建,方便获取锁之后直接进行使用
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 获取 table 数组对应索引的元素
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 替换旧元素,modCount更新
e.value = value;
++modCount;
}
break;
}
e = e.next;
} else {
// 头插法
if (node != null)
node.setNext(first);
else
// 进来直接获取到锁,则可能走到这个分支
node = new HashEntry<K,V>(hash, key, value, first);
// 元素数量+1
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 超过阈值,进行扩容
rehash(node);
else
// 设置为桶元素
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 释放锁
unlock();
}
return oldValue;
}
为了保证线程安全,分段锁中的 put 操作是需要进行加锁的,所以线程一开始就会获取锁,若获取成功则继续执行,若获取失败则调用 scanAndLockForPut() 方法进行自旋,在自旋过程中会先扫描哈希表查找指定的 key,如果 key 不存在就会新建一个 HashEntry 返回,这样在获取到锁之后就不必再新建了,为的是在等待锁的过程中顺便做些事情,不至于白白浪费时间。
线程在成功获取到锁之后会根据计算到的下标获取指定下标的元素。此时获取到的是链表的头节点,如果头节点不为空就对链表进行遍历查找,找到之后再根据 onlyIfAbsent 参数的值决定是否进行替换。如果遍历时没找到头节点,就会新建一个 HashEntry 节点作为头节点。在向链表添加元素之后,检查元素总数是否超过阈值,如果超过就调用 rehash 进行扩容,没超过的话就直接将数组对应下标的元素引用指向新添加的节点。setEntryAt() 方法内部是通过调用 UnSafe 的 putOrderedObject() 方法来更改数组元素引用的,这样就保证了其他线程在读取时可以读到最新的值。
Segment#scanAndLockForPut(K, int, V)
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 从分段 table 中获取 HashEntry 对象
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 尝试获取锁,获取失败继续循环
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
// 保证插入的数据对应的HashEntry存在
if (e == null) {
// 创建 HashEntry
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
} else if (key.equals(e.key)) {
// 根据 equals 方法查找对应的 node
retries = 0;
} else {
e = e.next;
}
} else if (++retries > MAX_SCAN_RETRIES) {
// 重试次数超过最大可尝试直接获取锁次数,进入自旋
lock();
break;
} else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
- 通过扫描对应分段锁中的 hash 表查找对应的 key:
- 如果 key 不存在,则创建 HashEntry 节点并返回
- 如果 key 存在,则根据 equals 方法在链表中查找已存在元素,若元素不存在则创建 HashEntry 节点并返回,否则返回 null
- 若 while 循环获取锁的次数大于 MAX_SCAN_RETRIES,则执行 lock 方法,进入阻塞等待,直到获取到锁为止
Segment#entryForHash(Segment, int) - 获取 key 的 hash 值对应的元素
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
HashEntry<K,V>[] tab;
return (seg == null || (tab = seg.table) == null) ? null :
// 在hashEntry数组中获取相对偏移量的数据
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
}
该方法的作用就是根据 key 的 hash 值从分段锁的 table 数组中查找相对应的 HashEntry 对象。可能返回 null。
Segment#entryAt(HashEntry, int) - 从分段锁的哈希表中获取index对应的元素
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
该方法的作用就是根据 key 的 hash 值计算出对应哈希桶元素的索引,根据该索引从定位到 HashEntry 对象。可能返回 null。
rehash(HashEntry node) - 扩容
/**
* 将每个列表中的节点重新分类为新表。 因为我们使用的是二次幂扩容,所以每个 bin 中的元素
* 必须保持相同的索引,或者以二次幂的偏移量移动。 我们通过捕获可以重用旧节点的情况来消除
* 不必要的节点创建,因为它们的下一个字段不会改变。 据统计,在默认阈值下,当表翻倍时,
* 只有大约六分之一需要克隆。 一旦它们不再被可能处于并发遍历表中的任何读取器线程引用,
* 它们替换的节点将是可垃圾回收的。 入口访问使用普通数组索引,因为它们之后是 volatile 表写入。
*/
private void rehash(HashEntry<K,V> node) {
// 旧表
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 新表长度翻一倍
int newCapacity = oldCapacity << 1;
// 新表的扩容临界值
threshold = (int)(newCapacity * loadFactor);
// 创建新表
HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
// 方便计算索引
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 在新表中的索引
int idx = e.hash & sizeMask;
// 原来i位置中的元素没有发生冲突,也就是没有构成链表
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
// 查找链表中最后一个非重复元素及其索引,在lastIdx之后都是相同的索引
// 注意,lastRun可能是作为另一个节点(pre)的next存在的,但是没关系,在下一个循环中
// pre节点的next节点会被重置
for (HashEntry<K,V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 转移最后一部分相同key的元素到新表对应位置
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
// 创建一个 HashEntry,并将之前数组中相同位置的元素该HashEntry的next
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
- rehash() 方法只在 put() 方法中调用,由于 put() 中已经获取到 lock,所以可以确保线程安全
- 扩容时首先生成一个新的数组,大小为原数组的 2 倍,然后将原数组的元素迁移到新数组中,遍历原 HashEntry 数组,获取到每一个位置的 HashEntry 链表:
- 计算原数据应该转移到新数组的下标,如果原 HashEntry 链表只有一个元素,直接转移到新链表
- 如果有多个,采用 lastRun 机制,先获取最后几个在新数组连续的元素,先转移过去
- 遍历剩余的链表,根据原 HashEntry 元素生成新的 HashEntry 对象,并采用头插法放到新数组中
- 数据转移完之后,将新的元素再计算下标之后,头插法放到新数组中
- 将新数组更新到 Segment 对象中