1. 如何使 HashMap 变得线程安全?
要使用线程安全的 HashMap,我们其实有多种方法。例如可以调用 Collections 工具类里的 synchronizedMap 方法,或者是直接使用 Hashtable。由于 ConcurrentHashMap 中很大部分都与 HashMap 相同,因此本文仅介绍 ConcurrentHashMap 与 HashMap 的不同之处,相同之处可以参考 《HashMap 解析》这篇文章。
使用 synchronizedMap
为了实现线程安全,Collections 类实现了一系列线程安全的类,如 SynchronizedList、SynchronizedMap等。从名字就可以看出,这些类的底层是通过 synchronized 轻量锁来实现的。
使用 Hashtable
ConcurrentHashMap 与 HashMap 的不同之处
- 红黑树结构略有不同,HashMap 的红黑树中的节点叫做 TreeNode,TreeNode 不仅仅有属性,还维护着红黑树的结构,比如说查找,新增等等;ConcurrentHashMap 中红黑树被拆分成两块,TreeNode 仅仅维护的属性和查找功能,新增了 TreeBin,来维护红黑树结构,并负责根节点的加锁和解锁;
- 新增 ForwardingNode(转移)节点,扩容的时候会使用到,通过使用该节点,来保证扩容时的线程安全。
2. ConcurrentHashMap 源码解析
ConcurrentHashMap 的重要属性
先来看看 ConcurrentHashMap 的几个重要属性。 ```java // 采用volatile关键字修饰,一旦被修改会立马通知其他线程 transient volatile Node[] table;
private static final int DEFAULT_CAPACITY = 16;
// 转换为红黑树的长度阈值 static final int TREEIFY_THRESHOLD = 8;
// 用于识别扩容时正在转移数据 static final int MOVED = -1;
// 计算哈希值时用到的参数,用来去除符号位 static final int HASH_BITS = 0x7fffffff;
// 转移数据时,新的哈希表数组
private transient volatile Node
<a name="5504d209"></a>
## ConcurrentHashMap 的重要的类
了解完上面的重要属性后,再来看看 ConcurrentHashMap 中一些重要的类。
<a name="Node"></a>
### Node
链表中的元素为 Node 对象。它是链表上的一个节点,内部存储了 key、value 值,以及他的下一个节点的引用。这样一系列的 Node 就串成一串,组成一个链表。
<a name="ForwardingNode"></a>
### ForwardingNode
当进行扩容时,要把链表迁移到新的哈希表,在做这个操作时,会在把数组中的头节点替换为 ForwardingNode 对象。ForwardingNode 中不保存 key 和 value,只保存了扩容后哈希表(nextTable)的引用。此时查找相应 node 时,需要去 nextTable 中查找。
<a name="TreeBin"></a>
### TreeBin
当链表转为红黑树后,数组中保存的引用不再是 Node,而是 TreeBin,TreeBin 内部不保存 key/value,他保存了 TreeNode 的 list 以及红黑树 root。
<a name="TreeNode"></a>
### TreeNode
红黑树的节点。
<a name="dde7e601"></a>
## ConcurrentHashMap 的 put() 方法源码分析
```java
final V putVal(K key, V value, boolean onlyIfAbsent) { //onlyIfAbsent为 false 则覆盖已有的value值
//key和value不能为空
if (key == null || value == null) throw new NullPointerException();
//计算key的hash值,后面我们会看spread方法的实现
int hash = spread(key.hashCode());
int binCount = 0;
//开始自旋,table属性采取懒加载,第一次put的时候进行初始化
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果table未被初始化,则初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//通过key的hash值计算出要存入的位置
// 这里写法很严谨,使用CAS而不是直接赋值,因为在判断槽点为空的瞬间可能其他线程就做了赋值操作了
//如果该位置的值为空,那么使用CAS生成新的node来存储该key、value,放入此位置,结束for自旋
//否则继续自旋
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 具体地看看CAS,拿出tab中对应下标为i的内存值,与旧的预期值null做比较,如果相同,那么使用new出的Node执行赋值
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果该位置节点元素的hash值为MOVED,也就是-1,代表正在做扩容的复制。那么该线程参与复制工作。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//下面分支处理table映射的位置已经存在node的情况
else {
V oldVal = null;
// 锁定当前位置,使其余线程不能操作,保证了安全
synchronized (f) {
//再次确认该位置的值是否已经发生了变化
if (tabAt(tab, i) == f) {
//第一种情况:fh大于等于0,表示此位置存储的是链表
if (fh >= 0) {
//遍历链表,binCount用于统计node数量
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果存在一样hash值的node,那么根据onlyIfAbsent的值选择覆盖value或者不覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//如果找到最后一个元素,也没有找到相同hash的node,那么生成新的node存储key/value,作为尾节点放入链表。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//第二种情况:fh小于0,表示 hash 值映射哈希表对应位置存储的是红黑树
//那么通过 TreeBin 对象的 putTreeVal 方法保存 key/value
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//node保存完成后,判断链表长度是否已经超出阈值,则进行哈希表扩容或者将链表转化为红黑树
//binCount 是用来记录链表保存 node 的数量的,可以看到当其大于 TREEIFY_THRESHOLD,也就是 8 的时候进行扩容。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//treeifyBin()会选择是把此时保存数据所在的链表转为红黑树,还是对整个哈希表扩容。
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 计数,并且判断哈希表中使用的桶位是否超出阈值75%,超出的话进行扩容
// 与treeifyBin中的扩容不一样,treeifyBin中的扩容是由链表过长引起的,这里的扩容是由于75%的位置被使用引起的
addCount(1L, binCount);
return null;
}
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
主线流程如下图:
在判断 hash 映射是否为空的时候,取出哈希表中该 hash 值对应位置的代码如下。
tabAt(tab, i = (n - 1) & hash)
initTable() 源码分析
数组初始化时,首先通过自旋来保证一定可以初始化成功,然后通过 CAS 设置 sizeCtl 变量的值,来保证同一时刻只能有一个线程对数组进行初始化,CAS 成功之后,还会再次判断当前数组是否已经初始化完成,如果已经初始化完成,就不会再次初始化,通过自旋 + CAS + 双重 check 等手段保证了数组初始化时的线程安全,源码如下:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// table为空时进入循环
while ((tab = table) == null || tab.length == 0) {
//如果sizeCtl<0,那么有其他线程正在创建table,所以本线程让出CPU的执行权。直到table创建完成,while循环跳出。if中同时还把sizeCtl的值赋值给了sc。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//以CAS方式修改sizeCtl为-1,表示本线程已经开始创建table的工作。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 双重检查,再次确认是否table还是空的
if ((tab = table) == null || tab.length == 0) {
//如果sc有值,那么使用sc的值作为table的size,否则使用默认值16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sc被设置为table大小的3/4(无符号右移两位,相当于除以4)
sc = n - (n >>> 2);
}
} finally {
//sizeCtl被设置为table大小的3/4
sizeCtl = sc;
}
break;
}
}
return tab;
}
里面有个关键的值 sizeCtl,这个值有多个含义。
- -1 代表有线程正在创建 table;
- -N 代表有 N-1 个线程正在复制 table;
- 在 table 被初始化前,代表根据构造函数传入的值计算出的应被初始化的大小;
- 在 table 被初始化后,则被设置为 table 大小 的 75%,代表 table 的容量(数组容量)。
initTable 中使用到 1 和 4,2 和 3 在其它方法中会有使用。下面我们可以先看下 ConcurrentHashMap 的构造方法,里面会使用上面的 3 。
ConcurrentHashMap 的构造函数源码分析
ConcurrentHashMap 带容量参数的构造函数源码如下:
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
//如果传入的初始化容量值超过最大容量的一半,那么sizeCtl会被设置为最大容量。
//否则通过tableSizeFor方法就算出一个2的n次方数值作为size
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
这是一个有参数的构造方法。如果你对未来存储的数据量有预估,我们可以指定哈希表的大小,避免频繁的扩容操作。tableSizeFor 这个方法确保了哈希表的大小永远都是 2 的 n 次方。如果 size 不是 2 的 n 次方,那么 hash 算法计算的下标发生的碰撞概率会大大增加。因此通过 tableSizeFor 方法确保了返回大于传入参数的最小 2 的 n 次方。注意这里传入的参数不是 initialCapacity,而是 initialCapacity 的 1.5 倍 + 1。这样做是为了保证在默认 75% 的负载因子下,能够足够容纳 initialCapacity 数量的元素,避免了再一次的扩容。讲到这里你一定好奇 tableSizeFor 是如何实现向上取得最接近入参 2 的 n 次方的。下面我们来看 tableSizeFor 源代码:
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
依旧是二进制按位操作,这样一顿操作后,得到的数值就是大于 c 的最小 2 的 n 次。我们推演下过程,假设 c 是 9:
1、int n = 9 - 1
n=8
2、n |= n >>> 1
n=1000
n >>> 1=0100
两个值按位或后
n=1100
3、n |= n >>> 2
n=1100
n >>> 2=0011
n=1111
到这里可以看出规律来了。如果 c 足够大,使得 n 很大,那么运算到 n |= n >>> 16 时,n 的 32 位都为 1。
总结一下这一段逻辑,其实就是把 n 有数值的 bit 位全部置为 1。这样就得到了一个肯定大于等于 n 的值。我们再看最后一行代码,最终返回的是 n+1,那么一个所有位都是 1 的二进制数字,+1 后得到的就是一个 2 的 n 次方数值。
关于 ConcurrentHashMap (int initialCapacity) 构造函数的分析我们总结下:
- 构造函数中并不会初始化哈希表;
- 构造函数中仅设置哈希表大小的变量 sizeCtl;
- initialCapacity 并不是哈希表大小,哈希表大小 sizeCtl 为 initialCapacity1.5+1 后,*向上取最小的 2 的 n 次方。如果超过最大容量一半,那么就是最大容量。
treeifyBin() 方法源码分析
首先我们要理解为什么 Map 需要扩容,这是因为我们采用哈希表存储数据,当固定大小的哈希表存储数据越来越多时,链表长度会越来越长,这会造成 put 和 get 的性能下降。此时我们希望哈希表中多一些桶位,预防链表继续堆积的更长。接下来我们分析 treeifyBin 方法代码,这个代码中会选择是把此时保存数据所在的链表转为红黑树,还是对整个哈希表扩容。
我们再重点看一下 tryPresize,此方法中实现了对数组的扩容,传入的参数 size 是原来哈希表大小的一倍。我们假定原来哈希表大小为 16,那么传入的 size 值为 32,以此数值作为例子来分析源代码。注意 while 中第一个 if 此时不会进入,但为了讲解代码我也在注释中一并讲解了,大家看的时候在这个分支中不要以 size=16 作为前提来分析。private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //如果哈希表长度小于64,那么选择扩大哈希表的大小,而不是把链表转为红黑树 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //将哈希表中index位置的链表转为红黑树 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //下面逻辑将node链表转化为TreeNode链表 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //TreeBin代表红黑树,将TreeBin保存在哈希表的index位置 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
讲到这里我们再回一下 put 方法中最后有如下一行代码://size为32 //sizeCtl为原大小16的3/4,也就是12 private final void tryPresize(int size) { //根据tableSizeFor计算出满足要求的哈希表大小,对齐为2的n次方。c被赋值为64,这是扩容的上限,扩容一般都是扩容为原来的2倍,这里c值为了处理一些特殊的情况,确保扩容能够正常退出。 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; //此时sc和sizeCtl均为12,进入while循环 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //这里处理的table还未初始化的逻辑,这是由于putAll操作不调用initTable,而是直接调用tryPresize if (tab == null || (n = tab.length) == 0) { //putAll第一次调用时,假设putAll进来的map只有一个元素,那么size传入1,计算出c为2.而sc和sizeCtl都为0,因此n=2 n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; //经过计算sc=2 sc = n - (n >>> 2); } } finally { //sizeCtl设置为2.第二次循环时,因为sc和c相等,都为2,进入下面的else if分支,结束while循环。 sizeCtl = sc; } } } //扩容已经达到C值,结束扩容 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; //table已经存在,那么就对已有table进行扩容 else if (tab == table) { int rs = resizeStamp(n); //sc小于0,说明别的线程正在扩容,本线程协助扩容 if (sc < 0) { Node<K,V>[] nt; //判断是否扩容的线程达到上限,如果达到上限,退出 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //未达上限,参与扩容,更新sizeCtl值。transfer方法负责把当前哈希表数据移入新的哈希表。 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //本线程为第一个扩容线程,transfer第二个参数传入null,代表需要新建扩容后的哈希表 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //这个扩容方法就不展开了,核心是每次都是将数组大小扩容到原来 2 倍,这样保证了哈希表的大小始终为 2 的 n 次方。 transfer(tab, null); } } }
这行代码其实是对哈希表保存的元素数量进行计数。同时根据当前保存状况,判断是否进行扩容。你可能会问,在添加元素的过程中不是已经执行了扩容的逻辑了吗?其实上面的扩容逻辑是链表过长引起的,还有另外一种因素可能引起链表扩容,不过这两种扩容的逻辑是基本一致的。addCount(1L, binCount);
触发扩容的因素其实有两种(HashMap 也是这样):
- 链表过长引起。
- 链表长度超过 TREEIFY_THRESHOLD 8 时,触发 treeifyBin(),treeifyBin() 检查当前哈希表长度是否小于 MIN_TREEIFY_CAPACITY 64,如果没超过,则进行扩容;如果超过,就将链表转为红黑树。
- addCount 方法中判断哈希表是否超过 75% 的位置已经被使用,如果超过则进行扩容。
ConcurrentHashMap 的 get() 方法源码分析
put 的源代码比较复杂,其实 put 方法的复杂是为了 get 服务,以提高 get 的效率。相比较 put 方法而言,get 方法就简单多了。先获取数组的下标,然后通过判断数组下标的 key 是否和我们的 key 相等,相等的话直接返回,如果下标的槽点是链表或红黑树的话,分别调用相应的查找数据的方法,整体思路和 HashMap 很像。
ConcurrentHashMap 中,通过大量的 CAS 操作加上 Synchronized 来确保线程安全。对 ConcurrentHashMap 的学习我们把重点放在哈希算法和扩容上,面试的时候是考察的重点。public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //获取key值的hash值 int h = spread(key.hashCode()); //这个if判断中做了如下几件事情: //1、哈希表是否存在 //2、哈希表是否保存了数据,同时取得哈希表length //3、哈希表中hash值映射位置保存的对象不为null,并取出给e,e为链表头节点 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果e的hash值和传入key的hash值相等 if ((eh = e.hash) == h) { //如果e的key和传入的key引用相同,或者key eaquals ek。那么返回e的value。 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果头节点的hash<0,有两种情况 //1、hash=-1,正在扩容,该节点为ForwardingNode,通过find方法在nextTable中查找 //2、hash=-2,该节点为TreeBin,链表已经转为了红黑树。同样通过TreeBin的find方法查找。 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //以上两种条件不满足,说明hash映射位置保存的还是链表头节点,但是和传入key值不同。那么遍历链表查找即可。 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
3. ConcurrentHashMap 如何体现线程安全的?
数组初始化时的线程安全
数组初始化时,首先通过自旋来保证一定可以初始化成功,然后通过 CAS 设置 SIZECTL 变量的值,来保证同一时刻只能有一个线程对数组进行初始化,CAS 成功之后,还会再次判断当前数组是否已经初始化完成,如果已经初始化完成,就不会再次初始化,通过自旋 + CAS + 双重 check 等手段保证了数组初始化时的线程安全。具体可查看 initTable() 方法。新增槽点值时的线程安全
此时为了保证线程安全,做了四处优化:
1. 通过自旋死循环保证一定可以新增成功。
在新增之前,通过for (Node<K,V>[] tab = table;;)
这样的死循环来保证新增一定可以成功,一旦新增成功,就可以退出当前死循环,新增失败的话,会重复新增的步骤,直到新增成功为止。
2. 当前槽点为空时,通过 CAS 新增。
这里的写法非常严谨,没有在判断槽点为空的情况下直接赋值,因为在判断槽点为空和赋值的瞬间,很有可能槽点已经被其他线程赋值了,所以我们采用 CAS 算法,能够保证槽点为空的情况下赋值成功,如果恰好槽点已经被其他线程赋值,当前 CAS 操作失败,会再次执行 for 自旋,再走槽点有值的 put 流程,这里就是自旋 + CAS 的结合。
3. 当前槽点有值,用 synchronized 锁住当前槽点。
put 时,如果当前槽点有值,就是 key 的 hash 冲突的情况,此时槽点上可能是链表或红黑树,我们通过锁住槽点,来保证同一时刻只会有一个线程能对槽点进行修改。
4. 红黑树旋转时,锁住红黑树的根节点,保证同一时刻,当前红黑树只能被一个线程旋转。
通过以上 4 点,保证了在各种情况下的新增(不考虑扩容的情况下),都是线程安全的,通过自旋 + CAS + 锁三大姿势,实现的很巧妙,值得我们借鉴。
扩容时的线程安全
ConcurrentHashMap 的扩容时机和 HashMap 相同,都是在 put 方法的最后一步检查是否需要扩容,如果需要则进行扩容,但两者扩容的过程完全不同,ConcurrentHashMap 扩容的方法叫做 transfer,从 put 方法的 addCount 方法进去,就能找到 transfer 方法,transfer 方法的主要思路是:
- 首先需要把老数组的值全部拷贝到扩容之后的新数组上,先从数组的队尾开始拷贝;
- 拷贝数组的槽点时,先把原数组槽点锁住,保证原数组槽点不能操作,成功拷贝到新数组时,把原数组槽点赋值为转移节点;
- 这时如果有新数据正好需要 put 到此槽点时,发现槽点为转移节点,就会一直等待,所以在扩容完成之前,该槽点对应的数据是不会发生变化的;
- 从数组的尾部拷贝到头部,每拷贝成功一次,就把原数组中的节点设置成转移节点;
直到所有数组数据都拷贝到新数组时,直接把新数组整个赋值给数组容器,拷贝完成。
4. JDK1.7 和 JDK1.8 中 ConcurrentHashMap 的锁机制区别
JDK 1.7 中,采用分段锁的机制,实现并发的更新操作,底层采用数组+链表的存储结构,包括两个核心静态内部类 Segment 和 HashEntry。
Segment 继承 ReentrantLock(重入锁) 用来充当锁的角色,每个 Segment 对象守护每个散列映射表的若干个桶;
- HashEntry 用来封装映射表的键-值对(就是 JDK1.8 的 Node);
- 每个桶是由若干个 HashEntry 对象链接起来的链表
JDK 1.8 中,采用Node + CAS + Synchronized来保证并发安全。取消 Segment,直接用 table 数组存储键值对;当 HashEntry 对象组成的链表长度超过 TREEIFY_THRESHOLD 时,链表转换为红黑树,提升性能。底层变更为数组 + 链表 + 红黑树。
JDK 1.8 中直接使用 synchronized 对槽点上锁,而不是使用 ReentrantLock 的好处是:
- 粒度降低了。
- JVM 开发团队没有放弃 synchronized,而且基于 JVM 的 synchronized 优化空间更大,更加自然。
- 在大量的数据操作下,对于 JVM 的内存压力,基于 API 的 ReentrantLock 会开销更多的内存。