ConcurrentHashMap 是一个常用的高并发容器类,也是一种线程安全的哈希表。Java 7 以及之前版本中的 ConcurrentHashMap 使用 Segment(分段锁)技术将数据分成一段一段存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。Java 8 对其内部的存储结构进行了优化,使之在性能上有进一步的提升。ConcurrentHashMap 和同步容器 HashTable 的主要区别在锁的类型和粒度上:HashTable 实现同步是利用 synchronized 关键字进行锁定的,其实是针对整张哈希表进行锁定的,即每次锁住整张表让线程独占,虽然解决了线程安全问题,但是造成了巨大的资源浪费。

HashMap 和 HashTable 的问题

首先,HashMap 是非线程安全的。在 JDK1.7 中,采取的是头插法,先插入链表的头部,那么在扩容的时候,容易引起死循环,具体见 HashMap 为什么线程不安全。在 JDK 1.8 中修改为尾插法,解决了这个问题。但是由于没有加锁,故而多线程下也不能保证所取到的就是上一步保存的数据(线程 A 先保存数据,线程 B 后取数据,有可能线程 B 先调用到)。

HashTable 是线程安全的,其实现是在方法上加上 synchronized 关键字,锁的对象是整张 hash 表,当一个线程访问其中一个方法时,另一个线程访问其他的方法都会进入阻塞或轮询状态,导致效率低下。

HashMap 和 HashTable 的实现原理是一样的,只是有一下两个区别:

  1. HashTable不允许key和value为null
  1. public synchronized V put(K key, V value) {
  2. // Make sure the value is not null
  3. if (value == null) {
  4. throw new NullPointerException();
  5. }
  6. // Makes sure the key is not already in the hashtable.
  7. Entry<?,?> tab[] = table;
  8. int hash = key.hashCode();
  9. int index = (hash & 0x7FFFFFFF) % tab.length;
  10. @SuppressWarnings("unchecked")
  11. Entry<K,V> entry = (Entry<K,V>)tab[index];
  12. for(; entry != null ; entry = entry.next) {
  13. if ((entry.hash == hash) && entry.key.equals(key)) {
  14. V old = entry.value;
  15. entry.value = value;
  16. return old;
  17. }
  18. }
  19. addEntry(hash, key, value, index);
  20. return null;
  21. }

可见 HashTable 没对 key 和 value 做判空,直接抛出异常,因此两者都不能为 null。

  1. HashTable 使用 synchronized 来保证线程安全,包含 get()/put() 在内的所有相关需要进行同步执行的方法都加上了 synchronized 关键字,对这个 Hash 表进行锁定

JDK 1.7 中 ConcurrentHashMap 的结构

JDK 1.7 的 ConcurrentHashMap 的锁机制基于粒度更小的分段锁,分段锁也是提升多并发程序性能的重要手段之一,和 LongAdder 一样,属于热点分散型的削峰手段。

分段锁其实是一种锁的设计,并不是具体的一种锁,对于 ConcurrentHashMap 而言,分段锁技术将 Key 分成一个一个小 Segment 存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。

JDK 1.7 中的 ConcurrentHashMap 采用了 Segment 分段锁的方式实现。一个 ConcurrentHashMap 中包含一个 Segment 数组,一个 Segment 中包含一个 HashEntry 数组,每个元素是一个链表结构(一个 Hash 表的桶)。

image.png

JDK 1.7 中 ConcurrentHashMap 的实现原理

ConcurrentHashMap 类的核心源码如下:

  1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
  2. private static final long serialVersionUID = 7249069246763182397L;
  3. /**
  4. * 默认初始容量
  5. */
  6. static final int DEFAULT_INITIAL_CAPACITY = 16;
  7. /**
  8. * 默认扩容因子
  9. */
  10. static final float DEFAULT_LOAD_FACTOR = 0.75f;
  11. /**
  12. * 哈希表的默认并发级别,即分段锁的数量
  13. */
  14. static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  15. /**
  16. * 集合最大容量
  17. */
  18. static final int MAXIMUM_CAPACITY = 1 << 30;
  19. /**
  20. * 每段表的最小容量。 必须是 2 的整数幂,至少是 2,避免在构造之后立即扩容
  21. */
  22. static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
  23. /**
  24. * 分段锁的最大段数;必须是小于 1 << 24 的 2 的幂。
  25. */
  26. static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
  27. /**
  28. * 加锁前的重试次数
  29. */
  30. static final int RETRIES_BEFORE_LOCK = 2;
  31. /**
  32. * segment的掩码。key的哈希值的高位用来选择具体的Segment
  33. */
  34. final int segmentMask;
  35. /**
  36. * 偏移量
  37. */
  38. final int segmentShift;
  39. /**
  40. * segment数组
  41. */
  42. final Segment<K,V>[] segments;
  43. transient Set<K> keySet;
  44. transient Set<Map.Entry<K,V>> entrySet;
  45. transient Collection<V> values;
  46. /**
  47. * 默认构造方法,初始容量、加载因子以及并发级数都是用默认值
  48. * DEFAULT_INITIAL_CAPACITY:16
  49. * DEFAULT_LOAD_FACTOR:0.75
  50. * DEFAULT_CONCURRENCY_LEVEL:16
  51. */
  52. public ConcurrentHashMap() {
  53. this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
  54. }
  55. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
  56. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
  57. throw new IllegalArgumentException();
  58. // 限制分段锁数量
  59. if (concurrencyLevel > MAX_SEGMENTS)
  60. concurrencyLevel = MAX_SEGMENTS;
  61. // 查找最佳匹配参数(不小于给定参数最接近的2次幂)
  62. int sshift = 0;
  63. // 表示并发度,也就是分段锁的数量
  64. int ssize = 1;
  65. // concurrencyLevel 默认值为16,while循环结束之后,sshift=4,ssize=16
  66. // while循环是为了保证并发度为2的整数次方
  67. while (ssize < concurrencyLevel) {
  68. ++sshift;
  69. ssize <<= 1;
  70. }
  71. // 偏移量 28
  72. this.segmentShift = 32 - sshift;
  73. // 掩码值 3
  74. this.segmentMask = ssize - 1;
  75. // 初始容量 16
  76. if (initialCapacity > MAXIMUM_CAPACITY)
  77. initialCapacity = MAXIMUM_CAPACITY;
  78. // c = 16 / 16 = 1
  79. int c = initialCapacity / ssize;
  80. if (c * ssize < initialCapacity) {
  81. ++c;
  82. }
  83. // MIN_SEGMENT_TABLE_CAPACITY = 2
  84. // cap表示每段表的容量,默认为2
  85. int cap = MIN_SEGMENT_TABLE_CAPACITY;
  86. while (cap < c) {
  87. cap <<= 1;
  88. }
  89. // create segments and segments[0]
  90. // 第0个Segment的
  91. Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
  92. (HashEntry<K,V>[])new HashEntry[cap]);
  93. // segments的大小为2的整数次方
  94. Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  95. UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  96. this.segments = ss;
  97. }
  98. }
  99. // Unsafe mechanics
  100. private static final sun.misc.Unsafe UNSAFE;
  101. private static final long SBASE;
  102. private static final int SSHIFT;
  103. private static final long TBASE;
  104. private static final int TSHIFT;
  105. static {
  106. int ss, ts;
  107. try {
  108. UNSAFE = sun.misc.Unsafe.getUnsafe();
  109. Class tc = HashEntry[].class;
  110. Class sc = Segment[].class;
  111. // 获取数组中第一个元素的地址
  112. TBASE = UNSAFE.arrayBaseOffset(tc);
  113. SBASE = UNSAFE.arrayBaseOffset(sc);
  114. // 获取数组中一个元素占用的字节
  115. ts = UNSAFE.arrayIndexScale(tc);
  116. ss = UNSAFE.arrayIndexScale(sc);
  117. } catch (Exception e) {
  118. throw new Error(e);
  119. }
  120. if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
  121. throw new Error("data type scale not a power of two");
  122. // 返回无符号整数ss/ts的最高非0位前面的0的个数,包括符号位在内;如果ss/ts为负数,将会返回0,因为符号位为1
  123. SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
  124. TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
  125. }
  1. ConcurrentHashMap 默认的初始容量为 16,加载因子为 0.75,并发度为 16。而且,容量必须为2的整数次幂,通过第一个 while 循环进行保证
  2. 每段表的容量默认为 MIN_SEGMENT_TABLE_CAPACITY,也就是 2。加载因子会传递到 Segment 中,当每个 Segment 的元素数量达到一定的阈值时,需要进行 rehash 。segment 的个数不能扩容,但是其内部的元素数量可以扩容

HashEntry

  1. static final class HashEntry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V value;
  5. volatile HashEntry<K,V> next;
  6. HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.value = value;
  10. this.next = next;
  11. }
  12. final void setNext(HashEntry<K,V> n) {
  13. UNSAFE.putOrderedObject(this, nextOffset, n);
  14. }
  15. // Unsafe mechanics
  16. static final sun.misc.Unsafe UNSAFE;
  17. static final long nextOffset;
  18. static {
  19. try {
  20. UNSAFE = sun.misc.Unsafe.getUnsafe();
  21. Class k = HashEntry.class;
  22. nextOffset = UNSAFE.objectFieldOffset
  23. (k.getDeclaredField("next"));
  24. } catch (Exception e) {
  25. throw new Error(e);
  26. }
  27. }
  28. }

HashEntry 用来保存 K-V 键值对数据,其中 value 是一个 volatile 关键字修饰的值,表示具有可见性。还有一个 HashEntry 类型的 next 字段,表示这是一个单链表结构。setNext(HashEntry<K,V> n) 是一个 CAS 操作,保证线程安全。

Segment

  1. // 继承至 ReentrantLock
  2. static final class Segment<K,V> extends ReentrantLock implements Serializable {
  3. private static final long serialVersionUID = 2249069246763182397L;
  4. /**
  5. * 尝试锁定的最大次数,超过该值进入阻塞获取。在多处理器上,使用有限次数的重试来维护
  6. * 在定位节点时获取的缓存
  7. */
  8. static final int MAX_SCAN_RETRIES =
  9. Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
  10. /**
  11. * The per-segment table. 通过entryAt/setEntryAt来访问元素,提供volatile语义.
  12. *
  13. * table 是由 HashEntry 实例组成的数组。。如果 HashEntry 实例的哈希值发生碰撞,则会将
  14. * 新的HashEntry 以链表的形式连接成一个链表。table 数组的成员表示 Hash 桶的一个桶,每
  15. * 个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分。若并发级别为 16,table 守护
  16. * ConcurrentHashMap 包含桶总数的 1/16
  17. */
  18. transient volatile HashEntry<K,V>[] table;
  19. /**
  20. * 元素数量。仅在锁内或具备可见性(volatile)的读取操作中访问。
  21. *
  22. * count表示每个Segment实例管理的table数组(若干个HashEntry组成的链表)包含的HashEntry实例的个数
  23. */
  24. transient int count;
  25. /**
  26. * table被更新的次数
  27. */
  28. transient int modCount;
  29. /**
  30. * table中包含的HashEntry的数量超过该变量的值,触发rehash操作
  31. */
  32. transient int threshold;
  33. /**
  34. * 加载因子
  35. */
  36. final float loadFactor;
  37. Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
  38. this.loadFactor = lf;
  39. this.threshold = threshold;
  40. this.table = tab;
  41. }
  42. ......
  43. }
  1. HashEntry 数组类型的成员 table,且因为 HashEntry 是一个单链表结构,因此 table 中的每一个元素对应一个 Hash 表中的桶
  2. Segment 继承了 ReentrantLock,因此具备了乐观锁的功能。每个 Segment 守护一个 table 数组中的元素(即 HashEntry 链表),每个 HashEntry 需要修改时,都要先获取到它对应的 Segment 锁
  3. ConcurrentHashMap 在默认并发级别会创建包含 16 个 Segment 对象的数组(见 ConcurrentHashMap 构造器),每个 Segment 大约守护整个哈希表中桶总数的 1/16,其中第 N 个哈希桶由第 N mod 16 个锁来保护。假设使用合理的哈希算法使关键字能够均匀地分部,那么这大约能使对锁的请求减少到原来的 1/16。默认并发级别的情况下,ConcurrentHashMap 支持多达 16 个并发的写入线程。

get(Object key) - 获取元素

  1. private static int hash(int h) {
  2. // Spread bits to regularize both segment and index locations,
  3. // using variant of single-word Wang/Jenkins hash.
  4. h += (h << 15) ^ 0xffffcd7d;
  5. h ^= (h >>> 10);
  6. h += (h << 3);
  7. h ^= (h >>> 6);
  8. h += (h << 2) + (h << 14);
  9. return h ^ (h >>> 16);
  10. }
  11. public V get(Object key) {
  12. Segment<K,V> s; // manually integrate access methods to reduce overhead
  13. HashEntry<K,V>[] tab;
  14. // 计算key的hash值
  15. int h = hash(key.hashCode());
  16. // 根据hash值计算分段锁的索引
  17. long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  18. // (1)获取对应的Segment对象
  19. if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
  20. 2
  21. for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
  22. e != null; e = e.next) {
  23. K k;
  24. if ((k = e.key) == key || (e.hash == h && key.equals(k)))
  25. return e.value;
  26. }
  27. }
  28. return null;
  29. }
  1. 第一次 hash,函数为 ((h >>> segmentShift) & segmentMask),找到 segments 里面对应的 HashEntry 对应的下标,然后遍历该位置的链表
  2. 第二次 hash,函数为 ((tab.length - 1) & h),即 h 对数组长度取模,找到 Segment 里面对应的 HashEntry 的下标,然后遍历该位置的链表

可以看到,以上都是通过 Unsafe 的 getObjectVolatile() 方法来获取数组中的元素的。这是因为虽然 Segment 对象持有 HashEntry 数组的引用是 volatile 类型的,但是数组内部的元素并不是 volatile 类型,因此多线程对数组元素的修改不是线程安全的,可能会在数组中读到尚未构造完成的对象。

由于分段锁数组在构造时没进行初始化,可能读出来一个空值,因此需要先进行判断。在确定分段锁和它内部的哈希表都不为空之后,再通过哈希码读取HashEntry数组的元素,根据上面的源码可以看到,这时获得的是链表的头节点。之后再从头到尾对链表进行遍历查找,如果找到对应的值就将其返回,否则返回null。以上就是整个查找元素的过程。

另外,get() 方法不需要加锁是因为这是个只读操作,不会修改 Map 的数据结构,因此只要保证涉及到读取的数据的属性为线程可见即可,也就是使用 volatile 修饰涉及到的成员变量。

put(K key, V value) - 添加元素

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key.hashCode());
  6. int j = (hash >>> segmentShift) & segmentMask;
  7. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  8. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  9. s = ensureSegment(j);
  10. return s.put(key, hash, value, false);
  11. }
  12. public V putIfAbsent(K key, V value) {
  13. Segment<K,V> s;
  14. // 1. 保证传入的 value 不为 null,否则触发 NPE 异常
  15. if (value == null)
  16. throw new NullPointerException();
  17. // 2. 计算 key 的 hashcode 值
  18. int hash = hash(key.hashCode());
  19. // 3. 计算出所处分段锁的下标并通过 CAS 操作尝试获取分段锁 Segment 对象
  20. int j = (hash >>> segmentShift) & segmentMask;
  21. if ((s = (Segment<K,V>)UNSAFE.getObject
  22. (segments, (j << SSHIFT) + SBASE)) == null)
  23. // 创建一个分段锁
  24. s = ensureSegment(j);
  25. // 4. 调用 Segment#put() 完成添加操作
  26. return s.put(key, hash, value, true);
  27. }

ConcurrentHashMap 有两个 put 方法,put() 与 putIfAbsent() 的区别在于:若 key-value 键值对已存在,前者会进行覆盖,后者则不会。源码可知,两者都是通过 Segment#put(K key, int hash, V value, boolean onlyIfAbsent) 方法调用完成的。整个调用顺序如下:

  1. 保证传入的 value 不为 null,否则触发 NPE 异常
  2. 计算 key 的 hashcode 值
  3. 根据 key 的 hashcode 值计算出所处分段锁的下标并通过 CAS 操作尝试获取分段锁 Segment 对象,若分段锁不存在,则通过 ensureSegment() 方法创建一个分段锁
  4. 调用 Segment#put(K key, int hash, V value, boolean onlyIfAbsent) 完成添加操作

ensureSegment() - 保证对应下标的分段锁存在

  1. private Segment<K,V> ensureSegment(int k) {
  2. final Segment<K,V>[] ss = this.segments;
  3. // 偏移量
  4. long u = (k << SSHIFT) + SBASE; // raw offset
  5. Segment<K,V> seg;
  6. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  7. Segment<K,V> proto = ss[0]; // use segment 0 as prototype
  8. int cap = proto.table.length;
  9. float lf = proto.loadFactor;
  10. int threshold = (int)(cap * lf);
  11. HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  12. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
  13. == null) { // recheck
  14. Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  15. while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  16. if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
  17. break;
  18. }
  19. }
  20. }
  21. return seg;
  22. }

该方法的功能就是判断在 segments 的偏移量为 u 的位置是否存在元素,不存在则创建一个新的 HashEntry 数组变量 tab,并通过 CAS 操作添加到 segments 数组中。

Segment#put(K , int , V , boolean) - 添加元素

在 ConcurrentHashMap 中的 put 方法中只完成了分段锁的寻址,真正的元素添加操作是在对应的分段锁中完成的。

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. // 尝试获取锁,获取失败则进入 scanAndLockForPut() 方法,该方法中会检查 key 对应节点
  3. // 对象是否已存在于分段锁的哈希表中,不存在会创建,方便获取锁之后直接进行使用
  4. HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
  5. V oldValue;
  6. try {
  7. HashEntry<K,V>[] tab = table;
  8. int index = (tab.length - 1) & hash;
  9. // 获取 table 数组对应索引的元素
  10. HashEntry<K,V> first = entryAt(tab, index);
  11. for (HashEntry<K,V> e = first;;) {
  12. if (e != null) {
  13. K k;
  14. if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
  15. oldValue = e.value;
  16. if (!onlyIfAbsent) {
  17. // 替换旧元素,modCount更新
  18. e.value = value;
  19. ++modCount;
  20. }
  21. break;
  22. }
  23. e = e.next;
  24. } else {
  25. // 头插法
  26. if (node != null)
  27. node.setNext(first);
  28. else
  29. // 进来直接获取到锁,则可能走到这个分支
  30. node = new HashEntry<K,V>(hash, key, value, first);
  31. // 元素数量+1
  32. int c = count + 1;
  33. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  34. // 超过阈值,进行扩容
  35. rehash(node);
  36. else
  37. // 设置为桶元素
  38. setEntryAt(tab, index, node);
  39. ++modCount;
  40. count = c;
  41. oldValue = null;
  42. break;
  43. }
  44. }
  45. } finally {
  46. // 释放锁
  47. unlock();
  48. }
  49. return oldValue;
  50. }

为了保证线程安全,分段锁中的 put 操作是需要进行加锁的,所以线程一开始就会获取锁,若获取成功则继续执行,若获取失败则调用 scanAndLockForPut() 方法进行自旋,在自旋过程中会先扫描哈希表查找指定的 key,如果 key 不存在就会新建一个 HashEntry 返回,这样在获取到锁之后就不必再新建了,为的是在等待锁的过程中顺便做些事情,不至于白白浪费时间。

线程在成功获取到锁之后会根据计算到的下标获取指定下标的元素。此时获取到的是链表的头节点,如果头节点不为空就对链表进行遍历查找,找到之后再根据 onlyIfAbsent 参数的值决定是否进行替换。如果遍历时没找到头节点,就会新建一个 HashEntry 节点作为头节点。在向链表添加元素之后,检查元素总数是否超过阈值,如果超过就调用 rehash 进行扩容,没超过的话就直接将数组对应下标的元素引用指向新添加的节点。setEntryAt() 方法内部是通过调用 UnSafe 的 putOrderedObject() 方法来更改数组元素引用的,这样就保证了其他线程在读取时可以读到最新的值。

Segment#scanAndLockForPut(K, int, V)

  1. private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
  2. // 从分段 table 中获取 HashEntry 对象
  3. HashEntry<K,V> first = entryForHash(this, hash);
  4. HashEntry<K,V> e = first;
  5. HashEntry<K,V> node = null;
  6. int retries = -1; // negative while locating node
  7. // 尝试获取锁,获取失败继续循环
  8. while (!tryLock()) {
  9. HashEntry<K,V> f; // to recheck first below
  10. if (retries < 0) {
  11. // 保证插入的数据对应的HashEntry存在
  12. if (e == null) {
  13. // 创建 HashEntry
  14. if (node == null) // speculatively create node
  15. node = new HashEntry<K,V>(hash, key, value, null);
  16. retries = 0;
  17. } else if (key.equals(e.key)) {
  18. // 根据 equals 方法查找对应的 node
  19. retries = 0;
  20. } else {
  21. e = e.next;
  22. }
  23. } else if (++retries > MAX_SCAN_RETRIES) {
  24. // 重试次数超过最大可尝试直接获取锁次数,进入自旋
  25. lock();
  26. break;
  27. } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
  28. e = first = f; // re-traverse if entry changed
  29. retries = -1;
  30. }
  31. }
  32. return node;
  33. }
  • 通过扫描对应分段锁中的 hash 表查找对应的 key:
    1. 如果 key 不存在,则创建 HashEntry 节点并返回
    2. 如果 key 存在,则根据 equals 方法在链表中查找已存在元素,若元素不存在则创建 HashEntry 节点并返回,否则返回 null
  • 若 while 循环获取锁的次数大于 MAX_SCAN_RETRIES,则执行 lock 方法,进入阻塞等待,直到获取到锁为止

Segment#entryForHash(Segment, int) - 获取 key 的 hash 值对应的元素

  1. static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
  2. HashEntry<K,V>[] tab;
  3. return (seg == null || (tab = seg.table) == null) ? null :
  4. // 在hashEntry数组中获取相对偏移量的数据
  5. (HashEntry<K,V>) UNSAFE.getObjectVolatile
  6. (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
  7. }

该方法的作用就是根据 key 的 hash 值从分段锁的 table 数组中查找相对应的 HashEntry 对象。可能返回 null。

Segment#entryAt(HashEntry, int) - 从分段锁的哈希表中获取index对应的元素

  1. static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
  2. return (tab == null) ? null :
  3. (HashEntry<K,V>) UNSAFE.getObjectVolatile
  4. (tab, ((long)i << TSHIFT) + TBASE);
  5. }

该方法的作用就是根据 key 的 hash 值计算出对应哈希桶元素的索引,根据该索引从定位到 HashEntry 对象。可能返回 null。

rehash(HashEntry node) - 扩容

  1. /**
  2. * 将每个列表中的节点重新分类为新表。 因为我们使用的是二次幂扩容,所以每个 bin 中的元素
  3. * 必须保持相同的索引,或者以二次幂的偏移量移动。 我们通过捕获可以重用旧节点的情况来消除
  4. * 不必要的节点创建,因为它们的下一个字段不会改变。 据统计,在默认阈值下,当表翻倍时,
  5. * 只有大约六分之一需要克隆。 一旦它们不再被可能处于并发遍历表中的任何读取器线程引用,
  6. * 它们替换的节点将是可垃圾回收的。 入口访问使用普通数组索引,因为它们之后是 volatile 表写入。
  7. */
  8. private void rehash(HashEntry<K,V> node) {
  9. // 旧表
  10. HashEntry<K,V>[] oldTable = table;
  11. int oldCapacity = oldTable.length;
  12. // 新表长度翻一倍
  13. int newCapacity = oldCapacity << 1;
  14. // 新表的扩容临界值
  15. threshold = (int)(newCapacity * loadFactor);
  16. // 创建新表
  17. HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
  18. // 方便计算索引
  19. int sizeMask = newCapacity - 1;
  20. for (int i = 0; i < oldCapacity ; i++) {
  21. HashEntry<K,V> e = oldTable[i];
  22. if (e != null) {
  23. HashEntry<K,V> next = e.next;
  24. // 在新表中的索引
  25. int idx = e.hash & sizeMask;
  26. // 原来i位置中的元素没有发生冲突,也就是没有构成链表
  27. if (next == null) // Single node on list
  28. newTable[idx] = e;
  29. else { // Reuse consecutive sequence at same slot
  30. HashEntry<K,V> lastRun = e;
  31. int lastIdx = idx;
  32. // 查找链表中最后一个非重复元素及其索引,在lastIdx之后都是相同的索引
  33. // 注意,lastRun可能是作为另一个节点(pre)的next存在的,但是没关系,在下一个循环中
  34. // pre节点的next节点会被重置
  35. for (HashEntry<K,V> last = next; last != null; last = last.next) {
  36. int k = last.hash & sizeMask;
  37. if (k != lastIdx) {
  38. lastIdx = k;
  39. lastRun = last;
  40. }
  41. }
  42. // 转移最后一部分相同key的元素到新表对应位置
  43. newTable[lastIdx] = lastRun;
  44. // Clone remaining nodes
  45. for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
  46. V v = p.value;
  47. int h = p.hash;
  48. int k = h & sizeMask;
  49. HashEntry<K,V> n = newTable[k];
  50. // 创建一个 HashEntry,并将之前数组中相同位置的元素该HashEntry的next
  51. newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
  52. }
  53. }
  54. }
  55. }
  56. int nodeIndex = node.hash & sizeMask; // add the new node
  57. node.setNext(newTable[nodeIndex]);
  58. newTable[nodeIndex] = node;
  59. table = newTable;
  60. }
  1. rehash() 方法只在 put() 方法中调用,由于 put() 中已经获取到 lock,所以可以确保线程安全
  2. 扩容时首先生成一个新的数组,大小为原数组的 2 倍,然后将原数组的元素迁移到新数组中,遍历原 HashEntry 数组,获取到每一个位置的 HashEntry 链表:
    1. 计算原数据应该转移到新数组的下标,如果原 HashEntry 链表只有一个元素,直接转移到新链表
    2. 如果有多个,采用 lastRun 机制,先获取最后几个在新数组连续的元素,先转移过去
    3. 遍历剩余的链表,根据原 HashEntry 元素生成新的 HashEntry 对象,并采用头插法放到新数组中
  3. 数据转移完之后,将新的元素再计算下标之后,头插法放到新数组中
  4. 将新数组更新到 Segment 对象中

参考

HashMap 为什么线程不安全

jdk 1.7 - HashMap

jdk 1.7 - ConcurrentHashMap