1. 如何使 HashMap 变得线程安全?

要使用线程安全的 HashMap,我们其实有多种方法。例如可以调用 Collections 工具类里的 synchronizedMap 方法,或者是直接使用 Hashtable。由于 ConcurrentHashMap 中很大部分都与 HashMap 相同,因此本文仅介绍 ConcurrentHashMap 与 HashMap 的不同之处,相同之处可以参考 《HashMap 解析》这篇文章。

使用 synchronizedMap

为了实现线程安全,Collections 类实现了一系列线程安全的类,如 SynchronizedList、SynchronizedMap等。从名字就可以看出,这些类的底层是通过 synchronized 轻量锁来实现的。

使用 Hashtable

HashTable 在操作时,会锁住整个 Map。

ConcurrentHashMap 与 HashMap 的不同之处

  1. 红黑树结构略有不同,HashMap 的红黑树中的节点叫做 TreeNode,TreeNode 不仅仅有属性,还维护着红黑树的结构,比如说查找,新增等等;ConcurrentHashMap 中红黑树被拆分成两块,TreeNode 仅仅维护的属性和查找功能,新增了 TreeBin,来维护红黑树结构,并负责根节点的加锁和解锁;
  2. 新增 ForwardingNode(转移)节点,扩容的时候会使用到,通过使用该节点,来保证扩容时的线程安全。

    2. ConcurrentHashMap 源码解析

    ConcurrentHashMap 的重要属性

    先来看看 ConcurrentHashMap 的几个重要属性。 ```java // 采用volatile关键字修饰,一旦被修改会立马通知其他线程 transient volatile Node[] table;

private static final int DEFAULT_CAPACITY = 16;

// 转换为红黑树的长度阈值 static final int TREEIFY_THRESHOLD = 8;

// 用于识别扩容时正在转移数据 static final int MOVED = -1;

// 计算哈希值时用到的参数,用来去除符号位 static final int HASH_BITS = 0x7fffffff;

// 转移数据时,新的哈希表数组 private transient volatile Node[] nextTable;

  1. <a name="5504d209"></a>
  2. ## ConcurrentHashMap 的重要的类
  3. 了解完上面的重要属性后,再来看看 ConcurrentHashMap 中一些重要的类。
  4. <a name="Node"></a>
  5. ### Node
  6. 链表中的元素为 Node 对象。它是链表上的一个节点,内部存储了 key、value 值,以及他的下一个节点的引用。这样一系列的 Node 就串成一串,组成一个链表。
  7. <a name="ForwardingNode"></a>
  8. ### ForwardingNode
  9. 当进行扩容时,要把链表迁移到新的哈希表,在做这个操作时,会在把数组中的头节点替换为 ForwardingNode 对象。ForwardingNode 中不保存 key 和 value,只保存了扩容后哈希表(nextTable)的引用。此时查找相应 node 时,需要去 nextTable 中查找。
  10. <a name="TreeBin"></a>
  11. ### TreeBin
  12. 当链表转为红黑树后,数组中保存的引用不再是 Node,而是 TreeBin,TreeBin 内部不保存 key/value,他保存了 TreeNode 的 list 以及红黑树 root。
  13. <a name="TreeNode"></a>
  14. ### TreeNode
  15. 红黑树的节点。
  16. <a name="dde7e601"></a>
  17. ## ConcurrentHashMap 的 put() 方法源码分析
  18. ```java
  19. final V putVal(K key, V value, boolean onlyIfAbsent) { //onlyIfAbsent为 false 则覆盖已有的value值
  20. //key和value不能为空
  21. if (key == null || value == null) throw new NullPointerException();
  22. //计算key的hash值,后面我们会看spread方法的实现
  23. int hash = spread(key.hashCode());
  24. int binCount = 0;
  25. //开始自旋,table属性采取懒加载,第一次put的时候进行初始化
  26. for (Node<K,V>[] tab = table;;) {
  27. Node<K,V> f; int n, i, fh;
  28. //如果table未被初始化,则初始化table
  29. if (tab == null || (n = tab.length) == 0)
  30. tab = initTable();
  31. //通过key的hash值计算出要存入的位置
  32. // 这里写法很严谨,使用CAS而不是直接赋值,因为在判断槽点为空的瞬间可能其他线程就做了赋值操作了
  33. //如果该位置的值为空,那么使用CAS生成新的node来存储该key、value,放入此位置,结束for自旋
  34. //否则继续自旋
  35. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  36. // 具体地看看CAS,拿出tab中对应下标为i的内存值,与旧的预期值null做比较,如果相同,那么使用new出的Node执行赋值
  37. if (casTabAt(tab, i, null,
  38. new Node<K,V>(hash, key, value, null)))
  39. break; // no lock when adding to empty bin
  40. }
  41. //如果该位置节点元素的hash值为MOVED,也就是-1,代表正在做扩容的复制。那么该线程参与复制工作。
  42. else if ((fh = f.hash) == MOVED)
  43. tab = helpTransfer(tab, f);
  44. //下面分支处理table映射的位置已经存在node的情况
  45. else {
  46. V oldVal = null;
  47. // 锁定当前位置,使其余线程不能操作,保证了安全
  48. synchronized (f) {
  49. //再次确认该位置的值是否已经发生了变化
  50. if (tabAt(tab, i) == f) {
  51. //第一种情况:fh大于等于0,表示此位置存储的是链表
  52. if (fh >= 0) {
  53. //遍历链表,binCount用于统计node数量
  54. binCount = 1;
  55. for (Node<K,V> e = f;; ++binCount) {
  56. K ek;
  57. //如果存在一样hash值的node,那么根据onlyIfAbsent的值选择覆盖value或者不覆盖
  58. if (e.hash == hash &&
  59. ((ek = e.key) == key ||
  60. (ek != null && key.equals(ek)))) {
  61. oldVal = e.val;
  62. if (!onlyIfAbsent)
  63. e.val = value;
  64. break;
  65. }
  66. Node<K,V> pred = e;
  67. //如果找到最后一个元素,也没有找到相同hash的node,那么生成新的node存储key/value,作为尾节点放入链表。
  68. if ((e = e.next) == null) {
  69. pred.next = new Node<K,V>(hash, key,
  70. value, null);
  71. break;
  72. }
  73. }
  74. }
  75. //第二种情况:fh小于0,表示 hash 值映射哈希表对应位置存储的是红黑树
  76. //那么通过 TreeBin 对象的 putTreeVal 方法保存 key/value
  77. else if (f instanceof TreeBin) {
  78. Node<K,V> p;
  79. binCount = 2;
  80. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  81. value)) != null) {
  82. oldVal = p.val;
  83. if (!onlyIfAbsent)
  84. p.val = value;
  85. }
  86. }
  87. }
  88. }
  89. //node保存完成后,判断链表长度是否已经超出阈值,则进行哈希表扩容或者将链表转化为红黑树
  90. //binCount 是用来记录链表保存 node 的数量的,可以看到当其大于 TREEIFY_THRESHOLD,也就是 8 的时候进行扩容。
  91. if (binCount != 0) {
  92. if (binCount >= TREEIFY_THRESHOLD)
  93. //treeifyBin()会选择是把此时保存数据所在的链表转为红黑树,还是对整个哈希表扩容。
  94. treeifyBin(tab, i);
  95. if (oldVal != null)
  96. return oldVal;
  97. break;
  98. }
  99. }
  100. }
  101. // 计数,并且判断哈希表中使用的桶位是否超出阈值75%,超出的话进行扩容
  102. // 与treeifyBin中的扩容不一样,treeifyBin中的扩容是由链表过长引起的,这里的扩容是由于75%的位置被使用引起的
  103. addCount(1L, binCount);
  104. return null;
  105. }
  106. static final int spread(int h) {
  107. return (h ^ (h >>> 16)) & HASH_BITS;
  108. }

主线流程如下图:
ConcurrentHashMap 解析 - 图1

在判断 hash 映射是否为空的时候,取出哈希表中该 hash 值对应位置的代码如下。

tabAt(tab, i = (n - 1) & hash)

initTable() 源码分析

数组初始化时,首先通过自旋来保证一定可以初始化成功,然后通过 CAS 设置 sizeCtl 变量的值,来保证同一时刻只能有一个线程对数组进行初始化,CAS 成功之后,还会再次判断当前数组是否已经初始化完成,如果已经初始化完成,就不会再次初始化,通过自旋 + CAS + 双重 check 等手段保证了数组初始化时的线程安全,源码如下:

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // table为空时进入循环
    while ((tab = table) == null || tab.length == 0) {
        //如果sizeCtl<0,那么有其他线程正在创建table,所以本线程让出CPU的执行权。直到table创建完成,while循环跳出。if中同时还把sizeCtl的值赋值给了sc。
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //以CAS方式修改sizeCtl为-1,表示本线程已经开始创建table的工作。
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 双重检查,再次确认是否table还是空的
                if ((tab = table) == null || tab.length == 0) {
                    //如果sc有值,那么使用sc的值作为table的size,否则使用默认值16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    //sc被设置为table大小的3/4(无符号右移两位,相当于除以4)
                    sc = n - (n >>> 2);
                }
            } finally {
                  //sizeCtl被设置为table大小的3/4
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

里面有个关键的值 sizeCtl,这个值有多个含义。

  1. -1 代表有线程正在创建 table;
  2. -N 代表有 N-1 个线程正在复制 table;
  3. 在 table 被初始化前,代表根据构造函数传入的值计算出的应被初始化的大小;
  4. 在 table 被初始化后,则被设置为 table 大小 的 75%,代表 table 的容量(数组容量)。

initTable 中使用到 1 和 4,2 和 3 在其它方法中会有使用。下面我们可以先看下 ConcurrentHashMap 的构造方法,里面会使用上面的 3 。

ConcurrentHashMap 的构造函数源码分析

ConcurrentHashMap 带容量参数的构造函数源码如下:

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    //如果传入的初始化容量值超过最大容量的一半,那么sizeCtl会被设置为最大容量。
    //否则通过tableSizeFor方法就算出一个2的n次方数值作为size
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

这是一个有参数的构造方法。如果你对未来存储的数据量有预估,我们可以指定哈希表的大小,避免频繁的扩容操作。tableSizeFor 这个方法确保了哈希表的大小永远都是 2 的 n 次方。如果 size 不是 2 的 n 次方,那么 hash 算法计算的下标发生的碰撞概率会大大增加。因此通过 tableSizeFor 方法确保了返回大于传入参数的最小 2 的 n 次方。注意这里传入的参数不是 initialCapacity,而是 initialCapacity 的 1.5 倍 + 1。这样做是为了保证在默认 75% 的负载因子下,能够足够容纳 initialCapacity 数量的元素,避免了再一次的扩容。讲到这里你一定好奇 tableSizeFor 是如何实现向上取得最接近入参 2 的 n 次方的。下面我们来看 tableSizeFor 源代码:

private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

依旧是二进制按位操作,这样一顿操作后,得到的数值就是大于 c 的最小 2 的 n 次。我们推演下过程,假设 c 是 9:

1、int n = 9 - 1

n=8

2、n |= n >>> 1

n=1000

n >>> 1=0100

两个值按位或后

n=1100

3、n |= n >>> 2

n=1100

n >>> 2=0011

n=1111

到这里可以看出规律来了。如果 c 足够大,使得 n 很大,那么运算到 n |= n >>> 16 时,n 的 32 位都为 1。

总结一下这一段逻辑,其实就是把 n 有数值的 bit 位全部置为 1。这样就得到了一个肯定大于等于 n 的值。我们再看最后一行代码,最终返回的是 n+1,那么一个所有位都是 1 的二进制数字,+1 后得到的就是一个 2 的 n 次方数值。

关于 ConcurrentHashMap (int initialCapacity) 构造函数的分析我们总结下:

  1. 构造函数中并不会初始化哈希表;
  2. 构造函数中仅设置哈希表大小的变量 sizeCtl;
  3. initialCapacity 并不是哈希表大小,哈希表大小 sizeCtl 为 initialCapacity1.5+1 后,*向上取最小的 2 的 n 次方。如果超过最大容量一半,那么就是最大容量。

    treeifyBin() 方法源码分析

    首先我们要理解为什么 Map 需要扩容,这是因为我们采用哈希表存储数据,当固定大小的哈希表存储数据越来越多时,链表长度会越来越长,这会造成 put 和 get 的性能下降。此时我们希望哈希表中多一些桶位,预防链表继续堆积的更长。接下来我们分析 treeifyBin 方法代码,这个代码中会选择是把此时保存数据所在的链表转为红黑树,还是对整个哈希表扩容。
    private final void treeifyBin(Node<K,V>[] tab, int index) {
     Node<K,V> b; int n, sc;
     if (tab != null) {
         //如果哈希表长度小于64,那么选择扩大哈希表的大小,而不是把链表转为红黑树
         if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
             tryPresize(n << 1);
         //将哈希表中index位置的链表转为红黑树
         else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
             synchronized (b) {
                   //下面逻辑将node链表转化为TreeNode链表
                 if (tabAt(tab, index) == b) {
                     TreeNode<K,V> hd = null, tl = null;
                     for (Node<K,V> e = b; e != null; e = e.next) {
                         TreeNode<K,V> p =
                             new TreeNode<K,V>(e.hash, e.key, e.val,
                                               null, null);
                         if ((p.prev = tl) == null)
                             hd = p;
                         else
                             tl.next = p;
                         tl = p;
                     }
                     //TreeBin代表红黑树,将TreeBin保存在哈希表的index位置
                     setTabAt(tab, index, new TreeBin<K,V>(hd));
                 }
             }
         }
     }
    }
    
    我们再重点看一下 tryPresize,此方法中实现了对数组的扩容,传入的参数 size 是原来哈希表大小的一倍。我们假定原来哈希表大小为 16,那么传入的 size 值为 32,以此数值作为例子来分析源代码。注意 while 中第一个 if 此时不会进入,但为了讲解代码我也在注释中一并讲解了,大家看的时候在这个分支中不要以 size=16 作为前提来分析。
    //size为32
    //sizeCtl为原大小16的3/4,也就是12
    private final void tryPresize(int size) {
       //根据tableSizeFor计算出满足要求的哈希表大小,对齐为2的n次方。c被赋值为64,这是扩容的上限,扩容一般都是扩容为原来的2倍,这里c值为了处理一些特殊的情况,确保扩容能够正常退出。
     int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
         tableSizeFor(size + (size >>> 1) + 1);
     int sc;
     //此时sc和sizeCtl均为12,进入while循环
     while ((sc = sizeCtl) >= 0) {
         Node<K,V>[] tab = table; int n;
         //这里处理的table还未初始化的逻辑,这是由于putAll操作不调用initTable,而是直接调用tryPresize
         if (tab == null || (n = tab.length) == 0) {
             //putAll第一次调用时,假设putAll进来的map只有一个元素,那么size传入1,计算出c为2.而sc和sizeCtl都为0,因此n=2
             n = (sc > c) ? sc : c;
             if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                 try {
                     if (table == tab) {
                         @SuppressWarnings("unchecked")
                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                         table = nt;
                       //经过计算sc=2
                         sc = n - (n >>> 2);
                     }
                 } finally {
                     //sizeCtl设置为2.第二次循环时,因为sc和c相等,都为2,进入下面的else if分支,结束while循环。
                     sizeCtl = sc;
                 }
             }
         }
         //扩容已经达到C值,结束扩容
         else if (c <= sc || n >= MAXIMUM_CAPACITY)
             break;
         //table已经存在,那么就对已有table进行扩容
         else if (tab == table) {
             int rs = resizeStamp(n);
             //sc小于0,说明别的线程正在扩容,本线程协助扩容
             if (sc < 0) {
                 Node<K,V>[] nt;
                 //判断是否扩容的线程达到上限,如果达到上限,退出
                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                     sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                     transferIndex <= 0)
                     break;
                 //未达上限,参与扩容,更新sizeCtl值。transfer方法负责把当前哈希表数据移入新的哈希表。
                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                     transfer(tab, nt);
             }
             //本线程为第一个扩容线程,transfer第二个参数传入null,代表需要新建扩容后的哈希表
             else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                          (rs << RESIZE_STAMP_SHIFT) + 2))
                 //这个扩容方法就不展开了,核心是每次都是将数组大小扩容到原来 2 倍,这样保证了哈希表的大小始终为 2 的 n 次方。
                 transfer(tab, null);
         }
     }
    }
    
    讲到这里我们再回一下 put 方法中最后有如下一行代码:
    addCount(1L, binCount);
    
    这行代码其实是对哈希表保存的元素数量进行计数。同时根据当前保存状况,判断是否进行扩容。你可能会问,在添加元素的过程中不是已经执行了扩容的逻辑了吗?其实上面的扩容逻辑是链表过长引起的,还有另外一种因素可能引起链表扩容,不过这两种扩容的逻辑是基本一致的。

触发扩容的因素其实有两种(HashMap 也是这样):

  1. 链表过长引起。
    • 链表长度超过 TREEIFY_THRESHOLD 8 时,触发 treeifyBin(),treeifyBin() 检查当前哈希表长度是否小于 MIN_TREEIFY_CAPACITY 64,如果没超过,则进行扩容;如果超过,就将链表转为红黑树。
  2. addCount 方法中判断哈希表是否超过 75% 的位置已经被使用,如果超过则进行扩容。

    ConcurrentHashMap 的 get() 方法源码分析

    put 的源代码比较复杂,其实 put 方法的复杂是为了 get 服务,以提高 get 的效率。相比较 put 方法而言,get 方法就简单多了。先获取数组的下标,然后通过判断数组下标的 key 是否和我们的 key 相等,相等的话直接返回,如果下标的槽点是链表或红黑树的话,分别调用相应的查找数据的方法,整体思路和 HashMap 很像。
    public V get(Object key) {
     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
     //获取key值的hash值
     int h = spread(key.hashCode());
     //这个if判断中做了如下几件事情:
     //1、哈希表是否存在
     //2、哈希表是否保存了数据,同时取得哈希表length
     //3、哈希表中hash值映射位置保存的对象不为null,并取出给e,e为链表头节点
     if ((tab = table) != null && (n = tab.length) > 0 &&
         (e = tabAt(tab, (n - 1) & h)) != null) {
         //如果e的hash值和传入key的hash值相等
         if ((eh = e.hash) == h) {
             //如果e的key和传入的key引用相同,或者key eaquals ek。那么返回e的value。
             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                 return e.val;
         }
         //如果头节点的hash<0,有两种情况
         //1、hash=-1,正在扩容,该节点为ForwardingNode,通过find方法在nextTable中查找
         //2、hash=-2,该节点为TreeBin,链表已经转为了红黑树。同样通过TreeBin的find方法查找。
         else if (eh < 0)
             return (p = e.find(h, key)) != null ? p.val : null;
         //以上两种条件不满足,说明hash映射位置保存的还是链表头节点,但是和传入key值不同。那么遍历链表查找即可。
         while ((e = e.next) != null) {
             if (e.hash == h &&
                 ((ek = e.key) == key || (ek != null && key.equals(ek))))
                 return e.val;
         }
     }
     return null;
    }
    
    ConcurrentHashMap 中,通过大量的 CAS 操作加上 Synchronized 来确保线程安全。对 ConcurrentHashMap 的学习我们把重点放在哈希算法和扩容上,面试的时候是考察的重点。

    3. ConcurrentHashMap 如何体现线程安全的?

    数组初始化时的线程安全

    数组初始化时,首先通过自旋来保证一定可以初始化成功,然后通过 CAS 设置 SIZECTL 变量的值,来保证同一时刻只能有一个线程对数组进行初始化,CAS 成功之后,还会再次判断当前数组是否已经初始化完成,如果已经初始化完成,就不会再次初始化,通过自旋 + CAS + 双重 check 等手段保证了数组初始化时的线程安全。具体可查看 initTable() 方法。

    新增槽点值时的线程安全

    此时为了保证线程安全,做了四处优化:

1. 通过自旋死循环保证一定可以新增成功。
在新增之前,通过for (Node<K,V>[] tab = table;;)这样的死循环来保证新增一定可以成功,一旦新增成功,就可以退出当前死循环,新增失败的话,会重复新增的步骤,直到新增成功为止。

2. 当前槽点为空时,通过 CAS 新增。
这里的写法非常严谨,没有在判断槽点为空的情况下直接赋值,因为在判断槽点为空和赋值的瞬间,很有可能槽点已经被其他线程赋值了,所以我们采用 CAS 算法,能够保证槽点为空的情况下赋值成功,如果恰好槽点已经被其他线程赋值,当前 CAS 操作失败,会再次执行 for 自旋,再走槽点有值的 put 流程,这里就是自旋 + CAS 的结合。

3. 当前槽点有值,用 synchronized 锁住当前槽点。
put 时,如果当前槽点有值,就是 key 的 hash 冲突的情况,此时槽点上可能是链表或红黑树,我们通过锁住槽点,来保证同一时刻只会有一个线程能对槽点进行修改。

4. 红黑树旋转时,锁住红黑树的根节点,保证同一时刻,当前红黑树只能被一个线程旋转。

通过以上 4 点,保证了在各种情况下的新增(不考虑扩容的情况下),都是线程安全的,通过自旋 + CAS + 锁三大姿势,实现的很巧妙,值得我们借鉴。

扩容时的线程安全

ConcurrentHashMap 的扩容时机和 HashMap 相同,都是在 put 方法的最后一步检查是否需要扩容,如果需要则进行扩容,但两者扩容的过程完全不同,ConcurrentHashMap 扩容的方法叫做 transfer,从 put 方法的 addCount 方法进去,就能找到 transfer 方法,transfer 方法的主要思路是:

  1. 首先需要把老数组的值全部拷贝到扩容之后的新数组上,先从数组的队尾开始拷贝;
  2. 拷贝数组的槽点时,先把原数组槽点锁住,保证原数组槽点不能操作,成功拷贝到新数组时,把原数组槽点赋值为转移节点;
  3. 这时如果有新数据正好需要 put 到此槽点时,发现槽点为转移节点,就会一直等待,所以在扩容完成之前,该槽点对应的数据是不会发生变化的;
  4. 从数组的尾部拷贝到头部,每拷贝成功一次,就把原数组中的节点设置成转移节点;
  5. 直到所有数组数据都拷贝到新数组时,直接把新数组整个赋值给数组容器,拷贝完成。

    4. JDK1.7 和 JDK1.8 中 ConcurrentHashMap 的锁机制区别

    JDK 1.7 中,采用分段锁的机制,实现并发的更新操作,底层采用数组+链表的存储结构,包括两个核心静态内部类 Segment 和 HashEntry。

  6. Segment 继承 ReentrantLock(重入锁) 用来充当锁的角色,每个 Segment 对象守护每个散列映射表的若干个桶;

  7. HashEntry 用来封装映射表的键-值对(就是 JDK1.8 的 Node);
  8. 每个桶是由若干个 HashEntry 对象链接起来的链表

ConcurrentHashMap 解析 - 图2

JDK 1.8 中,采用Node + CAS + Synchronized来保证并发安全。取消 Segment,直接用 table 数组存储键值对;当 HashEntry 对象组成的链表长度超过 TREEIFY_THRESHOLD 时,链表转换为红黑树,提升性能。底层变更为数组 + 链表 + 红黑树。

JDK 1.8 中直接使用 synchronized 对槽点上锁,而不是使用 ReentrantLock 的好处是:

  1. 粒度降低了。
  2. JVM 开发团队没有放弃 synchronized,而且基于 JVM 的 synchronized 优化空间更大,更加自然。
  3. 在大量的数据操作下,对于 JVM 的内存压力,基于 API 的 ReentrantLock 会开销更多的内存。