前言
我们知道 HashMap 在并发中是不安全的,资源竞争激烈时常常会使用 ConcurrentHashMap,因为它可以支持并发读写。
与同是线程安全的老大哥 HashTable 相比,它更胜一筹,因为它的锁更加细化,而不是像 HashTable ,几乎为每个方法添加了 synchronized 锁,过度使用重量级锁势必会影响程序性能。
与 JDK1.7 相比,不再使用原有锁(Segment),利用原语 CAS 和 volatile 支持并发操作,引用了红黑树,不用过度使 hash 值离散,在查询效率上花费了不少心思。
ConcurrentHashMap 内部的方法很多,这篇文章只会介绍其中比较重要的方法。
内部结构
成员变量
值得注意的是 sizeCtl
这个变量,它在 ConcurrentHashMap 里面出镜率十分高,而且在不同的方法里有不同的用处,代表了不同的含义:
- 负数:hash 表正在初始化或扩容;
- -1:hash 表正在初始化;
- -N:有 n-1 个线程正在扩容;
正数或 0:hash 表还未初始化,代表初始化或下一次进行扩容的大小。此时它的值是 ConcurrentHashMap 容量的 3/4,与
loadfactor
是对应的。public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
// 表的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认表的大小
private static final int DEFAULT_CAPACITY = 16;
// 最大数组大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认并发数
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 装载因子
private static final float LOAD_FACTOR = 0.75f;
// 转化为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
// 由红黑树转化为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 转化为红黑树的表的最小容量
static final int MIN_TREEIFY_CAPACITY = 64;
// 每次进行转移的最小值
private static final int MIN_TRANSFER_STRIDE = 16;
// 生成sizeCtl所使用的bit位数
private static int RESIZE_STAMP_BITS = 16;
// 进行扩容所允许的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 记录sizeCtl中的大小所需要进行的偏移位数
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 一系列的标识
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
//
/** Number of CPUS, to place bounds on some sizings */
// 获取可用的CPU个数
static final int NCPU = Runtime.getRuntime().availableProcessors();
//
/** For serialization compatibility. */
// 进行序列化的属性
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};
// 表
transient volatile Node<K,V>[] table;
// 下一个表
private transient volatile Node<K,V>[] nextTable;
//
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
// 基本计数
private transient volatile long baseCount;
//
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
/**
* 重要控制变量
* 根据变量的数值不同,类实例处于不同阶段
* 1. = -1 : 正在初始化
* 2. < -1 : 正在扩容,数值为 -(1 + 参与扩容的线程数)
* 3. = 0 : 创建时初始为0
* 4. > 0 : 下一次扩容的大小
*/
private transient volatile int sizeCtl;
/**
* The next table index (plus one) to split while resizing.
*/
// 扩容下另一个表的索引
private transient volatile int transferIndex;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
// 旋转锁
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
// counterCell表
private transient volatile CounterCell[] counterCells;
// views
// 视图
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
}
Node
Node 是 ConcurrentHashMap 中最核心的内部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这里面。它与 HashMap 内的 Node 很相似,但是对 value 和 next 属性添加了 volatile 修饰符,保证了可见性。
Node 中的 key 和 value 不能为 null。子类的 hash 值可以为负数,代表特殊的并发意义。
static class Node<K,V> implements Map.Entry<K,V> {
// Node节点的hash值和key的hash值相同
// TreeNode节点的hash值
final int hash;
final K key;
volatile V val; //带有同步锁的value(保证可见性)
volatile Node<K,V> next;//带有同步锁的next指针
Node(inthash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
// HashMap调用Objects.hashCode(),最终也是调用Object.hashCode();效果一样
public final int hashCode() { returnkey.hashCode() ^ val.hashCode(); }
public final String toString(){ returnkey + "=" + val; }
//不允许直接改变value的值
public final V setValue(V value) { // 不允许修改value值,HashMap允许
throw new UnsupportedOperationException();
}
// HashMap使用if (o == this),且嵌套if;concurrent使用&&
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((oinstanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
// 增加find方法辅助get方法
Node<K,V> find(inth, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
/**
* 以链表形式查找桶中下一个Node信息
* 当转换为subclass红黑树节点TreeNode
* 则使用TreeNode中的find进行查询操作
*/
} while ((e = e.next) != null);
}
return null;
}
}
TreeNode
Node 的子类,红黑树节点。当 Node 链表过长时,会转换为红黑树。
但它不是直接转换为红黑树,而是把这些节点包装成 TreeNode 放在 TreeBin 对象中,由 TreeBin 完成对红黑树的包装。而且 TreeNode 继承于 ConcurrentHashMap 的 Node,说明它也带有 next 指针,这样做的目的是方便 TreeBin 的访问。
// Nodes for use in TreeBins,链表>8,才可能转为TreeNode.
// HashMap的TreeNode继承至LinkedHashMap.Entry;而这里继承至自己实现的Node,将带有next指针,便于treebin访问。
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(inthash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(inth, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/ // 查找hash为h,key为k的节点
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) { // 比HMap增加判空
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
returnp;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
// 和HashMap相比,这里的TreeNode相当简洁;ConcurrentHashMap链表转树时,并不会直接转,
// 正如注释(Nodes for use in TreeBins)所说,只是把这些节点包装成TreeNode放到TreeBin中,
// 再由TreeBin来转化红黑树。
TreeBin
红黑树的操作是针对 TreeBin 的,它会将 TreeNode 再一次封装。
它用作树的头节点,只存储 root 和 first 节点,不存储节点的 key、value 值。
// TreeBin用于封装维护TreeNode,包含putTreeVal、lookRoot、UNlookRoot、remove、
// balanceInsetion、balanceDeletion等方法,这里只分析其构造函数。当链表转树时,
// 用于封装TreeNode,也就是说,ConcurrentHashMap的红黑树存放的是TreeBin,而不是treeNode。
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);//hash值为常量TREEBIN= -2, 表示 roots of trees
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
elseif (ph < h)
dir = 1;
elseif ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
ForwardingNode
ForwardingNode 在转移时放在头部的节点,是一个空节点。
它包含一个 nextTable 指针,用于指向下一张表。而且这个节点的 key、value 和 next 指针都是 null,其 hash 值为 -1。
这里定义的 find 方法是从 nextTable 里查询节点,而不是自身为头节点进行查询。
// A node inserted at head of bins during transfer operations.连接两个table
// 并不是我们传统的包含key-value的节点,只是一个标志节点,并且指向nextTable,提供find方法而已。
// 生命周期:仅存活于扩容操作且bin不为null时,一定会出现在每个bin的首位。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null); // 此节点hash=-1,key、value、next均为null
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// 查nextTable节点,outer避免深度递归
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; intn;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) { // CAS算法多和死循环搭配!直到查到或null
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
returne;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}
核心方法
构造方法
在使用 ConcurrentHashMap 第一件事自然而然就是 new 出来一个 ConcurrentHashMap 对象,一共提供了如下几个构造器方法:
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16
ConcurrentHashMap()
// 2. 给定map的大小
ConcurrentHashMap(int initialCapacity)
// 3. 给定一个map
ConcurrentHashMap(Map<? extends K, ? extends V> m)
// 4. 给定map的大小以及加载因子
ConcurrentHashMap(int initialCapacity, float loadFactor)
// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)
ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
我们来看看第2种构造器,传入指定大小时的情况,该构造器源码为:
public ConcurrentHashMap(int initialCapacity) {
//1. 小于0直接抛异常
if (initialCapacity < 0)
throw new IllegalArgumentException();
//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//3. 赋值给sizeCtl
this.sizeCtl = cap;
}
这段代码的逻辑请看注释,很容易理解,如果小于 0 就直接抛出异常,如果指定值大于了所允许的最大值的话就取最大值,否则,在对指定值做进一步处理。最后将 cap 赋值给 sizeCtl,关于 sizeCtl 的说明请看上面的说明,当调用构造器方法之后,sizeCtl 的大小应该就代表了 ConcurrentHashMap 的大小,即 table 数组长度。tableSizeFor 做了哪些事情了?源码为:
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
该方法会将调用构造器方法时指定的大小转换成一个 2 的幂次方数,也就是说 ConcurrentHashMap 的大小一定是 2 的幂次方,比如,当指定大小为 18 时,为了满足 2 的幂次方特性,实际上 ConcurrentHashMap 的大小为 2 的 5 次方(32)。
另外,需要注意的是,调用构造器方法的时候并未构造出 table 数组(可以理解为 ConcurrentHashMap 的数据容器),只是算出 table 数组的长度,当第一次向 ConcurrentHashMap 插入数据的时候才真正的完成初始化创建 table 数组的工作。
定位节点位置
ConcurrentHashMap 有几个方法可以快速定位元素在数组的位置,并且提供 CAS 原子操作。
@SuppressWarnings("unchecked")
//获得在i位置上的Node节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/*
*这边为什么i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量
*ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址
*所以呢,((long)i << ASHIFT) + ABASE就算i最后的地址
* 那么compareAndSwapObject的作用就算tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c;
*/
//利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
//在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
//因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVN
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile方法设置节点位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
putVal
putVal 是将一个新 key-value 插入到当前 ConcurrentHashMap 的关键方法。
此方法的具体流程是:
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许 key 和 value 为空
if (key == null || value == null) throw new NullPointerException();
// 1.计算 key 的 hash 值(计算新节点的hash值)
int hash = spread(key.hashCode()); // 返回 (h^(h>>>16))&HASH_BITS
int binCount = 0;
// 获取当前table,进入死循环,直到插入成功!
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 2. 如果当前 table 还没初始化先调用 initTable 方法将 tab 进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 如果table为空,执行初始化,也即是延迟初始化
// 3. tab中索引为i的位置的元素为null,则直接使用 CAS 将值插入即可
// 如果bin为空,则采用cas算法赋值,无需加锁
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
// 直接设置为桶首节点成功,退出死循环(出口之一)
break;
}
// 4. 当前正在扩容
// 当前桶首节点正在特殊的扩容状态下,当前线程尝试参与扩容
// 然后重新进入死循环
//f.hash == MOVED 表示为:ForwardingNode,说明其他线程正在扩容
else if ((fh = f.hash) == MOVED) // MOVED = -1
tab = helpTransfer(tab, f); // 当发现其他线程扩容时,帮其扩容
// 通过桶首节点,将新节点加入table
else {
V oldVal = null;
// 获取桶首节点实例对象锁,进入临界区进行添加操作
synchronized (f) {
// 再判断以此f是否仍是第一个Node,如果不是,退出临界区,重复添加操作
if (tabAt(tab, i) == f) {
//5. 当前为链表,在链表中插入新的键值对
if (fh >= 0) { // 桶首节点hash值>0,表示为链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 找到hash值相同的key,覆盖旧值即可
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 仅 putIfAbsent() 方法中的 onlyIfAbsend 为 true;
if (!onlyIfAbsent)
// putIfAbsend() 包含 key 则返回 get ,否则 put 并返回
e.val = value;
break;
}
Node<K,V> pred = e;
//如果到链表末尾仍未找到,则直接将新值插入到链表末尾即可
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 桶首节点为Node子类型TreeBin,表示为红黑树
// 6.当前为红黑树,将新的键值对插入到红黑树中
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 调用putTreeVal方法,插入新值
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
// key已经存在,则替换
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
// 插入新节点后,达到链表转换红黑树阈值,则执行转换操作
// 此函数内部会判断是树化,还是扩容:tryPresize
treeifyBin(tab, i);
// 退出死循环(出口之二)
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新计算count时的base和counterCells数组
//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
addCount(1L, binCount);
return null;
}
当 table[i] 为链表的头节点,在链表中插入新值时,table[i] 不为 null,也不是 ForwardingNode,且当前 Node f 的 hash 值大于 0(fh>0),说明当前节点 f 为当前链表的头节点。如果在链表中找到了与 key 相同的节点,直接覆盖,如果整条链表上都没找到,就直接将新节点追加到链表的尾部。
如果 table[i] 为 null,直接设置链表头节点。
spread 方法
该方法主要将 key 的 hashCode 的低 16 位和高 16 位进行异或运算。JDK1.8 引进了红黑树,即使冲突了查询的效率也很高,所以这里的位运算比以前减少了。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
initTable 方法
数组初始化方法,允许多线程同时进入,但只有一个线程可以完成 table 的初始化,其它线程都会通过 yield 方法让出 CPU。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 前文提及sizeCtl是重要的控制变量
// sizeCtl = -1 表示正在初始化
if ((sc = sizeCtl) < 0)
// 已经有其他线程在执行初始化,则主动让出cpu
// 1. 保证只有一个线程正在进行初始化操作
Thread.yield();
// 利用CAS操作设置sizeCtl为-1
// 设置成功表示当前线程为执行初始化的唯一线程
// 此处进入临界区
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 由于让出cpu的线程也会后续进入该临界区
// 需要进行再次确认table是否为null
if ((tab = table) == null || tab.length == 0) {
// 2. 得出数组的大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 3. 这里才真正的初始化数组,即分配Node数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 默认负载为0.75
// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
// 退出死循环的唯一出口
break;
}
}
return tab;
}
为了保证能够正确初始化,如果当前已经有一个线程正在初始化(sizeCtl=-1),其它线程判断 sizeCtl == -1
为 true 后调用 Thread.yield 方法让出 CPU 时间片。正在初始化的线程会调用 U.conpareAndSwapInt 原子方法将 sizeCtl 修改为 -1。
初始化成功后,将 sizeCtl 设置为数组长度的 3/4。这里计算 sizeCtl 也挺有意思的,直接使用位运算 n - (n >>> 2)
。
threeifyBin 方法
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; intn, sc;
if (tab != null) {
// 数组的大小还未超过64
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1); // 容量<64,则table两倍扩容,不转树了
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) { // 读写锁
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
tryPresize(扩容)
private final void tryPresize(int size) {
//计算扩容的目标size
// 给定的容量若>=MAXIMUM_CAPACITY的一半,直接扩容到允许的最大值,否则调用函数扩容
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) { //没有正在初始化或扩容,或者说表还没有被初始化
Node<K,V>[] tab = table; int n;
//tab没有初始化
if(tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c; // 扩容阀值取较大者
// 期间没有其他线程对表操作,则CAS将SIZECTL状态置为-1,表示正在进行初始化
//初始化之前,CAS设置sizeCtl=-1
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//sc=0.75n,相当于扩容阈值
sc = n - (n >>> 2); //无符号右移2位,此即0.75*n
}
} finally {
// 此时并没有通过CAS赋值,因为其他想要执行初始化的线程,
// 发现sizeCtl=-1,就直接返回,从而确保任何情况,
// 只会有一个线程执行初始化操作。
sizeCtl = sc;
}
}
}// 若欲扩容值不大于原阀值,或现有容量>=最值,什么都不用做了
//目标扩容size小于扩容阈值,或者容量超过最大限制时,不需要扩容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//扩容
else if (tab == table) {
int rs = resizeStamp(n);
// sc<0表示,已经有其他线程正在扩容
if (sc < 0) {
Node<K,V>[] nt;//RESIZE_STAMP_SHIFT=16,MAX_RESIZERS=2^15-1
// 1. (sc >>> RESIZE_STAMP_SHIFT) != rs :扩容线程数 > MAX_RESIZERS-1
// 2. sc == rs + 1 和 sc == rs + MAX_RESIZERS :表示什么???
// 3. (nt = nextTable) == null :表示nextTable正在初始化
// transferIndex <= 0 :表示所有hash桶均分配出去
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
//如果不需要帮其扩容,直接返回
break;
//CAS设置sizeCtl=sizeCtl+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//帮其扩容
transfer(tab, nt);
}
// 第一个执行扩容操作的线程,将sizeCtl设置为:
// (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
transfer 方法
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*
* transferIndex 表示转移时的下标,初始为扩容前的 length。
*
* 我们假设长度是 32
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
// 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围 stridea:TODO
// 新的 table 尚未初始化
if (nextTab == null) { // initiating
try {
// 扩容 2 倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 更新
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容失败, sizeCtl 使用 int 最大值。
sizeCtl = Integer.MAX_VALUE;
return;// 结束
}
// 更新成员变量
nextTable = nextTab;
// 更新转移下标,就是 老的 tab 的 length
transferIndex = n;
}
// 新 tab 的 length
int nextn = nextTab.length;
// 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
// 完成状态,如果是 true,就结束此方法。
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间
while (advance) {
int nextIndex, nextBound;
// 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
// 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
// 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。
if (--i >= bound || finishing)
advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
// 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
else if ((nextIndex = transferIndex) <= 0) {
// 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
// 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
i = -1;
advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
}// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标
i = nextIndex - 1; // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
}
}// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
// 如果 i >= tab.length(不知道为什么这么判断)
// 如果 i + tab.length >= nextTable.length (不知道为什么这么判断)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 如果完成了扩容
nextTable = null;// 删除成员变量
table = nextTab;// 更新 table
sizeCtl = (n << 1) - (n >>> 1); // 更新阈值
return;// 结束方法。
}// 如果没完成
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
return;// 不相等,说明没结束,当前线程结束方法。
finishing = advance = true;// 如果相等,扩容结束了,更新 finising 变量
i = n; // 再次循环检查一下整张表
}
}
else if ((f = tabAt(tab, i)) == null) // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
advance = casTabAt(tab, i, null, fwd);// 如果成功写入 fwd 占位,再次推进一个下标
else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。
advance = true; // already processed // 说明别的线程已经处理过了,再次推进一个下标
else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
synchronized (f) {
// 判断 i 下标处的桶节点是否和 f 相同
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;// low, height 高位桶,低位桶
// 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2
if (fh >= 0) {
// 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)
// 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
// 如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。
int runBit = fh & n;
Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等
// 遍历这个桶
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 取于桶中每个节点的 hash 值
int b = p.hash & n;
// 如果节点的 hash 值和首节点的 hash 值取于结果不同
if (b != runBit) {
runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
}
}
if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点
ln = lastRun;
hn = null;
}
else {
hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点
ln = null;
}// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 如果与运算结果是 0,那么就还在低位
if ((ph & n) == 0) // 如果是0 ,那么创建低位节点
ln = new Node<K,V>(ph, pk, pv, ln);
else // 1 则创建高位
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 其实这里类似 hashMap
// 设置低位链表放在新链表的 i
setTabAt(nextTab, i, ln);
// 设置高位链表,在原有长度上加 n
setTabAt(nextTab, i + n, hn);
// 将旧的链表设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}// 如果是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和链表相同的判断,与运算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} // 不是 0 的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位数
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
}
}
}
get 方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 重hash
int h = spread(key.hashCode());
// 2. table[i]桶节点的key与查找的key相同,则直接返回
if ((tab = table) != null && (n = tab.length) > 0 &&
// 唯一一处volatile读操作
(e = tabAt(tab, (n - 1) & h)) != null) {
// 注意:因为容器大小为2的次方,所以 h mod n = h & (n -1)
if ((eh = e.hash) == h) {// 如果hash值相等
// 检查第一个Node
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash为负表示是扩容中的ForwardingNode节点
// 直接调用ForwardingNode的find方法(可以是代理到扩容中的nextTable)
// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表,对比key值
// 通过next指针,逐一查找
while ((e = e.next) != null) {
//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
这个 get 请求,我们需要 cas 来保证变量的原子性。如果 tab[i] 正被锁住,那么 CAS 就会失败,失败之后就会不断的重试。这也保证了在高并发情况下不会出错。
我们来分析一下哪些情况会导致 get 在并发的情况下可能取不到值。
- 一个线程在 get 的时候,另一个线程在对同一个 key 的 node 进行 remove 操作
- 一个线程在 get 的时候,另一个线程正在重排 table 。可能导致旧 table 取不到值
那么本质是,在 get 的时候,有其他线程在对同一桶的链表或树进行修改。那么 get 是怎么保证同步性的呢?我们看到 e = tabAt(tab, (n - 1) & h)) != null
,在看下 tablAt 到底是干嘛的:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
它是对 tab[i] 进行原子性的读取,因为我们知道 putVal 等对 table 的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了 Unsafe 的 getObjectVolatile,因为 table 是 volatile 类型,所以对 tab[i] 的原子请求也是可见的。
因为如果同步正确的情况下,根据 happens-before 原则,对 volatile 域的写入操作 happens-before 于每一个后续对同一域的读操作。所以不管其他线程对 table 链表或树的修改,都对 get 读取可见。
remove 方法
和 put 方法一样,多个 remove 线程请求不同的hash桶时,可以并发执行。
删除的 node 节点的 next 依然指着下一个元素。此时若有一个遍历线程正在遍历这个已经删除的节点,这个遍历线程依然可以通过 next 属性访问下一个元素。从遍历线程的角度看,他并没有感知到此节点已经删除了,这说明了 ConcurrentHashMap 提供了弱一致性的迭代器。
public V remove(Object key) {
return replaceNode(key, null, null);
}
// 当参数 value == null 时,删除节点。否则更新节点的值为value
// cv 是个期望值,当 map[key].value 等于期望值 cv 或 cv == null 时,
// 删除节点,或者更新节点的值
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// table 还没初始化或key对应的 hash 桶为空
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
// 正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
// CAS 获取 tab[i] ,如果此时 tab[i] != f,说明其他线程修改了 tab[i]
// 回到 for 循环开始处,重新执行
if (tabAt(tab, i) == f) {
// node 链表
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
// ev 代表参数期望值
// cv == null:直接更新value/删除节点
// cv 不为空,则只有在 key 的 oldVal 等于
// 期望值的时候,才更新 value/删除节点
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
//更新value
if (value != null)
e.val = value;
//删除非头节点
else if (pred != null)
pred.next = e.next;
//删除头节点
else
// 因为已经获取了头结点锁,所以此时
// 不需要使用casTabAt
setTabAt(tab, i, e.next);
}
break;
}
//当前节点不是目标节点,继续遍历下一个节点
pred = e;
if ((e = e.next) == null)
//到达链表尾部,依旧没有找到,跳出循环
break;
}
}
//红黑树
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
//如果删除了节点,更新size
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}