ConcurrentHashMap的内部数据结构

ConcurrentHashMap.png

重要属性

int sizeCtl

这是一个多功能的字段,可以用来记录参与Map扩展的线程数量,也用来记录新的table的扩容阈值

CounterCell[] counterCells

用来记录元素的个数,这是一个数组,使用数组来记录,是因为避免多线程竞争时,可能产生的冲突。使用了数组,那么多个线程同时修改数量时,极有可能实际操作数组中不同的单元,从而减少竞争。

Node[] table

实际存放Map内容的地方,一个map实际上就是一个Node数组,每个Node里包含了key和value的信息。

Node[] nextTable

当table需要扩充时,会把新的数据填充到nextTable中,也就是说nextTable是扩充后的Map。
以上就是ConcurrentHashMap的核心元素,其中最值得注意的便是Node。

内部类Node

  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V val;
  5. volatile Node<K,V> next;
  6. Node(int hash, K key, V val, Node<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.val = val;
  10. this.next = next;
  11. }
  12. public final K getKey() { return key; }
  13. public final V getValue() { return val; }
  14. public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
  15. public final String toString(){ return key + "=" + val; }
  16. public final V setValue(V value) {
  17. throw new UnsupportedOperationException();
  18. }
  19. public final boolean equals(Object o) {
  20. Object k, v, u; Map.Entry<?,?> e;
  21. return ((o instanceof Map.Entry) &&
  22. (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
  23. (v = e.getValue()) != null &&
  24. (k == key || k.equals(key)) &&
  25. (v == (u = val) || v.equals(u)));
  26. }
  27. /**
  28. * Virtualized support for map.get(); overridden in subclasses.
  29. */
  30. Node<K,V> find(int h, Object k) {
  31. Node<K,V> e = this;
  32. if (k != null) {
  33. do {
  34. K ek;
  35. if (e.hash == h &&
  36. ((ek = e.key) == k || (ek != null && k.equals(ek))))
  37. return e;
  38. } while ((e = e.next) != null);
  39. }
  40. return null;
  41. }
  42. }

Node的类结构:
ConcurrentHashMap - 图2
可以看到,在Map中的Node并非简单的Node对象,实际上,它有可能是Node对象,也有可能是一个Treebin或者ForwardingNode。
ReservationNode.png

  • ForwardingNode:A node inserted at head of bins during transfer operations.
  • ReservationNode:A place-holder node used in computeIfAbsent and compute.
  • TreeBin:TreeNodes used at the heads of bins. TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root. They also maintain a parasitic read-write lock forcing writers (who hold bin lock) to wait for readers (who do not) to complete before tree restructuring operations.
  • TreeNode:Nodes for use in TreeBins

一个正常的Map可能是长这样的:
ConcurrentHashMap - 图4
上图中,绿色部分表示Node数组,里面的元素是Node,也就是链表的头部,当两个元素在数据中的位置发生冲突时,就将它们通过链表的形式,放在一个槽位中。
当数组槽位对应的是一个链表时,在一个链表中查找key只能使用简单的遍历,这在数据不多时,还是可以接受的,当冲突数据比较多时,这种简单的遍历就有点慢了。
因此,在具体实现中,当链表的长度大于等于8时,会将链表树状化,也就是变成一颗红黑树。

如下图所示,其中一个槽位就变成了一颗树,这就是TreeBin(在TreeBin中使用TreeNode构造树)。
ConcurrentHashMap - 图5
当数组容量快满时,即超过75%的容量时,数组还需要进行扩容,在扩容过程中,如果老的数组已经完成了复制,那么就会将老数组中的元素使用ForwardingNode对象替代,表示当前槽位的数据已经处理了,不需要再处理了,这样,当有多个线程同时参与扩容时,就不会冲突。

put()方法的实现

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. final V putVal(K key, V value, boolean onlyIfAbsent) {
  5. if (key == null || value == null) throw new NullPointerException();//key和value都不能为空
  6. int hash = spread(key.hashCode());//将散列的较高位传播到较低位,并将最高位强制设为0,减少冲突
  7. int binCount = 0;
  8. for (Node<K,V>[] tab = table;;) {
  9. Node<K,V> f; int n, i, fh;
  10. if (tab == null || (n = tab.length) == 0)
  11. tab = initTable();
  12. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  13. if (casTabAt(tab, i, null,
  14. new Node<K,V>(hash, key, value, null)))
  15. break; // no lock when adding to empty bin
  16. }
  17. else if ((fh = f.hash) == MOVED)//节点正在转发
  18. tab = helpTransfer(tab, f);
  19. else {
  20. V oldVal = null;
  21. //加锁进行旧节点的更新
  22. synchronized (f) {
  23. if (tabAt(tab, i) == f) {
  24. //链表
  25. if (fh >= 0) {
  26. binCount = 1;
  27. for (Node<K,V> e = f;; ++binCount) {
  28. K ek;
  29. if (e.hash == hash &&
  30. ((ek = e.key) == key ||
  31. (ek != null && key.equals(ek)))) {
  32. oldVal = e.val;
  33. if (!onlyIfAbsent)
  34. e.val = value;
  35. break;
  36. }
  37. Node<K,V> pred = e;
  38. if ((e = e.next) == null) {
  39. pred.next = new Node<K,V>(hash, key, value, null);//头插
  40. break;
  41. }
  42. }
  43. }
  44. //树
  45. else if (f instanceof TreeBin) {
  46. Node<K,V> p;
  47. binCount = 2;
  48. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
  49. oldVal = p.val;
  50. if (!onlyIfAbsent)
  51. p.val = value;
  52. }
  53. }
  54. }
  55. }
  56. //转换成红黑树
  57. if (binCount != 0) {
  58. if (binCount >= TREEIFY_THRESHOLD)
  59. treeifyBin(tab, i);
  60. if (oldVal != null)
  61. return oldVal;
  62. break;
  63. }
  64. }
  65. }
  66. addCount(1L, binCount);
  67. return null;
  68. }

public V put(K key, V value):将给定的key和value对存入HashMap,它的工作主要有以下几个步骤:

  1. 如果没有初始化数组,则尝试初始化数组
  2. 如果当前正在扩容,则参与帮助扩容(调用helpTransfer()方法)
  3. 将给定的key,value 放入对应的槽位
  4. 统计元素总数
  5. 触发扩容操作

根据以上主要4个步骤,来依次详细说明一下:

初始化数组

初始化数据会生成一个Node数组:

  1. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

默认情况下,n为16。同时设置sizeCtl为·n - (n >>> 2); 这意味着sizeCtl为n的75%,表示Map的size,也就是说ConcurrentHashMap的负载因子是0.75。(为了避免冲突,Map的容量是数组的75%,超过这个阈值,就会扩容)

参与帮助扩容

  1. else if ((fh = f.hash) == MOVED)
  2. tab = helpTransfer(tab, f);

如果一个节点的hash是MOVED,则表示这是一个ForwardingNode,也就是当前正在扩容中,为了尽快完成扩容,当前线程就会参与到扩容的工作中,而不是等待扩容操作完成,这是ConcurrentHashMap高性能的原因。
而代码中的f.hash==MOVE语义上等同于f instanceof ForwardingNode,但是使用整数相等的判断的效率要远远高于instanceof,所以,这里也是一处对性能的极限优化。

将key,value 放入对应的槽位

在大部分情况下,应该会走到这一步,也就是将key和value放入数组中。在这个操作中会使用大概如下操作:

  1. Node<K,V> f;
  2. synchronized (f) {
  3. if(所在槽位是一个链表)
  4. 插入链表
  5. else if(所在槽位是红黑树)
  6. 插入树
  7. if(链表长度大于8[TREEIFY_THRESHOLD])
  8. 将链表树状化
  9. }

可以看到,这使用了synchronized关键字,锁住了Node对象。由于在绝大部分情况下,不同线程大概率会操作不同的Node,因此这里的竞争应该不会太大。
并且随着数组规模越来越大,竞争的概率会越来越小,因此ConcurrentHashMap有了极好的并行性。

统计元素总数

为了有一个高性能的size()方法,ConcurrentHashMap使用了单独的方法来统计元素总数,元素数量统计在CounterCell数组中:

  1. CounterCell[] counterCells;
  2. @sun.misc.Contended static final class CounterCell {
  3. volatile long value;
  4. CounterCell(long x) { value = x; }
  5. }

CounterCell使用伪共享优化,具有很高的读写性能。counterCells中所有的成员的value相加,就是整个Map的大小。这里使用数组,也是为了防止冲突。
如果简单使用一个变量,那么多线程累加一个计数器时,难免要有竞争,现在分散到一个数组中,这种竞争就小了很多,对并发就更加友好了。
累加的主要逻辑如下:

  1. if (as == null || (m = as.length - 1) < 0 ||
  2. //不同线程映射到不同的数组元素,防止冲突
  3. (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
  4. //使用CAS直接增加对应的数据
  5. !(uncontended =
  6. U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
  7. //如果有竞争,在这里会重试,如果竞争严重还会将CounterCell[]数组扩容,以减少竞争

触发扩容操作

最后,ConcurrentHashMap还会检查是否需要扩容,它会检查当前Map的大小是否超过了阈值,如果超过了,还会进行扩容。
ConcurrentHashMap的扩容过程非常巧妙,它并没有完全打乱当前已有的元素位置,而是在数组扩容2倍后,将一半的元素移动到新的空间中。
所有的元素根据高位是否为1分为low节点和high节点:

  1. //n是数组长度,数组长度是2的幂次方,因此一定是100 1000 10000 100000这种二进制数字
  2. //这里将low节点串一起, high节点串一起
  3. if ((ph & n) == 0)
  4. ln = new Node<K,V>(ph, pk, pv, ln);
  5. else
  6. hn = new Node<K,V>(ph, pk, pv, hn);

接着,重新放置这些元素的位置:

  1. //low节点留在当前位置
  2. setTabAt(nextTab, i, ln);
  3. //high节点放到扩容后的新位置,新位置距离老位置n
  4. setTabAt(nextTab, i + n, hn);
  5. //扩容完成,用ForwardingNode填充
  6. setTabAt(tab, i, fwd);

下图显示了 从8扩充到16时的可能得一种扩容情况,注意,新的位置总是在老位置的后面n个槽位(n为原数组大小)
image.gifimage.png
这样做的好处是,每个元素的位置不需要重新计算,进行查找时,由于总是会对n-1(一定是一个类似于1111 11111 111111这样的二进制数)按位与,因此,high类的节点自然就会出现在+n的位置上。

get()方法的实现

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {//数组中的元素就是要找的key
  6. if ((eh = e.hash) == h) {
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0)//调用Node的find方法查找元素
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) {//从链表中查找
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }

与put()方法相比,get()方法就比较简单了。步骤如下:

  1. 根据hash值 得到对应的槽位 (n - 1) & h
  2. 如果当前槽位第一个元素key就和请求的一样,直接返回
  3. 否则调用Node的find()方法查找
    1. 对于ForwardingNode 使用的是 ForwardingNode.find()
    2. 对于红黑树 使用的是TreeBin.find()
  4. 对于链表型的槽位,依次顺序查找对应的key