前言

我们知道 HashMap 在并发中是不安全的,资源竞争激烈时常常会使用 ConcurrentHashMap,因为它可以支持并发读写。
与同是线程安全的老大哥 HashTable 相比,它更胜一筹,因为它的锁更加细化,而不是像 HashTable ,几乎为每个方法添加了 synchronized 锁,过度使用重量级锁势必会影响程序性能。
与 JDK1.7 相比,不再使用原有锁(Segment),利用原语 CAS 和 volatile 支持并发操作,引用了红黑树,不用过度使 hash 值离散,在查询效率上花费了不少心思。
ConcurrentHashMap 内部的方法很多,这篇文章只会介绍其中比较重要的方法。
内部结构
成员变量
值得注意的是 sizeCtl 这个变量,它在 ConcurrentHashMap 里面出镜率十分高,而且在不同的方法里有不同的用处,代表了不同的含义:
- 负数:hash 表正在初始化或扩容;
 - -1:hash 表正在初始化;
 - -N:有 n-1 个线程正在扩容;
 正数或 0:hash 表还未初始化,代表初始化或下一次进行扩容的大小。此时它的值是 ConcurrentHashMap 容量的 3/4,与
loadfactor是对应的。public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>implements ConcurrentMap<K,V>, Serializable {private static final long serialVersionUID = 7249069246763182397L;// 表的最大容量private static final int MAXIMUM_CAPACITY = 1 << 30;// 默认表的大小private static final int DEFAULT_CAPACITY = 16;// 最大数组大小static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;// 默认并发数private static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 装载因子private static final float LOAD_FACTOR = 0.75f;// 转化为红黑树的阈值static final int TREEIFY_THRESHOLD = 8;// 由红黑树转化为链表的阈值static final int UNTREEIFY_THRESHOLD = 6;// 转化为红黑树的表的最小容量static final int MIN_TREEIFY_CAPACITY = 64;// 每次进行转移的最小值private static final int MIN_TRANSFER_STRIDE = 16;// 生成sizeCtl所使用的bit位数private static int RESIZE_STAMP_BITS = 16;// 进行扩容所允许的最大线程数private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;// 记录sizeCtl中的大小所需要进行的偏移位数private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;// 一系列的标识static final int MOVED = -1; // hash for forwarding nodesstatic final int TREEBIN = -2; // hash for roots of treesstatic final int RESERVED = -3; // hash for transient reservationsstatic final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash///** Number of CPUS, to place bounds on some sizings */// 获取可用的CPU个数static final int NCPU = Runtime.getRuntime().availableProcessors();///** For serialization compatibility. */// 进行序列化的属性private static final ObjectStreamField[] serialPersistentFields = {new ObjectStreamField("segments", Segment[].class),new ObjectStreamField("segmentMask", Integer.TYPE),new ObjectStreamField("segmentShift", Integer.TYPE)};// 表transient volatile Node<K,V>[] table;// 下一个表private transient volatile Node<K,V>[] nextTable;///*** Base counter value, used mainly when there is no contention,* but also as a fallback during table initialization* races. Updated via CAS.*/// 基本计数private transient volatile long baseCount;///*** Table initialization and resizing control. When negative, the* table is being initialized or resized: -1 for initialization,* else -(1 + the number of active resizing threads). Otherwise,* when table is null, holds the initial table size to use upon* creation, or 0 for default. After initialization, holds the* next element count value upon which to resize the table.*//*** 重要控制变量* 根据变量的数值不同,类实例处于不同阶段* 1. = -1 : 正在初始化* 2. < -1 : 正在扩容,数值为 -(1 + 参与扩容的线程数)* 3. = 0 : 创建时初始为0* 4. > 0 : 下一次扩容的大小*/private transient volatile int sizeCtl;/*** The next table index (plus one) to split while resizing.*/// 扩容下另一个表的索引private transient volatile int transferIndex;/*** Spinlock (locked via CAS) used when resizing and/or creating CounterCells.*/// 旋转锁private transient volatile int cellsBusy;/*** Table of counter cells. When non-null, size is a power of 2.*/// counterCell表private transient volatile CounterCell[] counterCells;// views// 视图private transient KeySetView<K,V> keySet;private transient ValuesView<K,V> values;private transient EntrySetView<K,V> entrySet;// Unsafe mechanicsprivate static final sun.misc.Unsafe U;private static final long SIZECTL;private static final long TRANSFERINDEX;private static final long BASECOUNT;private static final long CELLSBUSY;private static final long CELLVALUE;private static final long ABASE;private static final int ASHIFT;static {try {U = sun.misc.Unsafe.getUnsafe();Class<?> k = ConcurrentHashMap.class;SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));TRANSFERINDEX = U.objectFieldOffset(k.getDeclaredField("transferIndex"));BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount"));CELLSBUSY = U.objectFieldOffset(k.getDeclaredField("cellsBusy"));Class<?> ck = CounterCell.class;CELLVALUE = U.objectFieldOffset(ck.getDeclaredField("value"));Class<?> ak = Node[].class;ABASE = U.arrayBaseOffset(ak);int scale = U.arrayIndexScale(ak);if ((scale & (scale - 1)) != 0)throw new Error("data type scale not a power of two");ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);} catch (Exception e) {throw new Error(e);}}}
Node
Node 是 ConcurrentHashMap 中最核心的内部类,它包装了 key-value 键值对,所有插入 ConcurrentHashMap 的数据都包装在这里面。它与 HashMap 内的 Node 很相似,但是对 value 和 next 属性添加了 volatile 修饰符,保证了可见性。
Node 中的 key 和 value 不能为 null。子类的 hash 值可以为负数,代表特殊的并发意义。
static class Node<K,V> implements Map.Entry<K,V> {// Node节点的hash值和key的hash值相同// TreeNode节点的hash值final int hash;final K key;volatile V val; //带有同步锁的value(保证可见性)volatile Node<K,V> next;//带有同步锁的next指针Node(inthash, K key, V val, Node<K,V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}public final K getKey() { return key; }public final V getValue() { return val; }// HashMap调用Objects.hashCode(),最终也是调用Object.hashCode();效果一样public final int hashCode() { returnkey.hashCode() ^ val.hashCode(); }public final String toString(){ returnkey + "=" + val; }//不允许直接改变value的值public final V setValue(V value) { // 不允许修改value值,HashMap允许throw new UnsupportedOperationException();}// HashMap使用if (o == this),且嵌套if;concurrent使用&&public final boolean equals(Object o) {Object k, v, u; Map.Entry<?,?> e;return ((oinstanceof Map.Entry) &&(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&(v = e.getValue()) != null &&(k == key || k.equals(key)) &&(v == (u = val) || v.equals(u)));}// 增加find方法辅助get方法Node<K,V> find(inth, Object k) {Node<K,V> e = this;if (k != null) {do {K ek;if (e.hash == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))return e;/*** 以链表形式查找桶中下一个Node信息* 当转换为subclass红黑树节点TreeNode* 则使用TreeNode中的find进行查询操作*/} while ((e = e.next) != null);}return null;}}
TreeNode
Node 的子类,红黑树节点。当 Node 链表过长时,会转换为红黑树。
但它不是直接转换为红黑树,而是把这些节点包装成 TreeNode 放在 TreeBin 对象中,由 TreeBin 完成对红黑树的包装。而且 TreeNode 继承于 ConcurrentHashMap 的 Node,说明它也带有 next 指针,这样做的目的是方便 TreeBin 的访问。
// Nodes for use in TreeBins,链表>8,才可能转为TreeNode.// HashMap的TreeNode继承至LinkedHashMap.Entry;而这里继承至自己实现的Node,将带有next指针,便于treebin访问。static final class TreeNode<K,V> extends Node<K,V> {TreeNode<K,V> parent; // red-black tree linksTreeNode<K,V> left;TreeNode<K,V> right;TreeNode<K,V> prev; // needed to unlink next upon deletionboolean red;TreeNode(inthash, K key, V val, Node<K,V> next,TreeNode<K,V> parent) {super(hash, key, val, next);this.parent = parent;}Node<K,V> find(inth, Object k) {return findTreeNode(h, k, null);}/*** Returns the TreeNode (or null if not found) for the given key* starting at given root.*/ // 查找hash为h,key为k的节点final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {if (k != null) { // 比HMap增加判空TreeNode<K,V> p = this;do {int ph, dir; K pk; TreeNode<K,V> q;TreeNode<K,V> pl = p.left, pr = p.right;if ((ph = p.hash) > h)p = pl;else if (ph < h)p = pr;else if ((pk = p.key) == k || (pk != null && k.equals(pk)))returnp;else if (pl == null)p = pr;else if (pr == null)p = pl;else if ((kc != null ||(kc = comparableClassFor(k)) != null) &&(dir = compareComparables(kc, k, pk)) != 0)p = (dir < 0) ? pl : pr;else if ((q = pr.findTreeNode(h, k, kc)) != null)return q;elsep = pl;} while (p != null);}return null;}}// 和HashMap相比,这里的TreeNode相当简洁;ConcurrentHashMap链表转树时,并不会直接转,// 正如注释(Nodes for use in TreeBins)所说,只是把这些节点包装成TreeNode放到TreeBin中,// 再由TreeBin来转化红黑树。
TreeBin
红黑树的操作是针对 TreeBin 的,它会将 TreeNode 再一次封装。
它用作树的头节点,只存储 root 和 first 节点,不存储节点的 key、value 值。
// TreeBin用于封装维护TreeNode,包含putTreeVal、lookRoot、UNlookRoot、remove、// balanceInsetion、balanceDeletion等方法,这里只分析其构造函数。当链表转树时,// 用于封装TreeNode,也就是说,ConcurrentHashMap的红黑树存放的是TreeBin,而不是treeNode。TreeBin(TreeNode<K,V> b) {super(TREEBIN, null, null, null);//hash值为常量TREEBIN= -2, 表示 roots of treesthis.first = b;TreeNode<K,V> r = null;for (TreeNode<K,V> x = b, next; x != null; x = next) {next = (TreeNode<K,V>)x.next;x.left = x.right = null;if (r == null) {x.parent = null;x.red = false;r = x;}else {K k = x.key;int h = x.hash;Class<?> kc = null;for (TreeNode<K,V> p = r;;) {int dir, ph;K pk = p.key;if ((ph = p.hash) > h)dir = -1;elseif (ph < h)dir = 1;elseif ((kc == null &&(kc = comparableClassFor(k)) == null) ||(dir = compareComparables(kc, k, pk)) == 0)dir = tieBreakOrder(k, pk);TreeNode<K,V> xp = p;if ((p = (dir <= 0) ? p.left : p.right) == null) {x.parent = xp;if (dir <= 0)xp.left = x;elsexp.right = x;r = balanceInsertion(r, x);break;}}}}this.root = r;assert checkInvariants(root);}
ForwardingNode
ForwardingNode 在转移时放在头部的节点,是一个空节点。
它包含一个 nextTable 指针,用于指向下一张表。而且这个节点的 key、value 和 next 指针都是 null,其 hash 值为 -1。
这里定义的 find 方法是从 nextTable 里查询节点,而不是自身为头节点进行查询。
// A node inserted at head of bins during transfer operations.连接两个table// 并不是我们传统的包含key-value的节点,只是一个标志节点,并且指向nextTable,提供find方法而已。// 生命周期:仅存活于扩容操作且bin不为null时,一定会出现在每个bin的首位。static final class ForwardingNode<K,V> extends Node<K,V> {final Node<K,V>[] nextTable;ForwardingNode(Node<K,V>[] tab) {super(MOVED, null, null, null); // 此节点hash=-1,key、value、next均为nullthis.nextTable = tab;}Node<K,V> find(int h, Object k) {// 查nextTable节点,outer避免深度递归outer: for (Node<K,V>[] tab = nextTable;;) {Node<K,V> e; intn;if (k == null || tab == null || (n = tab.length) == 0 ||(e = tabAt(tab, (n - 1) & h)) == null)return null;for (;;) { // CAS算法多和死循环搭配!直到查到或nullint eh; K ek;if ((eh = e.hash) == h &&((ek = e.key) == k || (ek != null && k.equals(ek))))returne;if (eh < 0) {if (e instanceof ForwardingNode) {tab = ((ForwardingNode<K,V>)e).nextTable;continue outer;}elsereturn e.find(h, k);}if ((e = e.next) == null)return null;}}}}
核心方法
构造方法
在使用 ConcurrentHashMap 第一件事自然而然就是 new 出来一个 ConcurrentHashMap 对象,一共提供了如下几个构造器方法:
// 1. 构造一个空的map,即table数组还未初始化,初始化放在第一次插入数据时,默认大小为16ConcurrentHashMap()// 2. 给定map的大小ConcurrentHashMap(int initialCapacity)// 3. 给定一个mapConcurrentHashMap(Map<? extends K, ? extends V> m)// 4. 给定map的大小以及加载因子ConcurrentHashMap(int initialCapacity, float loadFactor)// 5. 给定map大小,加载因子以及并发度(预计同时操作数据的线程)ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
我们来看看第2种构造器,传入指定大小时的情况,该构造器源码为:
public ConcurrentHashMap(int initialCapacity) {//1. 小于0直接抛异常if (initialCapacity < 0)throw new IllegalArgumentException();//2. 判断是否超过了允许的最大值,超过了话则取最大值,否则再对该值进一步处理int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//3. 赋值给sizeCtlthis.sizeCtl = cap;}
这段代码的逻辑请看注释,很容易理解,如果小于 0 就直接抛出异常,如果指定值大于了所允许的最大值的话就取最大值,否则,在对指定值做进一步处理。最后将 cap 赋值给 sizeCtl,关于 sizeCtl 的说明请看上面的说明,当调用构造器方法之后,sizeCtl 的大小应该就代表了 ConcurrentHashMap 的大小,即 table 数组长度。tableSizeFor 做了哪些事情了?源码为:
private static final int tableSizeFor(int c) {int n = c - 1;n |= n >>> 1;n |= n >>> 2;n |= n >>> 4;n |= n >>> 8;n |= n >>> 16;return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;}
该方法会将调用构造器方法时指定的大小转换成一个 2 的幂次方数,也就是说 ConcurrentHashMap 的大小一定是 2 的幂次方,比如,当指定大小为 18 时,为了满足 2 的幂次方特性,实际上 ConcurrentHashMap 的大小为 2 的 5 次方(32)。
另外,需要注意的是,调用构造器方法的时候并未构造出 table 数组(可以理解为 ConcurrentHashMap 的数据容器),只是算出 table 数组的长度,当第一次向 ConcurrentHashMap 插入数据的时候才真正的完成初始化创建 table 数组的工作。
定位节点位置
ConcurrentHashMap 有几个方法可以快速定位元素在数组的位置,并且提供 CAS 原子操作。
@SuppressWarnings("unchecked")//获得在i位置上的Node节点static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}/**这边为什么i要等于((long)i << ASHIFT) + ABASE呢,计算偏移量*ASHIFT是指tab[i]中第i个元素在相对于数组第一个元素的偏移量,而ABASE就算第一数组的内存素的偏移地址*所以呢,((long)i << ASHIFT) + ABASE就算i最后的地址* 那么compareAndSwapObject的作用就算tab[i]和c比较,如果相等就tab[i]=v否则tab[i]=c;*///利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少//在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改//因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVNstatic final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);}//利用volatile方法设置节点位置的值static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
putVal
putVal 是将一个新 key-value 插入到当前 ConcurrentHashMap 的关键方法。
此方法的具体流程是:
final V putVal(K key, V value, boolean onlyIfAbsent) {// 不允许 key 和 value 为空if (key == null || value == null) throw new NullPointerException();// 1.计算 key 的 hash 值(计算新节点的hash值)int hash = spread(key.hashCode()); // 返回 (h^(h>>>16))&HASH_BITSint binCount = 0;// 获取当前table,进入死循环,直到插入成功!for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 2. 如果当前 table 还没初始化先调用 initTable 方法将 tab 进行初始化if (tab == null || (n = tab.length) == 0)tab = initTable(); // 如果table为空,执行初始化,也即是延迟初始化// 3. tab中索引为i的位置的元素为null,则直接使用 CAS 将值插入即可// 如果bin为空,则采用cas算法赋值,无需加锁else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))// 直接设置为桶首节点成功,退出死循环(出口之一)break;}// 4. 当前正在扩容// 当前桶首节点正在特殊的扩容状态下,当前线程尝试参与扩容// 然后重新进入死循环//f.hash == MOVED 表示为:ForwardingNode,说明其他线程正在扩容else if ((fh = f.hash) == MOVED) // MOVED = -1tab = helpTransfer(tab, f); // 当发现其他线程扩容时,帮其扩容// 通过桶首节点,将新节点加入tableelse {V oldVal = null;// 获取桶首节点实例对象锁,进入临界区进行添加操作synchronized (f) {// 再判断以此f是否仍是第一个Node,如果不是,退出临界区,重复添加操作if (tabAt(tab, i) == f) {//5. 当前为链表,在链表中插入新的键值对if (fh >= 0) { // 桶首节点hash值>0,表示为链表binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;// 找到hash值相同的key,覆盖旧值即可if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;// 仅 putIfAbsent() 方法中的 onlyIfAbsend 为 true;if (!onlyIfAbsent)// putIfAbsend() 包含 key 则返回 get ,否则 put 并返回e.val = value;break;}Node<K,V> pred = e;//如果到链表末尾仍未找到,则直接将新值插入到链表末尾即可if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 桶首节点为Node子类型TreeBin,表示为红黑树// 6.当前为红黑树,将新的键值对插入到红黑树中else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;// 调用putTreeVal方法,插入新值if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {// key已经存在,则替换oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 7.插入完键值对后再根据实际大小看是否需要转换成红黑树if (binCount != 0) {if (binCount >= TREEIFY_THRESHOLD)// 插入新节点后,达到链表转换红黑树阈值,则执行转换操作// 此函数内部会判断是树化,还是扩容:tryPresizetreeifyBin(tab, i);// 退出死循环(出口之二)if (oldVal != null)return oldVal;break;}}}// 更新计算count时的base和counterCells数组//8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容addCount(1L, binCount);return null;}
当 table[i] 为链表的头节点,在链表中插入新值时,table[i] 不为 null,也不是 ForwardingNode,且当前 Node f 的 hash 值大于 0(fh>0),说明当前节点 f 为当前链表的头节点。如果在链表中找到了与 key 相同的节点,直接覆盖,如果整条链表上都没找到,就直接将新节点追加到链表的尾部。
如果 table[i] 为 null,直接设置链表头节点。
spread 方法
该方法主要将 key 的 hashCode 的低 16 位和高 16 位进行异或运算。JDK1.8 引进了红黑树,即使冲突了查询的效率也很高,所以这里的位运算比以前减少了。
static final int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
initTable 方法
数组初始化方法,允许多线程同时进入,但只有一个线程可以完成 table 的初始化,其它线程都会通过 yield 方法让出 CPU。
private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {// 前文提及sizeCtl是重要的控制变量// sizeCtl = -1 表示正在初始化if ((sc = sizeCtl) < 0)// 已经有其他线程在执行初始化,则主动让出cpu// 1. 保证只有一个线程正在进行初始化操作Thread.yield();// 利用CAS操作设置sizeCtl为-1// 设置成功表示当前线程为执行初始化的唯一线程// 此处进入临界区else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {// 由于让出cpu的线程也会后续进入该临界区// 需要进行再次确认table是否为nullif ((tab = table) == null || tab.length == 0) {// 2. 得出数组的大小int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")// 3. 这里才真正的初始化数组,即分配Node数组Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 默认负载为0.75// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)sc = n - (n >>> 2);}} finally {sizeCtl = sc;}// 退出死循环的唯一出口break;}}return tab;}
为了保证能够正确初始化,如果当前已经有一个线程正在初始化(sizeCtl=-1),其它线程判断 sizeCtl == -1 为 true 后调用 Thread.yield 方法让出 CPU 时间片。正在初始化的线程会调用 U.conpareAndSwapInt 原子方法将 sizeCtl 修改为 -1。
初始化成功后,将 sizeCtl 设置为数组长度的 3/4。这里计算 sizeCtl 也挺有意思的,直接使用位运算 n - (n >>> 2) 。
threeifyBin 方法
private final void treeifyBin(Node<K,V>[] tab, int index) {Node<K,V> b; intn, sc;if (tab != null) {// 数组的大小还未超过64if ((n = tab.length) < MIN_TREEIFY_CAPACITY)tryPresize(n << 1); // 容量<64,则table两倍扩容,不转树了else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {synchronized (b) { // 读写锁if (tabAt(tab, index) == b) {TreeNode<K,V> hd = null, tl = null;for (Node<K,V> e = b; e != null; e = e.next) {TreeNode<K,V> p =new TreeNode<K,V>(e.hash, e.key, e.val,null, null);if ((p.prev = tl) == null)hd = p;elsetl.next = p;tl = p;}setTabAt(tab, index, new TreeBin<K,V>(hd));}}}}}
tryPresize(扩容)
private final void tryPresize(int size) {//计算扩容的目标size// 给定的容量若>=MAXIMUM_CAPACITY的一半,直接扩容到允许的最大值,否则调用函数扩容int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :tableSizeFor(size + (size >>> 1) + 1);int sc;while ((sc = sizeCtl) >= 0) { //没有正在初始化或扩容,或者说表还没有被初始化Node<K,V>[] tab = table; int n;//tab没有初始化if(tab == null || (n = tab.length) == 0) {n = (sc > c) ? sc : c; // 扩容阀值取较大者// 期间没有其他线程对表操作,则CAS将SIZECTL状态置为-1,表示正在进行初始化//初始化之前,CAS设置sizeCtl=-1if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if (table == tab) {@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = nt;//sc=0.75n,相当于扩容阈值sc = n - (n >>> 2); //无符号右移2位,此即0.75*n}} finally {// 此时并没有通过CAS赋值,因为其他想要执行初始化的线程,// 发现sizeCtl=-1,就直接返回,从而确保任何情况,// 只会有一个线程执行初始化操作。sizeCtl = sc;}}}// 若欲扩容值不大于原阀值,或现有容量>=最值,什么都不用做了//目标扩容size小于扩容阈值,或者容量超过最大限制时,不需要扩容else if (c <= sc || n >= MAXIMUM_CAPACITY)break;//扩容else if (tab == table) {int rs = resizeStamp(n);// sc<0表示,已经有其他线程正在扩容if (sc < 0) {Node<K,V>[] nt;//RESIZE_STAMP_SHIFT=16,MAX_RESIZERS=2^15-1// 1. (sc >>> RESIZE_STAMP_SHIFT) != rs :扩容线程数 > MAX_RESIZERS-1// 2. sc == rs + 1 和 sc == rs + MAX_RESIZERS :表示什么???// 3. (nt = nextTable) == null :表示nextTable正在初始化// transferIndex <= 0 :表示所有hash桶均分配出去if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0)//如果不需要帮其扩容,直接返回break;//CAS设置sizeCtl=sizeCtl+1if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))//帮其扩容transfer(tab, nt);}// 第一个执行扩容操作的线程,将sizeCtl设置为:// (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))transfer(tab, null);}}}
transfer 方法
/*** Moves and/or copies the nodes in each bin to new table. See* above for explanation.** transferIndex 表示转移时的下标,初始为扩容前的 length。** 我们假设长度是 32*/private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {int n = tab.length, stride;// 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。// 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围 stridea:TODO// 新的 table 尚未初始化if (nextTab == null) { // initiatingtry {// 扩容 2 倍Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 更新nextTab = nt;} catch (Throwable ex) { // try to cope with OOME// 扩容失败, sizeCtl 使用 int 最大值。sizeCtl = Integer.MAX_VALUE;return;// 结束}// 更新成员变量nextTable = nextTab;// 更新转移下标,就是 老的 tab 的 lengthtransferIndex = n;}// 新 tab 的 lengthint nextn = nextTab.length;// 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);// 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进boolean advance = true;// 完成状态,如果是 true,就结束此方法。boolean finishing = false; // to ensure sweep before committing nextTab// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标for (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 如果当前线程可以向后推进;这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要转移的桶的区间while (advance) {int nextIndex, nextBound;// 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)// 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。// 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。if (--i >= bound || finishing)advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进// 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。else if ((nextIndex = transferIndex) <= 0) {// 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了// 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断i = -1;advance = false;// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进}// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;// 这个值就是当前线程可以处理的最小当前区间最小下标i = nextIndex - 1; // 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。}}// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)// 如果 i >= tab.length(不知道为什么这么判断)// 如果 i + tab.length >= nextTable.length (不知道为什么这么判断)if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) { // 如果完成了扩容nextTable = null;// 删除成员变量table = nextTab;// 更新 tablesizeCtl = (n << 1) - (n >>> 1); // 更新阈值return;// 结束方法。}// 如果没完成if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。return;// 不相等,说明没结束,当前线程结束方法。finishing = advance = true;// 如果相等,扩容结束了,更新 finising 变量i = n; // 再次循环检查一下整张表}}else if ((f = tabAt(tab, i)) == null) // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。advance = casTabAt(tab, i, null, fwd);// 如果成功写入 fwd 占位,再次推进一个下标else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。advance = true; // already processed // 说明别的线程已经处理过了,再次推进一个下标else {// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据synchronized (f) {// 判断 i 下标处的桶节点是否和 f 相同if (tabAt(tab, i) == f) {Node<K,V> ln, hn;// low, height 高位桶,低位桶// 如果 f 的 hash 值大于 0 。TreeBin 的 hash 是 -2if (fh >= 0) {// 对老长度进行与运算(第一个操作数的的第n位于第二个操作数的第n位如果都是1,那么结果的第n为也为1,否则为0)// 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1// 如果是结果是0 ,Doug Lea 将其放在低位,反之放在高位,目的是将链表重新 hash,放到对应的位置上,让新的取于算法能够击中他。int runBit = fh & n;Node<K,V> lastRun = f; // 尾节点,且和头节点的 hash 值取于不相等// 遍历这个桶for (Node<K,V> p = f.next; p != null; p = p.next) {// 取于桶中每个节点的 hash 值int b = p.hash & n;// 如果节点的 hash 值和首节点的 hash 值取于结果不同if (b != runBit) {runBit = b; // 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。lastRun = p; // 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环}}if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,设置低位节点ln = lastRun;hn = null;}else {hn = lastRun; // 如果最后更新的 runBit 是 1, 设置高位节点ln = null;}// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;// 如果与运算结果是 0,那么就还在低位if ((ph & n) == 0) // 如果是0 ,那么创建低位节点ln = new Node<K,V>(ph, pk, pv, ln);else // 1 则创建高位hn = new Node<K,V>(ph, pk, pv, hn);}// 其实这里类似 hashMap// 设置低位链表放在新链表的 isetTabAt(nextTab, i, ln);// 设置高位链表,在原有长度上加 nsetTabAt(nextTab, i + n, hn);// 将旧的链表设置成占位符setTabAt(tab, i, fwd);// 继续向后推进advance = true;}// 如果是红黑树else if (f instanceof TreeBin) {TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> lo = null, loTail = null;TreeNode<K,V> hi = null, hiTail = null;int lc = 0, hc = 0;// 遍历for (Node<K,V> e = t.first; e != null; e = e.next) {int h = e.hash;TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);// 和链表相同的判断,与运算 == 0 的放在低位if ((h & n) == 0) {if ((p.prev = loTail) == null)lo = p;elseloTail.next = p;loTail = p;++lc;} // 不是 0 的放在高位else {if ((p.prev = hiTail) == null)hi = p;elsehiTail.next = p;hiTail = p;++hc;}}// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :(lc != 0) ? new TreeBin<K,V>(hi) : t;// 低位树setTabAt(nextTab, i, ln);// 高位数setTabAt(nextTab, i + n, hn);// 旧的设置成占位符setTabAt(tab, i, fwd);// 继续向后推进advance = true;}}}}}}
get 方法
public V get(Object key) {Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;// 1. 重hashint h = spread(key.hashCode());// 2. table[i]桶节点的key与查找的key相同,则直接返回if ((tab = table) != null && (n = tab.length) > 0 &&// 唯一一处volatile读操作(e = tabAt(tab, (n - 1) & h)) != null) {// 注意:因为容器大小为2的次方,所以 h mod n = h & (n -1)if ((eh = e.hash) == h) {// 如果hash值相等// 检查第一个Nodeif ((ek = e.key) == key || (ek != null && key.equals(ek)))return e.val;}// hash为负表示是扩容中的ForwardingNode节点// 直接调用ForwardingNode的find方法(可以是代理到扩容中的nextTable)// 3. 当前节点hash小于0说明为树节点,在红黑树中查找即可else if (eh < 0)return (p = e.find(h, key)) != null ? p.val : null;// 遍历链表,对比key值// 通过next指针,逐一查找while ((e = e.next) != null) {//4. 从链表中查找,查找到则返回该节点的value,否则就返回null即可if (e.hash == h &&((ek = e.key) == key || (ek != null && key.equals(ek))))return e.val;}}return null;}
这个 get 请求,我们需要 cas 来保证变量的原子性。如果 tab[i] 正被锁住,那么 CAS 就会失败,失败之后就会不断的重试。这也保证了在高并发情况下不会出错。
我们来分析一下哪些情况会导致 get 在并发的情况下可能取不到值。
- 一个线程在 get 的时候,另一个线程在对同一个 key 的 node 进行 remove 操作
 - 一个线程在 get 的时候,另一个线程正在重排 table 。可能导致旧 table 取不到值
 
那么本质是,在 get 的时候,有其他线程在对同一桶的链表或树进行修改。那么 get 是怎么保证同步性的呢?我们看到 e = tabAt(tab, (n - 1) & h)) != null,在看下 tablAt 到底是干嘛的:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}
它是对 tab[i] 进行原子性的读取,因为我们知道 putVal 等对 table 的桶操作是有加锁的,那么一般情况下我们对桶的读也是要加锁的,但是我们这边为什么不需要加锁呢?因为我们用了 Unsafe 的 getObjectVolatile,因为 table 是 volatile 类型,所以对 tab[i] 的原子请求也是可见的。
因为如果同步正确的情况下,根据 happens-before 原则,对 volatile 域的写入操作 happens-before 于每一个后续对同一域的读操作。所以不管其他线程对 table 链表或树的修改,都对 get 读取可见。
remove 方法
和 put 方法一样,多个 remove 线程请求不同的hash桶时,可以并发执行。
删除的 node 节点的 next 依然指着下一个元素。此时若有一个遍历线程正在遍历这个已经删除的节点,这个遍历线程依然可以通过 next 属性访问下一个元素。从遍历线程的角度看,他并没有感知到此节点已经删除了,这说明了 ConcurrentHashMap 提供了弱一致性的迭代器。
public V remove(Object key) {return replaceNode(key, null, null);}// 当参数 value == null 时,删除节点。否则更新节点的值为value// cv 是个期望值,当 map[key].value 等于期望值 cv 或 cv == null 时,// 删除节点,或者更新节点的值final V replaceNode(Object key, V value, Object cv) {int hash = spread(key.hashCode());for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// table 还没初始化或key对应的 hash 桶为空if (tab == null || (n = tab.length) == 0 ||(f = tabAt(tab, i = (n - 1) & hash)) == null)break;// 正在扩容else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;boolean validated = false;synchronized (f) {// CAS 获取 tab[i] ,如果此时 tab[i] != f,说明其他线程修改了 tab[i]// 回到 for 循环开始处,重新执行if (tabAt(tab, i) == f) {// node 链表if (fh >= 0) {validated = true;for (Node<K,V> e = f, pred = null;;) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {V ev = e.val;// ev 代表参数期望值// cv == null:直接更新value/删除节点// cv 不为空,则只有在 key 的 oldVal 等于// 期望值的时候,才更新 value/删除节点if (cv == null || cv == ev ||(ev != null && cv.equals(ev))) {oldVal = ev;//更新valueif (value != null)e.val = value;//删除非头节点else if (pred != null)pred.next = e.next;//删除头节点else// 因为已经获取了头结点锁,所以此时// 不需要使用casTabAtsetTabAt(tab, i, e.next);}break;}//当前节点不是目标节点,继续遍历下一个节点pred = e;if ((e = e.next) == null)//到达链表尾部,依旧没有找到,跳出循环break;}}//红黑树else if (f instanceof TreeBin) {validated = true;TreeBin<K,V> t = (TreeBin<K,V>)f;TreeNode<K,V> r, p;if ((r = t.root) != null &&(p = r.findTreeNode(hash, key, null)) != null) {V pv = p.val;if (cv == null || cv == pv ||(pv != null && cv.equals(pv))) {oldVal = pv;if (value != null)p.val = value;else if (t.removeTreeNode(p))setTabAt(tab, i, untreeify(t.first));}}}}}if (validated) {if (oldVal != null) {//如果删除了节点,更新sizeif (value == null)addCount(-1L, -1);return oldVal;}break;}}}return null;}
