ConcurrentHashMap的实现原理

在JDK8中,ConcurrentHashMap的底层数据结构与HashMap一样,也是采用“数组+链表+红黑树”的形式。同时,它又采用锁定头节点的方式降低了锁粒度,以较低的性能代价实现了线程安全。底层数据结构的逻辑可以参考HashMap的实现,下面我重点介绍它的线程安全的实现机制。

  1. 初始化数组或头节点时,ConcurrentHashMap并没有加锁,而是CAS的方式进行原子替换(原子操作,基于Unsafe类的原子操作API)。
  2. 插入数据时会进行加锁处理,但锁定的不是整个数组,而是槽中的头节点。所以,ConcurrentHashMap中锁的粒度是槽,而不是整个数组,并发的性能很好。
  3. 扩容时会进行加锁处理,锁定的仍然是头节点。并且,支持多个线程同时对数组扩容,提高并发能力。每个线程需先以CAS操作抢任务,争抢一段连续槽位的数据转移权。抢到任务后,该线程会锁定槽内的头节点,然后将链表或树中的数据迁移到新的数组里。
  4. 查找数据时并不会加锁,所以性能很好。另外,在扩容的过程中,依然可以支持查找操作。如果某个槽还未进行迁移,则直接可以从旧数组里找到数据。如果某个槽已经迁移完毕,但是整个扩容还没结束,则扩容线程会创建一个转发节点存入旧数组,届时查找线程根据转发节点的提示,从新数组中找到目标数据。

加分回答
ConcurrentHashMap实现线程安全的难点在于多线程并发扩容,即当一个线程在插入数据时,若发现数组正在扩容,那么它就会立即参与扩容操作,完成扩容后再插入数据到新数组。在扩容的时候,多个线程共同分担数据迁移任务,每个线程负责的迁移数量是 (数组长度 >>> 3) / CPU核心数。
也就是说,为线程分配的迁移任务,是充分考虑了硬件的处理能力的。多个线程依据硬件的处理能力,平均分摊一部分槽的迁移工作。另外,如果计算出来的迁移数量小于16,则强制将其改为16,这是考虑到目前服务器领域主流的CPU运行速度,每次处理的任务过少,对于CPU的算力也是一种浪费。

JDK7

Segment (基于ReentrantLock的扩展实现) 默认16 锁粒度为Segment
Unsafe获取对应Segment,进行线程安全的put操作
size()锁定所有的Segment计算总值,性能差。采用重试机制(RETRIES_BEFORE_LOCK,指定重
试次数 2),获取可靠值。如果没有监控到发生变化(通过对比Segment.modCount),就直接返回,否则获取锁进行操作。

JDK8

锁粒度为hash对应链表头Node(put采用尾部插入), 采用lazy-load形式(有效避免初始开销)
数据存储利用volatile保证可见性
使用 CAS 等操作,在特定场景进行无锁并发操作。
使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。
舍弃ReentrantLock,采用synchronized(JDK不断优化,性能差异小,减少内存消耗)
Unsafe优化: tabAt 就是直接利用 getObjectAcquire,避免间接调用的开销
size()分而治之的进行计数,然后求和处理。CounterCell 的操作,是基于 java.util.concurrent.atomic.LongAdder 进行的,是一种 JVM 利用空间换取更高效率的方法,利用了Striped64内部的复杂逻辑。

源码解析

Node

  1. //代表 table 的地址是 volatile,而不是数组 元素是 volatile。
  2. //这处是为了使得 table 在扩容时对其他线程也保持可见性。
  3. transient volatile Node<K,V>[] table;
  4. static class Node<K,V> implements Map.Entry<K,V> {
  5. final int hash;
  6. final K key;
  7. volatile V val;
  8. volatile Node<K,V> next;
  9. Node(int hash, K key, V val, Node<K,V> next) {
  10. this.hash = hash;
  11. this.key = key;
  12. this.val = val;
  13. this.next = next;
  14. }
  15. }

get

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. // 计算 hash 值
  4. int h = spread(key.hashCode());
  5. if ((tab = table) != null && (n = tab.length) > 0 &&
  6. (e = tabAt(tab, (n - 1) & h)) != null) {
  7. // 读取首节点的 Node 元素
  8. if ((eh = e.hash) == h) {
  9. // 如果 Key 相等则直接返回首节点
  10. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  11. return e.val;
  12. }
  13. // eh(hash 值)为负值表示正在扩容,这个时候查的是 ForwardingNode 的 find 方法来定位到 nextTable 来
  14. // eh=-1,说明该节点是一个 ForwardingNode,正在迁移,此时调用 ForwardingNode 的 find 方法去 nextTable 里找。
  15. // eh=-2,说明该节点是一个 TreeBin,此时调用 TreeBin 的 find 方法遍历红黑树, 由于红黑树有可能正在旋转变色,
  16. // 所以 find 里会有读写锁。
  17. // eh>=0,说明该节点下挂的是一个链表,直接遍历该链表即可。
  18. else if (eh < 0)
  19. return (p = e.find(h, key)) != null ? p.val : null;
  20. while ((e = e.next) != null) {
  21. // 直接在链表遍历查询
  22. if (e.hash == h &&
  23. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  24. return e.val;
  25. }
  26. }
  27. return null;
  28. }

put

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. // 1 key 和 value 不能为 null。
  3. if (key == null || value == null) throw new NullPointerException();
  4. // 2 通过 spread 函数获取 hash 值(与 HashMap 类似,也加入了扰动)
  5. int hash = spread(key.hashCode());
  6. //用来计算在这个桶总共有多少元素,用来控制扩容或者转换为红 黑树。
  7. int binCount = 0;
  8. for (Node<K,V>[] tab = table;;) {
  9. Node<K,V> f; int n, i, fh;
  10. // 3 table 为空,则先初始化数组。
  11. if (tab == null || (n = tab.length) == 0)
  12. tab = initTable();
  13. // 4 如果 table 对应位置为空,则调用 CAS 插入相应的数据,注意这个地方没有加锁。
  14. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  15. if (casTabAt(tab, i, null,
  16. new Node<K,V>(hash, key, value, null)))
  17. break; // no lock when adding to empty bin
  18. }
  19. // 5 如果取出来的节点的 hash 值是 MOVED(-1)的话,则表示当前正在对这个数组
  20. // 进行扩容,复制到新的数组,则当前线程也去帮助复制。
  21. else if ((fh = f.hash) == MOVED)
  22. tab = helpTransfer(tab, f);
  23. else {
  24. V oldVal = null;
  25. // 6 如果这个位置有元素的话,则对该节点加 synchronized 锁。
  26. synchronized (f) {
  27. if (tabAt(tab, i) == f) {
  28. // 7 如果是链表的话(hash 大于 0),就对这个链表的所有元素进行 遍历。
  29. if (fh >= 0) {
  30. binCount = 1;
  31. for (Node<K,V> e = f;; ++binCount) {
  32. K ek;
  33. // 8 若找到 Key 和 Key 的 hash 值都一样的节点,则用新 的 value 值替换旧值。
  34. if (e.hash == hash &&
  35. ((ek = e.key) == key ||
  36. (ek != null && key.equals(ek)))) {
  37. oldVal = e.val;
  38. if (!onlyIfAbsent)
  39. e.val = value;
  40. break;
  41. }
  42. // 9 若没有找到相同 Key 节点的话,则在链表后面添加新 的节点。
  43. Node<K,V> pred = e;
  44. if ((e = e.next) == null) {
  45. pred.next = new Node<K,V>(hash, key,
  46. value, null);
  47. break;
  48. }
  49. }
  50. }
  51. // 10 当前桶的数据结构为红黑树时,调用 putTreeVal 方法将元素 添加到树中。
  52. else if (f instanceof TreeBin) {
  53. Node<K,V> p;
  54. binCount = 2;
  55. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  56. value)) != null) {
  57. oldVal = p.val;
  58. if (!onlyIfAbsent)
  59. p.val = value;
  60. }
  61. }
  62. }
  63. }
  64. // 11 当在同一个节点的数目达到 TREEIFY_THRESHOLD(8)个的时候,则扩张 数组或将给节点的数据转为树。
  65. if (binCount != 0) {
  66. if (binCount >= TREEIFY_THRESHOLD)
  67. treeifyBin(tab, i);
  68. if (oldVal != null)
  69. return oldVal;
  70. break;
  71. }
  72. }
  73. }
  74. // 12 执行 addCount()方法尝试更新元素个数 baseCount。
  75. addCount(1L, binCount);
  76. return null;
  77. }

image.pngimage.png