哈希值:
- 大于 0 则是链表
- -1 是 ForwardingNode
- -2 是 TreeBin:红黑树 TreeNode 的上层对象
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
putVal:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 初始化数组 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 如果目标位置为 null,通过 cas 设置新节点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // 如果 CAS 成功,则 break break; // no lock when adding to empty bin } // 如果正在扩容,当前节点已经被移动了。则使用当前线程一起扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // 对链表或红黑树的头节点加锁 synchronized (f) { // 判断该位置的元素还是否为f。因为有可能被 remove if (tabAt(tab, i) == f) { // 头节点的hash>=0,则为链表 if (fh >= 0) { binCount = 1; // 遍历链表,同时统计个数 for (Node<K,V> e = f;; ++binCount) { K ek; // 替换 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 如果到尾节点,新增 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 如果是红黑树 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 释放锁了。减少锁的颗粒度。treeifyBin 里面会加锁 if (binCount != 0) { // 如果链表大于阈值,则转成红黑树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 统计个数 addCount(1L, binCount); return null; }
initTable:
```java // 负数:-1:初始化,-(1+n):扩容,n为线程数 // 正数:初始容量用于初始化,初始化完成后,保存阈值 private transient volatile int sizeCtl;
private final Node
<a name="GBguW"></a>
### treeifyBin:
- 树化:里面加锁
```java
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 尝试扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// 如果不为null,并且是链表
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 对头节点加锁
synchronized (b) {
// 判断一下头节点
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
// 转换成 TreeNode 节点,此时还不是红黑树
// TreeNode:parent,left,right,prev
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 转成红黑树 TreeBin,并 set 到数组
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
引用 TreeBin 而不是 TreeNode:
- 插入的时候红黑树的根节点会发生变化,此时加锁就会出问题
所以在 TreeBin 内部保存 root,加锁只需要针对 TreeBin 对象即可
addCount:
CounterCell[] 记录个数,多线程
- 扩容
check:if<0, 不扩容,if <= 1,只有在没竞争的情况下检查是否要扩容
- check=2:红黑树
- check=1:当前链表节点个数为1
- check=-1:remove 的时候,就不需要扩容
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 如果 counterCells 为 null,则尝试加到 baseCount // 如果 counterCells 不为 null,或者 baseCount 加失败了 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // as==null 说明 baseCount更新失败。as长度为0 // 当前线程的位置 as[x]==null // 使用 CAS,将当前线程对应的 CounterCell 位置的值+x,失败 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 如果尝试更新失败 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 得到元素个数 s = sumCount(); } // 扩容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果元素总个数>=阈值sizeCtl while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }
扩容可以是多线程的
fullAddCount:
LongAdder 类的实现逻辑类似
private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; // 如果 counterCells 已经被创建 if ((as = counterCells) != null && (n = as.length) > 0) { // 判断当前位置的元素为null if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell // 先创建,乐观 CounterCell r = new CounterCell(x); // Optimistic create // 尝试获取锁 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; // 再检查一次。防止获取锁之前,已经被其他线程创建了CounterCell对象 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 赋值 rs[j] = r; created = true; } } finally { // 释放锁 cellsBusy = 0; } // 成功创建即退出 if (created) break; // 不成功则跳过。说明被其他线程创建了CounterCell对象 // 下次循环就会进入其他分支处理 continue; // Slot is now non-empty } } // 如果 cellsBusy 不为 0,或者 CAS 失败。 // 则说明有另外一个线程持有锁。则不扩容 collide = false; } // 如果当前元素不为null // 如果外面CAS失败了,则重置标记,并且结束这次的 if-else。到下面重新hash,新循环 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash // 尝试 CAS 直接更新。成功就退出循环 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; // 如果其他线程把 counterCells 扩容了,或者数组大小>=CPU核心数,停止扩容 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale // collide 用来控制是否扩容,true才能到扩容分支 // 第一次循环,false->true,然后rehash,第二次循环就到扩容分支 // 两次循环前面的都不能满足,说明冲突太大,则扩容 else if (!collide) collide = true; // 扩容 counterCells,也需要获取锁 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { // Recheck if (counterCells == as) {// Expand table unless stale // 扩容为原来的两倍 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } // 生成新的 hash,较少冲突 h = ThreadLocalRandom.advanceProbe(h); } // 如果 counterCells 没有被创建,获取锁 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { // 创建 counterCells,大小为 2 CounterCell[] rs = ne w CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } // 如果初始化成功,直接退出返回 if (init) break; } // 如果上面的 if 都获取不到锁,则尝试加 baseCount。Fallback else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }
对于同一个线程,wasUncontended=false 连续两次操作 counterCells 都没有成功,就会触发扩容
- wasUncontended=false 则三次
- 每次循环都会修改线程的 hash
扩容:
addCount 节选:
- 多线程扩容,每次扩容都有一个步长,线程按照步长进行组转移
如果只有一个线程,则每次转移一个组,依次把全部组都转移
... // 扩容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果元素总个数>=阈值sizeCtl // while 循环,用于连续扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // 其他帮忙扩容的线程 if (sc < 0) { // 如果 nextTable 还是 null,说明第一个线程还未初始化它,则退出 // 如果 transferIndex<=0,说明当前线程已经完成转移 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 每个线程帮忙扩容时,会先将 sc+1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 第一个线程扩容,会初始化 nextTable else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } private static int RESIZE_STAMP_BITS = 16; private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }
transfer:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 计算步长,最小16。每个线程都是固定的 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") // 创建新数组 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 赋值给成员属性 nextTable = nextTab; // 旧数组长度 transferIndex = n; } // 新数组长度 int nextn = nextTab.length; // ForwardingNode 用来标识当前对象正在被转移,Put 中会判断 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 当前线程是否要继续计算组的范围 boolean advance = true; // 当前线程是否已完成转移任务 boolean finishing = false; // to ensure sweep before committing nextTab // 一个步长的范围 bound <= n <= i for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; // 如果在范围内,或者已完成,则跳出while往下执行 // 当线程处理完一个位置的元素后,就会走这个分支来定位下一个位置 if (--i >= bound || finishing) advance = false; // 当有线程处理最后一组时,transferIndex=0 // 退出循环,到下面的处理 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } // 每个线程获取当前处理的组的范围。 // - 从旧数组的最高位开始,i=最高位,nextBound=最高位-步长 // nextBound会写入transferIndex,当其他线程来的时候,就会计算新的范围 // 如果CAS失败,会继续循环 // 当一个线程处理完一组后,会重新进来获取下一个组来处理 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } // 结束判断 if (i < 0 || i >= n || i + n >= nextn) { int sc; // 当所有线程都完成了 if (finishing) { // 将新数组赋值给 table nextTable = null; table = nextTab; // 设置新的阈值 sizeCtl = (n << 1) - (n >>> 1); return; } // 每有一个线程完成,则将 sc - 1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sc 的初始值为:rs << RESIZE_STAMP_SHIFT) + 2) // 当 sc 没有恢复到初始值,说明还有线程正在扩容 // 不过当前线程的任务完成了,所以可以返回 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; // 所有线程都完成了 finishing = advance = true; i = n; // recheck before commit } } // 如果当前位置没有元素,则放置 fwd 对象 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果当前位置已经处理过了,则继续下一个位置 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 锁柱当前位置的头元素。其他线程put不进来 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // 如果是链表。和 1.7 是一样的,只不过先保存到局部变量 if (fh >= 0) { // 根据新数组长度,计算当前位置 hash 的最高位 int runBit = fh & n; Node<K,V> lastRun = f; // 先把末尾重新哈希后位置一样的链表,保存 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 如果是0,说明在低位。否则在高位 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 然后再复制其他节点 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 放置两个链表 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); // 将 fwd 放置到旧数组的对应位置 setTabAt(tab, i, fwd); advance = true; } // 红黑树 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 先转换成两个链表 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 再分别判断需不需要转换成红黑树 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
helpTransfer:
当前线程帮助转移,单肯定不是第一个扩容的线程 ```java final Node
[] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab;
} // 返回的还是旧的 table return table; }
<a name="ePreE"></a>
## size:
- baseCount + 每个 counterCells 的值
```java
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
get:
- 直接根据 hash 定位查找
如果 hash<0,说明不是链表,则调用各自的 find 方法
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
ForwardingNode.find:
到新的 table 上查找
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes // 到新的表上查找 outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; // 找到,返回 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { // 如果是 ForwardingNode,说明发生了二次扩容 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else // 如果是红黑树节点 return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
一致性:
1.7 是弱一致性的:
put 操作对 get 不是立即可见的。
- 虽然 put 操作加了锁,但是 get 操作并没有加锁
- Entry 对象中的 value 和 next 都没用 volatile
从 happen-before 原则来看,put 中对 value 的赋值操作,并不一定是 hb 于 get value 操作
1.8 是强一致性的:
- Node 的 val 和 next 都使用了 volatile
- 其他线程对 val 的修改,立即对当前线程可见