ConcurrentHashMap 是一个高性能的线程安全的哈希表容器,但它只能保证提供的原子性读写操作是线程安全的,此外还提供了一些原子性的简单复合逻辑方法。比如使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作。该方法在虚拟机层面确保了写入数据的原子性,比加锁的效率高得多。

诸如 sizeisEmptycontainsValue 等聚合方法,在并发情况下可能会反映 ConcurrentHashMap 的中间状态。因此在并发情况下,这些方法的返回值只能用作参考。以及 putAll 这样的聚合方法也不能确保原子性,在 putAll 的过程中去获取数据可能只会获取到部分数据。并且 ConcurrentHashMap 是弱一致性的,因为诸如 get、size 等方法都没有用到锁,有可能会导致读取时无法马上获取到写入的数据。

JDK7 实现

在 JDK 7 中,ConcurrentHashMap 使用分段锁技术,整个哈希表内部进一步又细分了若干个 Segment。每一个 Segment 都是一把锁,因为其继承了 ReentrantLock。当一个线程访问其中一个 Segment 数据时,其他 Segment 的数据也能被其他线程访问,从而实现真正的并发访问。

1. 内部结构

  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
  2. implements ConcurrentMap<K,V>, Serializable {
  3. // 默认大小
  4. static final int DEFAULT_INITIAL_CAPACITY = 16;
  5. // 默认的并发度,即初始化segments数组的长度
  6. static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  7. // Segment数组
  8. final Segment<K,V>[] segments;
  9. }

其中,Segment 继承了 ReentrantLock,本质上是一个可重入锁,内部结构如下:

  1. static final class Segment<K,V> extends ReentrantLock implements Serializable {
  2. // 真正存放数据的桶
  3. transient volatile HashEntry<K,V>[] table;
  4. // 元素的个数,这里没有加volatile修饰,所以只能在加锁或者确保可见性的情况下进行访问
  5. transient int count;
  6. // segment元素修改次数记录,由于未进行volatile修饰,所以访问规则和count类似
  7. transient int modCount;
  8. // 扩容阈值,针对Segment的大小进行扩容
  9. transient int threshold;
  10. // 负载因子
  11. final float loadFactor;
  12. }

在构造时,segments 数组的大小由所谓的 concurrentcyLevel 决定,默认是 16,也可以在相应的构造函数中直接指定。但是一旦确认初始化 Segment 的数量后,是不可以扩容的。注意,Java 需要它是 2 的幂数值,如果输入是类似 15 这种非幂值,会被自动调整到 16 之类 2 的幂数值。

其中,HashEntry 用于存储键值对数据。一个 ConcurrentHashMap 里包含多个 Segment。一个 Segment 里又包含了一个 HashEntry 数组,每个数组元素都是一个链表结构,每个 Segment 守护着一个 HashEntry 数组里的元素,当对 HashEntry 数组里的数据进行修改时,必须先获得对应 Segment 的锁。

  1. static final class HashEntry<K,V> {
  2. final int hash;
  3. final K key;
  4. // 用volatile修饰,保证了获取时的可见性
  5. volatile V value;
  6. // 形成链表,JDK 7中该字段不是final的,意味着该字段可修改,而且也确实在remove方法中对该字段进行了修改
  7. volatile HashEntry<K,V> next;
  8. }

整体的内部结构如下图所示:
ConcurrentHashMap - 图1

2. put

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. // concurrentHashMap不允许key、value为空
  4. if (value == null) throw new NullPointerException();
  5. // 通过hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀
  6. int hash = hash(key);
  7. // 根据哈希值先定位segment
  8. int j = (hash >>> segmentShift) & segmentMask;
  9. if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
  10. // 获取元素所在的segment
  11. s = ensureSegment(j);
  12. // 通过segment加锁进行put,若超出阈值threshold,需要扩容并rehash。
  13. // 扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列。
  14. return s.put(key, hash, value, false);
  15. }

首先通过 key 的 hashCode 定位到某个 Segment,然后尝试获取 Segment 的锁(调用 tryLock 方法),如果获取失败则尝试自旋获取锁。如果自旋重试次数达到了最大重试次数则改为阻塞获取锁,保证能够获取成功。

成功获取 Segment 的锁后,将当前 Segment 中的 table 通过 key 的 hashcode 定位到某个 HashEntry。然后遍历 HashEntry 链表(即同一 hashCode 下的值)。

  • 如果 HashEntry 链表不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value,不相等则插入到链表尾部。

  • 如果 HashEntry 链表为空则先判断是否需要扩容(这里的扩容不是针对整体扩容,而是单独对 Segment 扩容)然后新建一个 HashEntry 并加入到该 Segment 中,对比 HashMap 是插入元素后才判断是否需要扩容,这样可能造成无效扩容。

Segment 中 HashEntry 数组的大小必须是 2**n,扩容时会先生成一个原 HashEntry 数组长度一倍的新数组,然后遍历原来旧的 table_ **_数组,把每一个数组元素迁移(rehash)到新数组里面,迁移完以后再把新数组的引用直接替换旧引用。扩容后的每个链表元素的位置,要么不变,要么是原 table 索引位置 + 原 table 容量大小。

最后释放 Segment 的锁。

3. get

  1. public V get(Object key) {
  2. Segment<K,V> s;
  3. HashEntry<K,V>[] tab;
  4. // 获取key对应hash值
  5. int h = hash(key);
  6. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  7. // 根据hash值先定位到Segment
  8. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
  9. (tab = s.table) != null) {
  10. // 遍历segment中的HashEntry数组
  11. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  12. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
  13. e != null; e = e.next) {
  14. // 这里第一次初始化获取到的是主存中最新的数据,但在后续遍历中,有可能数据被其它线程修改
  15. // 从而导致其实这里最终返回的可能是过时的数据,这里体现了ConcurrentHashMap的弱一致性
  16. K k;
  17. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
  18. return e.value;
  19. }
  20. }
  21. return null;
  22. }

先通过 Key 的 hashCode 定位到具体的 Segment,再通过一次 hash 定位到具体的 HashEntry。由于 HashEntry 中的 value 是用 volatile 修饰的,保证了内存可见性,所以每次获取时都是最新值。因此 get 方法不需要加锁,是非常高效的。但 get 方法是弱一致性的,所以有可能会获取到过时的数据。

4. size

ConcurrentHashMap 的 size() 方法的实现涉及分离锁的一个副作用。如果不进行同步,简单的计算所有 Segment 的总值,可能会因为并发 put,导致结果不准确,但是直接锁定所有 Segment 进行计算,就会变得非常昂贵。

所以,ConcurrentHashMap 的实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数 2)来试图获得可靠值。先尝试两次不锁住 Segment 的方法来统计各 Segment 中 modcount 的大小,如果没有监控到发生变化就直接返回,若前后两次的值发生了变化,则说明在此过程中有线程更新了map,此时循环重试。为了防止一直循环重试,当超过最大重试次数时,直接强制对所有的 Segment 进行加锁统计。

JDK8 实现

ConcurrentHashMap 在 JDK 8 的实现中抛弃了分段锁的设计思想,由于 synchronized 锁在 Java 6 之后的性能已经得到了很大的提升,所以在 JDK 8 中大量使用了 synchronized 及 CAS 无锁操作来保证线程安全。另外将 JDK7 中存放数据的 HashEntry 改为 Node,通过锁住 Node 来减小锁粒度,同时底层数据结构改为【数组 + 链表 + 红黑树】的数据形式。
ConcurrentHashMap - 图2
虽然 ConcurrentHashMap 内部仍然有 Segment 定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。并且由于不再使用 Segment,初始化操作大大简化,修改为 lazy-load 形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。

1. 内部结构

  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
  2. implements ConcurrentMap<K,V>, Serializable {
  3. // 默认容量
  4. private static final int DEFAULT_CAPACITY = 16;
  5. // 链表转红黑树的阈值
  6. static final int TREEIFY_THRESHOLD = 8;
  7. // 红黑树退化为链表的阈值
  8. static final int UNTREEIFY_THRESHOLD = 6;
  9. // 在链表转红黑树之前,还会判断键值对数量大于64才会发生转换,避免不必要的转化。
  10. static final int MIN_TREEIFY_CAPACITY = 64;
  11. // 存放数据的容器,采用懒加载的方式,直到第一次插入数据时才会初始化,数组的大小总是为2的幂次方。
  12. transient volatile Node<K,V>[] table;
  13. // 扩容时使用,平时为null,只有在扩容的时候才为非null
  14. private transient volatile Node<K,V>[] nextTable;
  15. // 用来控制table的初始化和扩容操作
  16. // 为 -1 表示正在初始化
  17. // 为 -n 表示 n-1 个线程正在扩容中
  18. // 若 >0 则表示 table 数组的长度
  19. // 若 =0 则为默认值,使用默认容量进行初始化
  20. private transient volatile int sizeCtl;
  21. }

Node:

  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V val;
  5. volatile Node<K,V> next;
  6. }

我们可以发现 Key 是 final 的,因为在生命周期中,一个条目的 Key 是不可能发生变化的;与此同时 val 则声明为 volatile 以保证值的可见性。

TreeBin(红黑树):

  1. // 并不存储实际数据,维护对桶内红黑树的读写锁,存储对红黑树节点的引用
  2. static final class TreeBin<K,V> extends Node<K,V> {
  3. TreeNode<K,V> root;
  4. volatile TreeNode<K,V> first;
  5. }

TreeNode(树节点):

  1. static final class TreeNode<K,V> extends Node<K,V> {
  2. TreeNode<K,V> parent;
  3. TreeNode<K,V> left;
  4. TreeNode<K,V> right;
  5. TreeNode<K,V> prev;
  6. boolean red;
  7. }

2. put

put 方法的关键逻辑在 putVal 方法中。添加元素时,在没有哈希冲突的情况下,会使用 CAS 进行添加元素操作;如果有冲突,则通过 synchronized 将链表锁定,再执行接下来的操作。

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. // key、value不能为空
  3. if (key == null || value == null) throw new NullPointerException();
  4. // 计算key的hashcode值,确定这个值在table中的位置
  5. int hash = spread(key.hashCode());
  6. int binCount = 0;
  7. for (Node<K,V>[] tab = table;;) {
  8. Node<K,V> f; int n, i, fh;
  9. // 如果是第一次put,table数组还没有初始化,则先初始化table数组,默认初始大小为16。
  10. if (tab == null || (n = tab.length) == 0)
  11. tab = initTable();
  12. // 如果table数组中索引为i的位置上节点值为null,则尝试通过CAS插入新节点,失败则自旋保证成功。
  13. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  14. if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
  15. break;
  16. // 如果这个位置存在节点,但为MOVED特殊节点,则表示当前ConcurrentHashMap正在扩容,则当前线程帮助进行扩容。
  17. } else if ((fh = f.hash) == MOVED)
  18. tab = helpTransfer(tab, f);
  19. else {
  20. V oldVal = null;
  21. // 如果这个节点为链表或红黑树的头节点,则利用synchronized阻塞插入数据(锁的是node)
  22. synchronized (f) {
  23. if (tabAt(tab, i) == f) {
  24. // 如果当前为链表形式
  25. if (fh >= 0) {
  26. binCount = 1;
  27. // 遍历链表
  28. for (Node<K,V> e = f;; ++binCount) {
  29. K ek;
  30. // 如果有hash值相同的key,覆盖旧值即可
  31. if (e.hash == hash &&
  32. ((ek = e.key) == key ||
  33. (ek != null && key.equals(ek)))) {
  34. oldVal = e.val;
  35. if (!onlyIfAbsent)
  36. e.val = value;
  37. break;
  38. }
  39. // 如果到链表末尾仍未找到,则生成一个新的Node节点插入到链表尾部
  40. Node<K,V> pred = e;
  41. if ((e = e.next) == null) {
  42. pred.next = new Node<K,V>(hash, key, value, null);
  43. break;
  44. }
  45. }
  46. }
  47. // 如果当前为红黑树形式
  48. else if (f instanceof TreeBin) {
  49. Node<K,V> p;
  50. binCount = 2;
  51. // 如果红黑树中存在与待插入键值对的Key相同的节点,则覆盖旧值
  52. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
  53. oldVal = p.val;
  54. // 否则生成一个新的TreeNode节点直接插入到红黑树中
  55. if (!onlyIfAbsent)
  56. p.val = value;
  57. }
  58. }
  59. }
  60. }
  61. // 插入完成后,如果链表长度大于TREEIFY_THRESHOLD,则尝试将链表转为红黑树
  62. if (binCount != 0) {
  63. if (binCount >= TREEIFY_THRESHOLD)
  64. treeifyBin(tab, i);
  65. // 因为putVal返回值是添加前的旧值,所以这里返回oldVal
  66. if (oldVal != null)
  67. return oldVal;
  68. break;
  69. }
  70. }
  71. }
  72. // 添加计数,并对当前容量大小进行检查,如果超过了临界值(实际大小*负载因子)则需要扩容
  73. addCount(1L, binCount);
  74. return null;
  75. }

putVal 方法中会逐步根据当前槽点是未初始化、空槽、链表、红黑树、是否需要扩容等不同情况做出了不同的处理。当第一次 put 时会对数组进行初始化,bucket 为空则 CAS 操作赋值,不为空则判断是链表还是红黑树进行赋值操作,若此时数组正在扩容则调用 helpTransfer 进行多线程并发扩容操作,最后返回 oldValue 并对操作调用 addCount 来更新 size 以及判断是否需要扩容。

初始化操作的实现在 initTable 里面,这是一个典型的 CAS 使用场景,利用 volatile 修饰的 sizeCtl 字段作为互斥手段:如果发现竞争性的初始化,就 spin 在那里,等待条件恢复;否则利用 CAS 设置排他标志。如果成功则进行初始化,否则重试。

3. get

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. // 计算key的hashcode值,确定这个值在table中的位置
  4. int h = spread(key.hashCode());
  5. // 判断数组是否为空,判断当前槽中的数据是否为空
  6. if ((tab = table) != null && (n = tab.length) > 0 &&
  7. (e = tabAt(tab, (n - 1) & h)) != null) {
  8. // 如果table[i]的头结点的key与查找的key相同,则无需遍历node,直接返回
  9. if ((eh = e.hash) == h) {
  10. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  11. return e.val;
  12. }
  13. // 否则,判断当前节点的hash值是否小于 0
  14. else if (eh < 0)
  15. // 如果hash值小于0则说明该节点为树节点,在红黑树中查找即可
  16. return (p = e.find(h, key)) != null ? p.val : null;
  17. // 否则从链表中遍历查找,查到则返回该节点的value,查不到则返回null
  18. while ((e = e.next) != null) {
  19. if (e.hash == h &&
  20. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  21. return e.val;
  22. }
  23. }
  24. return null;
  25. }

4. size

无论是 JDK7 还是 JDK8,ConcurrentHashMap 的 size() 方法都只能返回一个大概数量,无法做到 100% 的精确(高并发情况下获取精确值也没有意义),因为已经统计过的槽在 size() 返回最终结果前有可能又出现了变化,从而导致返回大小与实际大小存在些许差异。

  1. public int size() {
  2. long n = sumCount();
  3. return ((n < 0L) ? 0 :
  4. (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
  5. (int)n);
  6. }
  7. final long sumCount() {
  8. CounterCell[] as = counterCells; CounterCell a;
  9. long sum = baseCount;
  10. if (as != null) {
  11. for (int i = 0; i < as.length; ++i) {
  12. if ((a = as[i]) != null)
  13. sum += a.value;
  14. }
  15. }
  16. return sum;
  17. }

我们发现,虽然思路仍然和以前类似,都是分而治之的进行计数,然后求和处理,但具体实现中却是基于了一个奇怪的 CounterCell 类。难道它的数值就更加准确吗?数据一致性是怎么保证的?

  1. @sun.misc.Contended static final class CounterCell {
  2. volatile long value;
  3. CounterCell(long x) { value = x; }
  4. }

可以看到,Contended 注解表示 JVM 对其使用了伪共享优化,因而具有很高的读写性能。对于 CounterCell 的操作,是改编自 java.util.concurrent.atomic.LongAdder 的实现,是一种 JVM 利用空间换取更高效率的方法,利用了 Striped64 内部的复杂逻辑。这个东西非常小众,一般我们使用 AtomicLong 就足以满足绝大部分应用的性能需求了。

5. 扩容

ConcurrentHashMap 的扩容过程非常巧妙,它并没有完全打乱当前已有的元素位置,而是在数组扩容 2 倍后,将一半的元素移动到新的空间中,所有的元素根据高位是否为 1 分为 low 节点和 high 节点。下图显示了从 8 扩充到 16 时的可能得一种扩容情况,注意,新的位置总是在老位置的后面 n 个槽位(n 为原数组大小)。
image.png
下面来分析下 transfer 方法的执行逻辑:

  1. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2. int n = tab.length, stride;
  3. // 通过计算 CPU 核心数和 Map 数组的长度得到每个线程(CPU)要帮助处理多少个桶,并且这里每个线程处理都是平均的
  4. // 默认每个线程处理16个桶。因此,如果长度是16的时候,扩容的时候只会有一个线程扩容
  5. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  6. stride = MIN_TRANSFER_STRIDE; // subdivide range
  7. if (nextTab == null) { // initiating
  8. try {
  9. // 初始化临时变量nextTab,将其在原有基础上扩容两倍
  10. @SuppressWarnings("unchecked")
  11. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  12. nextTab = nt;
  13. } catch (Throwable ex) { // try to cope with OOME
  14. sizeCtl = Integer.MAX_VALUE;
  15. return;
  16. }
  17. nextTable = nextTab;
  18. transferIndex = n;
  19. }
  20. int nextn = nextTab.length;
  21. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  22. boolean advance = true;
  23. boolean finishing = false; // to ensure sweep before committing nextTab
  24. // 死循环开始转移node节点,根据一个finishing变量来判断,该变量为true表示扩容结束,否则继续扩容。
  25. for (int i = 0, bound = 0;;) {
  26. Node<K,V> f; int fh;
  27. // 循环分配数组中一个桶的区间给线程
  28. while (advance) {
  29. ......
  30. }
  31. ......
  32. else {
  33. // 处理每个桶的行为都是同步的, 防止 putVal 的时候向链表插入数据。
  34. synchronized (f) {
  35. if (tabAt(tab, i) == f) {
  36. Node<K,V> ln, hn;
  37. // 如果fh>=0,即哈希值不为负数,表示这个桶是链表
  38. // 则将该链表按length拆成两份,取于结果是0的放在新表的低位,取于结果是1放在新表的高位
  39. if (fh >= 0) {
  40. ......
  41. }
  42. // 如果这个桶是红黑数,也拆成2份,方式和链表的方式一样
  43. // 然后,判断拆分过的树的节点数量,如果数量小于等于6,则退化成链表
  44. else if (f instanceof TreeBin) {
  45. ......
  46. }
  47. }
  48. }
  49. }
  50. }
  51. }

ForwardingNode 在 table 扩容时使用,内部记录了扩容后的 table,即 nextTable。当 table 需要进行扩容时,依次遍历当前 table 中的每一个槽,如果不为 null,则需要把其中所有的元素根据 hash 值放入扩容后的 nextTable 中,而原 table 的槽内会放置一个 ForwardingNode 节点。正如其名,此节点会把 find() 请求转发到扩容后的 nextTable 上。而执行 put() 方法的线程如果碰到此节点,也会协助进行迁移。