1. CurrentHashMap概念

JDK8中ConcurrentHashMap参考了JDK8 HashMap的实现,采用了数组+链表+红黑树的实现方式来设计,内部大量采用CAS操作,这里我简要介绍下CAS。
CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。
JDK8中彻底放弃了Segment转而采用的是Node,其设计思想也不再是JDK1.7中的分段锁思想。
Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。

  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V val;
  5. volatile Node<K,V> next;
  6. Node(int hash, K key, V val, Node<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.val = val;
  10. this.next = next;
  11. }
  12. ...
  13. }

2. CurrentHashMap实现原理

2.1 构造函数

  1. //带1个参数构造器
  2. public ConcurrentHashMap(int initialCapacity) {
  3. // 小于抛出异常
  4. if (initialCapacity < 0)
  5. throw new IllegalArgumentException();
  6. // 对于给定的预期容量作出合理规划。注:MAXIMUM_CAPACITY为2的30次方,MAXIMUM_CAPACITY >>> 1为2的30次幂为536870912,一般initialCapacity不会设置这么大的
  7. int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
  8. MAXIMUM_CAPACITY :
  9. tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
  10. //tableSizeFor方法可以转换为sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】
  11. // 例如,initialCapacity为7,sizeCtl=cap=16
  12. this.sizeCtl = cap;
  13. }

2.2 put

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. // key键和value值不能为 null
  3. if (key == null || value == null) throw new NullPointerException();
  4. // 计算hash值,将Key的hashCode值与其高16位作异或再按位与int的最大值从而保证最高位为0(从而保证最终结果为正整数)
  5. // 通过spread函数,int hash = (key.hashCode() ^ (key.hashCode() >>> 16)) & HASH_BITS
  6. // HASH_BITS=int型的最大值,即十六进制0x7fffffff,二进制 0111 1111 1111 1111 1111 1111 1111 1111
  7. int hash = spread(key.hashCode());
  8. // 局部变量,binCount默认是0,只有hash冲突了才会大于1.且他的大小是链表的长度(如果不是红黑数结构的话)。
  9. int binCount = 0;
  10. //循环,因为后面是CAS操作,可能会需要大量的重试
  11. for (Node<K,V>[] tab = table;;) {
  12. Node<K,V> f; int n, i, fh;
  13. // 如果没数组为空,调用initTable方法初始化创建数组
  14. if (tab == null || (n = tab.length) == 0)
  15. tab = initTable();
  16. // 找到下标,如果为空,采用CAS进行插入
  17. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  18. //如果没放成功,继续向下走,因为这肯定是出现了并发操作,所以去判断没放成功的理由.如果放成功了,那就结束循环
  19. if (casTabAt(tab, i, null,
  20. new Node<K,V>(hash, key, value, null)))
  21. break; // no lock when adding to empty bin
  22. }
  23. // 如果 hash 冲突了,且 hash 值为 -1,说明是 forwarding node 对象(这是一个占位符对象,保存了扩容后的容器)
  24. else if ((fh = f.hash) == MOVED)
  25. tab = helpTransfer(tab, f);// 帮助数据迁移
  26. else { // 这里就是数组已经有元素了,这时候就该挂链表或者挂树了
  27. V oldVal = null;
  28. // 获取头节点的监视器锁
  29. synchronized (f) {
  30. if (tabAt(tab, i) == f) {
  31. // 头节点的hash值,大于0表示这下面有点东西
  32. if (fh >= 0) {
  33. binCount = 1;
  34. for (Node<K,V> e = f;; ++binCount) { //for循环,表示遍历链表,循环一次后binCount加1
  35. K ek;
  36. // 如果发现了"相等"的 key,判断是否要进行值覆盖
  37. if (e.hash == hash &&
  38. ((ek = e.key) == key ||
  39. (ek != null && key.equals(ek)))) {
  40. oldVal = e.val;
  41. if (!onlyIfAbsent)
  42. e.val = value;
  43. break;
  44. }
  45. Node<K,V> pred = e;
  46. // 到最后了没重复的key,就把新值向后面挂
  47. if ((e = e.next) == null) {
  48. pred.next = new Node<K,V>(hash, key,
  49. value, null);
  50. break;
  51. }
  52. }
  53. }
  54. // 如果是个树
  55. else if (f instanceof TreeBin) {
  56. Node<K,V> p;
  57. binCount = 2;
  58. // 插节点,调用TreeBin的putTreeVal方法
  59. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  60. value)) != null) {
  61. oldVal = p.val;
  62. if (!onlyIfAbsent)
  63. p.val = value;
  64. }
  65. }
  66. }
  67. }
  68. if (binCount != 0) {
  69. // 判断链表的长度,如果大于8,然后转树
  70. if (binCount >= TREEIFY_THRESHOLD)
  71. // 这里要注意一个地方!!!!! --不是说像HashMap那样转树就没事了
  72. // 这里涉及到一个核心思路,CurrentHashMap做了优化,这里如果数组长度小于64,它会先扩容,扩容代表什么含义
  73. // -- 原来的链表会被1分为2 分别散落在不同的节点上
  74. treeifyBin(tab, i);
  75. // 如果key已存在,有原值,返回原值
  76. if (oldVal != null)
  77. return oldVal;
  78. break; // 结束外层死循环
  79. }
  80. }
  81. }
  82. // 元素计数加1,根据binCount来检验是否需要检查和扩容
  83. addCount(1L, binCount);
  84. return null;
  85. }

2.2.1 initTable方法进行数组初始化

  1. // 在上面的putVal源码里,当数组为空或长度为0的时候,需初始化,调用了initTable()方法
  2. private final Node<K,V>[] initTable() {
  3. Node<K,V>[] tab; int sc;
  4. while ((tab = table) == null || tab.length == 0) {
  5. // 需要注意的是,当整形的变量sc(即sizeCtl)小于0,那么说明有其他线程在在扩容,就调用线程的yield()方法
  6. if ((sc = sizeCtl) < 0)
  7. Thread.yield(); // lost initialization race; just spin
  8. // 使用sun.misc.Unsafe的compareAndSwapInt方法设置当前对象的sizeCtl为-1,设置成功后,初始化数组,默认容量 DEFAULT_CAPACITY 为16
  9. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  10. try {
  11. if ((tab = table) == null || tab.length == 0) {
  12. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  13. @SuppressWarnings("unchecked")
  14. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  15. table = tab = nt;
  16. // 变量sc的值为(n-(n>>>2)),n-(n>>>2)=n-(n/2^2)=n-n/4=3/4*n=0.75*n,默认n开始为16,16*0.75=12
  17. // 可知,负载因子为0.75
  18. sc = n - (n >>> 2);
  19. }
  20. } finally {
  21. sizeCtl = sc;
  22. }
  23. break;
  24. }
  25. }
  26. return tab;
  27. }

2.2.2 链表转树,treeifyBin方法分析

  1. private final void treeifyBin(Node<K,V>[] tab, int index) {
  2. Node<K,V> b; int n, sc;
  3. if (tab != null) {
  4. //MIN_TREEIFY_CAPACITY为64
  5. // 虽然进入到转树方法,如果数组长度小于64,那么先扩容
  6. if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
  7. tryPresize(n << 1); // 扩容,下面详细说
  8. // 确定头节点没问题开始加锁,转树
  9. else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
  10. synchronized (b) {
  11. if (tabAt(tab, index) == b) {
  12. TreeNode<K,V> hd = null, tl = null;
  13. // 遍历链表,生成一棵树
  14. for (Node<K,V> e = b; e != null; e = e.next) {
  15. TreeNode<K,V> p =
  16. new TreeNode<K,V>(e.hash, e.key, e.val,
  17. null, null);
  18. if ((p.prev = tl) == null)
  19. hd = p;
  20. else
  21. tl.next = p;
  22. tl = p;
  23. }
  24. // 把数据放到树中
  25. setTabAt(tab, index, new TreeBin<K,V>(hd));
  26. }
  27. }
  28. }
  29. }
  30. }

2.3 tryPresize方法进行扩容

  1. //putAll批量插入或者插入节点后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容会调用到这个方法
  2. private final void tryPresize(int size) {
  3. int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
  4. int sc;
  5. //如果不满足条件,也就是 sizeCtl < 0 ,说明有其他线程正在扩容当中,这里也就不需要自己去扩容了,结束该方法
  6. while ((sc = sizeCtl) >= 0) {
  7. Node<K,V>[] tab = table; int n;
  8. //如果数组初始化则进行初始化,这个选项主要是为批量插入操作方法 putAll 提供的
  9. if (tab == null || (n = tab.length) == 0) {
  10. n = (sc > c) ? sc : c;
  11. //初始化时将 sizeCtl 设置为 -1 ,保证单线程初始化
  12. if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  13. try {
  14. if (table == tab) {
  15. @SuppressWarnings("unchecked")
  16. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  17. table = nt;
  18. sc = n - (n >>> 2);
  19. }
  20. } finally {
  21. //初始化完成后 sizeCtl 用于记录当前集合的负载容量值,也就是触发集合扩容的阈值
  22. sizeCtl = sc;
  23. }
  24. }
  25. }
  26. else if (c <= sc || n >= MAXIMUM_CAPACITY)
  27. break;
  28. //插入节点后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容会进入到下面这个 else if 分支
  29. else if (tab == table) {
  30. int rs = resizeStamp(n);
  31. //下面的内容基本跟上面 addCount 方法的 while 循环内部一致,可以参考上面的注释
  32. if (sc < 0) {
  33. Node<K,V>[] nt;
  34. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
  35. break;
  36. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  37. transfer(tab, nt);
  38. }
  39. else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
  40. transfer(tab, null);
  41. }
  42. }
  43. }

2.3.1 transfer函数,移动和拷贝节点到新数组

  1. //调用该扩容方法的地方有:
  2. //java.util.concurrent.ConcurrentHashMap#addCount 向集合中插入新数据后更新容量计数时发现到达扩容阈值而触发的扩容
  3. //java.util.concurrent.ConcurrentHashMap#helpTransfer 扩容状态下其他线程对集合进行插入、修改、删除、合并、compute 等操作时遇到 ForwardingNode 节点时触发的扩容
  4. //java.util.concurrent.ConcurrentHashMap#tryPresize putAll批量插入或者插入后发现链表长度达到8个或以上,但数组长度为64以下时触发的扩容
  5. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  6. int n = tab.length, stride;
  7. //计算每条线程处理的桶个数,每条线程处理的桶数量一样,如果CPU为单核,则使用一条线程处理所有桶
  8. //每条线程至少处理16个桶,如果计算出来的结果少于16,则一条线程处理16个桶
  9. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  10. stride = MIN_TRANSFER_STRIDE; // subdivide range
  11. if (nextTab == null) { // 初始化新数组(原数组长度的2倍)
  12. try {
  13. @SuppressWarnings("unchecked")
  14. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  15. nextTab = nt;
  16. } catch (Throwable ex) { // try to cope with OOME
  17. sizeCtl = Integer.MAX_VALUE;
  18. return;
  19. }
  20. nextTable = nextTab;
  21. //将 transferIndex 指向最右边的桶,也就是数组索引下标最大的位置
  22. transferIndex = n;
  23. }
  24. int nextn = nextTab.length;
  25. //新建一个占位对象,该占位对象的 hash 值为 -1 该占位对象存在时表示集合正在扩容状态,key、value、next 属性均为 null ,nextTable 属性指向扩容后的数组
  26. //该占位对象主要有两个用途:
  27. // 1、占位作用,用于标识数组该位置的桶已经迁移完毕,处于扩容中的状态。
  28. // 2、作为一个转发的作用,扩容期间如果遇到查询操作,遇到转发节点,会把该查询操作转发到新的数组上去,不会阻塞查询操作。
  29. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  30. //该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
  31. boolean advance = true;
  32. //该标识用于控制扩容何时结束,该标识还有一个用途是最后一个扩容线程会负责重新检查一遍数组查看是否有遗漏的桶
  33. boolean finishing = false; // to ensure sweep before committing nextTab
  34. //这个循环用于处理一个 stride 长度的任务,i 后面会被赋值为该 stride 内最大的下标,而 bound 后面会被赋值为该 stride 内最小的下标
  35. //通过循环不断减小 i 的值,从右往左依次迁移桶上面的数据,直到 i 小于 bound 时结束该次长度为 stride 的迁移任务
  36. //结束这次的任务后会通过外层 addCount、helpTransfer、tryPresize 方法的 while 循环达到继续领取其他任务的效果
  37. for (int i = 0, bound = 0;;) {
  38. Node<K,V> f; int fh;
  39. while (advance) {
  40. int nextIndex, nextBound;
  41. //每处理完一个hash桶就将 bound 进行减 1 操作
  42. if (--i >= bound || finishing)
  43. advance = false;
  44. else if ((nextIndex = transferIndex) <= 0) {
  45. //transferIndex <= 0 说明数组的hash桶已被线程分配完毕,没有了待分配的hash桶,将 i 设置为 -1 ,后面的代码根据这个数值退出当前线的扩容操作
  46. i = -1;
  47. advance = false;
  48. }
  49. //只有首次进入for循环才会进入这个判断里面去,设置 bound 和 i 的值,也就是领取到的迁移任务的数组区间
  50. else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
  51. bound = nextBound;
  52. i = nextIndex - 1;
  53. advance = false;
  54. }
  55. }
  56. if (i < 0 || i >= n || i + n >= nextn) {
  57. int sc;
  58. //扩容结束后做后续工作,将 nextTable 设置为 null,表示扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
  59. if (finishing) {
  60. nextTable = null;
  61. table = nextTab;
  62. sizeCtl = (n << 1) - (n >>> 1);
  63. return;
  64. }
  65. //每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
  66. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  67. //(sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 成立,说明该线程不是扩容大军里面的最后一条线程,直接return回到上层while循环
  68. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  69. return;
  70. //(sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT 说明这条线程是最后一条扩容线程
  71. //之所以能用这个来判断是否是最后一条线程,因为第一条扩容线程进行了如下操作:
  72. // U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)
  73. //除了修改结束标识之外,还得设置 i = n; 以便重新检查一遍数组,防止有遗漏未成功迁移的桶
  74. finishing = advance = true;
  75. i = n; // recheck before commit
  76. }
  77. }
  78. else if ((f = tabAt(tab, i)) == null)
  79. //遇到数组上空的位置直接放置一个占位对象,以便查询操作的转发和标识当前处于扩容状态
  80. advance = casTabAt(tab, i, null, fwd);
  81. else if ((fh = f.hash) == MOVED)
  82. //数组上遇到hash值为MOVED,也就是 -1 的位置,说明该位置已经被其他线程迁移过了,将 advance 设置为 true ,以便继续往下一个桶检查并进行迁移操作
  83. advance = true; // already processed
  84. else {
  85. synchronized (f) {
  86. if (tabAt(tab, i) == f) {
  87. Node<K,V> ln, hn;
  88. //该节点为链表结构
  89. if (fh >= 0) {
  90. int runBit = fh & n;
  91. Node<K,V> lastRun = f;
  92. //遍历整条链表,找出 lastRun 节点
  93. for (Node<K,V> p = f.next; p != null; p = p.next) {
  94. int b = p.hash & n;
  95. if (b != runBit) {
  96. runBit = b;
  97. lastRun = p;
  98. }
  99. }
  100. //根据 lastRun 节点的高位标识(0 或 1),首先将 lastRun设置为 ln 或者 hn 链的末尾部分节点,后续的节点使用头插法拼接
  101. if (runBit == 0) {
  102. ln = lastRun;
  103. hn = null;
  104. }
  105. else {
  106. hn = lastRun;
  107. ln = null;
  108. }
  109. //使用高位和低位两条链表进行迁移,使用头插法拼接链表
  110. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  111. int ph = p.hash; K pk = p.key; V pv = p.val;
  112. if ((ph & n) == 0)
  113. ln = new Node<K,V>(ph, pk, pv, ln);
  114. else
  115. hn = new Node<K,V>(ph, pk, pv, hn);
  116. }
  117. //setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
  118. //使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
  119. //使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
  120. setTabAt(nextTab, i, ln);
  121. //使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
  122. setTabAt(nextTab, i + n, hn);
  123. //迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
  124. setTabAt(tab, i, fwd);
  125. //advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
  126. advance = true;
  127. }
  128. //该节点为红黑树结构
  129. else if (f instanceof TreeBin) {
  130. TreeBin<K,V> t = (TreeBin<K,V>)f;
  131. //lo 为低位链表头结点,loTail 为低位链表尾结点,hi 和 hiTail 为高位链表头尾结点
  132. TreeNode<K,V> lo = null, loTail = null;
  133. TreeNode<K,V> hi = null, hiTail = null;
  134. int lc = 0, hc = 0;
  135. //同样也是使用高位和低位两条链表进行迁移
  136. //使用for循环以链表方式遍历整棵红黑树,使用尾插法拼接 ln 和 hn 链表
  137. for (Node<K,V> e = t.first; e != null; e = e.next) {
  138. int h = e.hash;
  139. //这里面形成的是以 TreeNode 为节点的链表
  140. TreeNode<K,V> p = new TreeNode<K,V>
  141. (h, e.key, e.val, null, null);
  142. if ((h & n) == 0) {
  143. if ((p.prev = loTail) == null)
  144. lo = p;
  145. else
  146. loTail.next = p;
  147. loTail = p;
  148. ++lc;
  149. }
  150. else {
  151. if ((p.prev = hiTail) == null)
  152. hi = p;
  153. else
  154. hiTail.next = p;
  155. hiTail = p;
  156. ++hc;
  157. }
  158. }
  159. //形成中间链表后会先判断是否需要转换为红黑树:
  160. //1、如果符合条件则直接将 TreeNode 链表转为红黑树,再设置到新数组中去
  161. //2、如果不符合条件则将 TreeNode 转换为普通的 Node 节点,再将该普通链表设置到新数组中去
  162. //(hc != 0) ? new TreeBin<K,V>(lo) : t 这行代码的用意在于,如果原来的红黑树没有被拆分成两份,那么迁移后它依旧是红黑树,可以直接使用原来的 TreeBin 对象
  163. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  164. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  165. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  166. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  167. //setTabAt方法调用的是 Unsafe 类的 putObjectVolatile 方法
  168. //使用 volatile 方式的 putObjectVolatile 方法,能够将数据直接更新回主内存,并使得其他线程工作内存的对应变量失效,达到各线程数据及时同步的效果
  169. //使用 volatile 的方式将 ln 链设置到新数组下标为 i 的位置上
  170. setTabAt(nextTab, i, ln);
  171. //使用 volatile 的方式将 hn 链设置到新数组下标为 i + n(n为原数组长度) 的位置上
  172. setTabAt(nextTab, i + n, hn);
  173. //迁移完成后使用 volatile 的方式将占位对象设置到该 hash 桶上,该占位对象的用途是标识该hash桶已被处理过,以及查询请求的转发作用
  174. setTabAt(tab, i, fwd);
  175. //advance 设置为 true 表示当前 hash 桶已处理完,可以继续处理下一个 hash 桶
  176. advance = true;
  177. }
  178. }
  179. }
  180. }
  181. }
  182. }

2.4 get

读取操作,不需要同步控制,比较简单
1. 空tab,直接返回null
2. 计算hash值,找到相应的bucket位置,为node节点直接返回,否则返回null

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. if ((eh = e.hash) == h) { //总是检查头结点
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0) //在迁移或都是TreeBin
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) { //链表则直接遍历查询
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }

2.5 remove

  1. public V remove(Object key) {
  2. return replaceNode(key, null, null);
  3. }
  4. final V replaceNode(Object key, V value, Object cv) {
  5. int hash = spread(key.hashCode());
  6. for (Node<K,V>[] tab = table;;) {
  7. Node<K,V> f; int n, i, fh;
  8. if (tab == null || (n = tab.length) == 0 ||
  9. (f = tabAt(tab, i = (n - 1) & hash)) == null)
  10. break;
  11. else if ((fh = f.hash) == MOVED) //删除时也需要确实扩容完成后才可以操作。
  12. tab = helpTransfer(tab, f);
  13. else {
  14. V oldVal = null;
  15. boolean validated = false;
  16. synchronized (f) {
  17. if (tabAt(tab, i) == f) {
  18. if (fh >= 0) {
  19. validated = true;
  20. for (Node<K,V> e = f, pred = null;;) {
  21. K ek;
  22. if (e.hash == hash &&
  23. ((ek = e.key) == key ||
  24. (ek != null && key.equals(ek)))) {
  25. V ev = e.val;
  26. if (cv == null || cv == ev ||
  27. (ev != null && cv.equals(ev))) { //cv不为null则替换,否则是删除。
  28. oldVal = ev;
  29. if (value != null)
  30. e.val = value;
  31. else if (pred != null)
  32. pred.next = e.next;
  33. else
  34. //没前置节点就是头节点
  35. setTabAt(tab, i, e.next);
  36. }
  37. break;
  38. }
  39. pred = e;
  40. if ((e = e.next) == null)
  41. break;
  42. }
  43. }
  44. else if (f instanceof TreeBin) {
  45. //...
  46. }
  47. }
  48. }
  49. if (validated) {
  50. if (oldVal != null) {
  51. if (value == null)
  52. addCount(-1L, -1);
  53. return oldVal;
  54. }
  55. break;
  56. }
  57. }
  58. }
  59. return null;
  60. }

2.6 CurrentHashMap 扩容过程图解

2.6.1 触发扩容的操作

image.png
image.png
总结一下:
(1) 元素个数达到扩容阈值。
(2) 调用 putAll 方法,但目前容量不足以存放所有元素时。
(3) 某条链表长度达到8,但数组长度却小于64时。

2.6.2 CPU核数与迁移任务hash桶数量分配的关系

image.png

2.6.3 单线程下线程的任务分配与迁移操作

image.png

2.6.4 多线程如何分配任务?

image.png

2.6.5 普通链表如何迁移?

image.png
什么是 lastRun 节点?
image.png

2.6.6 红黑树如何迁移?

image.png

2.6.7 hash桶迁移中以及迁移后如何处理存取请求?

image.png

2.6.8 多线程迁移任务完成后的操作

image.png
image.png

2.7. 扩展问题

2.7.1、为什么HashMap的容量会小于数组长度?

答:HashMap是为了通过hash值计算出index,从而最快速的访问 。如果容量大于数组很多的话再加上散列算法不是非常优秀的情况下很容易出现链表过长的情况,虽然现在出现了红黑树,但是速度依旧不如直接定位到某个数组位置直接获取元素的速度快,所以最理想的情况是数组的每个位置放入一个元素,这样定位最快,从而访问也最快,集合容量小于数组长度的原因在于尽量去分散元素的分布,相当于是拉长了分布的范围,尽量减少集中到一起的概率,从而提高访问的速度,同时,负载因子只要小于 1 ,就不存在容量等于数组长度的情况 。

2.7.2、扩容期间在未迁移到的hash桶插入数据会发生什么?

答:只要插入的位置扩容线程还未迁移到,就可以插入,当迁移到该插入的位置时,就会阻塞等待插入操作完成再继续迁移 。

2.7.3、正在迁移的hash桶遇到 get 操作会发生什么?

答:在扩容过程期间形成的 hn 和 ln链 是使用的类似于复制引用的方式,也就是说 ln 和 hn 链是复制出来的,而非原来的链表迁移过去的,所以原来 hash 桶上的链表并没有受到影响,因此从迁移开始到迁移结束这段时间都是可以正常访问原数组 hash 桶上面的链表,迁移结束后放置上fwd,往后的访问请求就直接转发到扩容后的数组去了 。

2.7.4、如果 lastRun 节点正好在一条全部都为高位或者全部都为低位的链表上,会不会形成死循环?

答:在数组长度为64之前会导致一直扩容,但是到了64或者以上后就会转换为红黑树,因此不会一直死循环 。

2.7.5、扩容后 ln 和 hn 链不用经过 hash 取模运算,分别被直接放置在新数组的 i 和 n + i 的位置上,那么如何保证这种方式依旧可以用过 h & (n - 1) 正确算出 hash 桶的位置?

答:如果 fh & n-1 = i ,那么扩容之后的 hash 计算方法应该是 fh & 2n-1 。 因为 n 是 2 的幂次方数,所以 如果 n=16, n-1 就是 1111(二进制), 那么 2n-1 就是 11111 (二进制) 。 其实 fh & 2n-1 和 fh & n-1 的值区别就在于多出来的那个 1 => fh & (10000) 这个就是两个 hash 的区别所在 。而 10000 就是 n 。所以说 如果 fh 的第五 bit 不是 1 的话 fh & n = 0 => fh & 2n-1 == fh & n-1 = i 。 如果第5位是 1 的话 。fh & n = n => fh & 2n-1 = i+n 。

2.7.6、我们都知道,并发情况下,各线程中的数据可能不是最新的,那为什么 get 方法不需要加锁?

答:get操作全程不需要加锁是因为Node的成员val是用volatile修饰的 。

2.7.7、ConcurrentHashMap 的数组上插入节点的操作是否为原子操作,为什么要使用 CAS 的方式?

答:待解决 。

2.7.8、扩容完成后为什么要再检查一遍?

答:为了避免遗漏hash桶,至于为什么会遗漏hash桶,有待后续补充 。

2.7.9、什么时候触发扩容?

链表转换为红黑树时(链表节点个数达到8个会转换为树),数组长度小于64
数组中总节点数大于阈值(数组长度的0.75倍)

2.7.10、如何hash定位

h^(h>>>16)&0x7fffffff,先将hashCode的高16位和低16位异或运算,这个做目的是为了让hash值更加随机。和0x7fffffff(int的最大值)相与运算是为了得到正数,因为负数的hash有特殊用途,如-1表forwarding node(表示该位置正在扩容)

2.7.11、扩容时,扩容后的容量是原先的几倍?单线程扩容吗?

2倍,多线程协同扩容,每个线程负责一块区域的复制迁移任务


3 总结