简介

concurrentHashMap是个线程安全的hashMap。

JDK7的ConcurrentHashMap

jdk7里使用的是数组segment[]来存储某个hash的值,在每个segment里又维护一个hashEntity[]来存储key,value的值,在多线程并发访问同个concurrentHashMap时,会锁住一个segment。这是使用了分段锁的概念。

HashTable

HashTable也是个线程安全的hashMap。它于ConcurrentHashMap的区别是使用HashTable时候会锁住整个HashTable。每个segment下的HashEntity又类似于HashTable。
HashTable由于使用时候需要锁住整个HashTable,所以性能上不如ConcurrentHashMap。但是一方面为了兼容老版本,另外一方面,HashTable的迭代器时强一致性,ConcurrentHashMap的迭代器时弱一致性,ConcurrentHashMap并不完全代替HashTable。

put(K key,V value)插入值

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }

如果碰到key重复的,默认时替换。

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. //key和value都不能为空
  3. if (key == null || value == null) throw new NullPointerException();
  4. int hash = spread(key.hashCode());//计算hash位置
  5. int binCount = 0;
  6. for (Node<K,V>[] tab = table;;) {
  7. Node<K,V> f; int n, i, fh;
  8. if (tab == null || (n = tab.length) == 0)
  9. tab = initTable(); //初始化table容器
  10. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//通过hash值对应的数组下标得到第一个节点; 以volatile读的方式来读取table数组中的元素,保证每次拿到的数据都是最新的
  11. if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))////如果该下标返回的节点为空,则直接通过cas将新的值封装成node插入即可;如果cas失败,说明存在竞争,则进入下一次循环
  12. break; // no lock when adding to empty bin,插入头节点,结束循环
  13. }
  14. else if ((fh = f.hash) == MOVED) //moved=-1的时候,说明table正在扩容
  15. tab = helpTransfer(tab, f); //帮助扩容
  16. else {
  17. V oldVal = null;
  18. synchronized (f) { //锁住当前node
  19. if (tabAt(tab, i) == f) {//双重检查,
  20. //链式结构
  21. if (fh >= 0) {
  22. binCount = 1;
  23. for (Node<K,V> e = f;; ++binCount) {
  24. K ek;
  25. //hash相等,且key相等 替换值
  26. if (e.hash == hash && ((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
  27. oldVal = e.val;
  28. if (!onlyIfAbsent) {
  29. e.val = value;//修改
  30. }
  31. break;
  32. }
  33. //循环到末尾未找到。新的值加在链表尾部
  34. Node<K,V> pred = e;
  35. if ((e = e.next) == null) {
  36. pred.next = new Node<K,V>(hash, key,value, null);
  37. break;
  38. }
  39. }
  40. }
  41. //如果是树形结构,执行树存放
  42. else if (f instanceof TreeBin) {
  43. Node<K,V> p;
  44. binCount = 2;
  45. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
  46. oldVal = p.val;//节点key重复
  47. if (!onlyIfAbsent)
  48. p.val = value;//修改值
  49. }
  50. }
  51. }
  52. }
  53. //添加成功
  54. if (binCount != 0) {
  55. //链表最大大于等于8 的话转成树形结构(红黑树)
  56. if (binCount >= TREEIFY_THRESHOLD)
  57. treeifyBin(tab, i);
  58. if (oldVal != null)
  59. return oldVal;//修改的,直接返回旧的值
  60. break;
  61. }
  62. }
  63. }
  64. addCount(1L, binCount);//map数据count+1
  65. return null;
  66. }

这段代码可以看出:
jdk8中数据时存在Node[]的数组中
node位空的时候会用cas去插入新的节点。
节点不为空的时候,会锁这个Node,而不是锁住整个table
第二个到第七个新key插入,会以链式插在单向链表得尾部
如果链表长度>=8的时候。链表会转换成红黑树。
扩容的时候,其他线程可以协助扩容

spread(key.hashCode())获取Node节点下标位置

  1. static final int spread(int h) {
  2. return (h ^ (h >>> 16)) & HASH_BITS;
  3. }

initTable()初始化容器

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. if ((sc = sizeCtl) < 0) {
  5. Thread.yield(); // lost initialization race; just spin 获取锁失败,进入自旋
  6. }else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//cas锁,比较且交换。
  7. try {
  8. if ((tab = table) == null || tab.length == 0) {
  9. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//默认长度是16
  10. @SuppressWarnings("unchecked")
  11. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  12. table = tab = nt;
  13. sc = n - (n >>> 2);
  14. }
  15. } finally {
  16. sizeCtl = sc;//边界值
  17. }
  18. break;
  19. }
  20. }
  21. return tab;
  22. }

初始化的时候会同过cas。保证只有一个线程会初始化table。其他线程cas失败的化,会通过yield让出时间片。自旋等待初始化线程初始化table结束
初始化会默认创建一个16个槽位的table。

并设置边界值sc = n - (n >>> 2);及槽位的0.75(扩容因子)
也可以在初始化ConcurrentHashMap的时候设置初始化槽位值

  1. public ConcurrentHashMap(int initialCapacity) {
  2. if (initialCapacity < 0)
  3. throw new IllegalArgumentException();
  4. int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
  5. MAXIMUM_CAPACITY :
  6. tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
  7. this.sizeCtl = cap;
  8. }
  9. private static final int tableSizeFor(int c) {
  10. int n = c - 1;
  11. n |= n >>> 1;
  12. n |= n >>> 2;
  13. n |= n >>> 4;
  14. n |= n >>> 8;
  15. n |= n >>> 16;
  16. return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
  17. }

addCount(1L, binCount).添加count。扩容判断

  1. private final void addCount(long x, int check) {
  2. CounterCell[] as; long b, s;
  3. //counterCells不为空
  4. if ((as = counterCells) != null
  5. //或者修改BASECOUNT失败
  6. || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  7. CounterCell a; long v; int m;
  8. boolean uncontended = true;
  9. if (as == null || (m = as.length - 1) < 0 //counterCells为空
  10. ||(a = as[ThreadLocalRandom.getProbe() & m]) == null //随机取余为空
  11. ||!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //修改槽位失败(并发)
  12. fullAddCount(x, uncontended);
  13. return;
  14. }
  15. if (check <= 1) //根节点添加,无需扩容
  16. return;
  17. s = sumCount(); //获取map的size
  18. }
  19. //检查是否需要扩容
  20. if (check >= 0) {
  21. Node<K,V>[] tab, nt; int n, sc;
  22. while (s >= (long)(sc = sizeCtl) && (tab = table) != null && //map.size >= 扩容预伐值,table不为空
  23. (n = tab.length) < MAXIMUM_CAPACITY) { //小于最大容量
  24. int rs = resizeStamp(n);
  25. //其他线程正在扩容
  26. if (sc < 0) {
  27. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || //校验异常 sizeCtl 变化了||容结束了,不再有线程进行扩容
  28. sc == rs + MAX_RESIZERS || (nt = nextTable) == null || //帮助线程数已经达到最大 || 结束扩容了
  29. transferIndex <= 0) //转移状态变化了
  30. break;
  31. //cas锁 帮助扩容
  32. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  33. transfer(tab, nt);
  34. }
  35. //扩容
  36. else if (U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2))
  37. transfer(tab, null);
  38. s = sumCount();
  39. }
  40. }
  41. }

ConcurrentHashMap为了防止并发,会维护一个baseCount 和一个CounterCell[]
第一次count的时候回去初始CounterCell
其他线程进来的时候会先去cas修改baseCount+1,如果失败的话,会cas CounterCell数组中的某一个元素让他+1.如果还是失败的话回去扩容这个CounterCell
计算count之后,大于阈值的话,如果有其他线程在扩容的话,会尝试协助扩容

  1. private final void fullAddCount(long x, boolean wasUncontended) {
  2. int h;
  3. if ((h = ThreadLocalRandom.getProbe()) == 0) {
  4. ThreadLocalRandom.localInit(); // force initialization
  5. h = ThreadLocalRandom.getProbe();
  6. wasUncontended = true;
  7. }
  8. boolean collide = false; // True if last slot nonempty 如果最后一个槽非空,为true
  9. for (;;) {
  10. CounterCell[] as; CounterCell a; int n; long v;
  11. if ((as = counterCells) != null && (n = as.length) > 0) {//counterCells已经初始化了,且counterCells已经被赋值了
  12. if ((a = as[(n - 1) & h]) == null) {//某个counterCell槽位没有值
  13. if (cellsBusy == 0) { // Try to attach new Cell 尝试附加新的元素
  14. CounterCell r = new CounterCell(x); // Optimistic create 乐观创建
  15. if (cellsBusy == 0 &&
  16. U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//加锁
  17. boolean created = false;
  18. try { // Recheck under lock
  19. CounterCell[] rs; int m, j;
  20. if ((rs = counterCells) != null &&
  21. (m = rs.length) > 0 &&
  22. rs[j = (m - 1) & h] == null) {
  23. rs[j] = r;//初始化槽位
  24. created = true;
  25. }
  26. } finally {
  27. cellsBusy = 0;
  28. }
  29. if (created)
  30. break;
  31. continue; // Slot is now non-empty 槽位不为空了
  32. }
  33. }
  34. collide = false;
  35. }
  36. else if (!wasUncontended) // CAS already known to fail
  37. wasUncontended = true; // Continue after rehash 之后继续重复
  38. else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//给a槽位+1
  39. break;
  40. else if (counterCells != as || n >= NCPU)
  41. collide = false; // At max size or stale 最大扩容
  42. else if (!collide)
  43. collide = true;
  44. else if (cellsBusy == 0 &&
  45. U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
  46. try {
  47. if (counterCells == as) {// Expand table unless stale
  48. CounterCell[] rs = new CounterCell[n << 1];//扩容
  49. for (int i = 0; i < n; ++i)
  50. rs[i] = as[i];
  51. counterCells = rs;
  52. }
  53. } finally {
  54. cellsBusy = 0;
  55. }
  56. collide = false;
  57. continue; // Retry with expanded table
  58. }
  59. h = ThreadLocalRandom.advanceProbe(h);
  60. }
  61. else if (cellsBusy == 0 && counterCells == as &&
  62. U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//case保证只有一个线程初始化counterCells
  63. boolean init = false;
  64. try { // Initialize table
  65. if (counterCells == as) {
  66. CounterCell[] rs = new CounterCell[2];//传建一个长度为2的CounterCell
  67. rs[h & 1] = new CounterCell(x);
  68. counterCells = rs;
  69. init = true;
  70. }
  71. } finally {
  72. cellsBusy = 0;//释放cas锁,volatile解决可见性
  73. }
  74. if (init)
  75. break;
  76. }
  77. else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))//上面都失败的话回去尝试更新baseCount。
  78. break; // Fall back on using base
  79. }
  80. }

并发的时候,第一个线程回去cas 去初始化长度为2的CounterCell[]并随机将值放入一个槽位,
其他线程自旋,并cas尝试更新baseCount
CounterCell初始化好之后,线程会随机cas更新某个槽位
发现某个槽位没有实例化,则cas锁去实例化这个槽位
这些cas volatile修饰的cellsBusy值。保证只有一个线程会去初始化/修改/实例化某个槽位或者值。也同时保证可见性

sumCount()获取map的size

  1. final long sumCount() {
  2. CounterCell[] as = counterCells; CounterCell a;
  3. long sum = baseCount;
  4. if (as != null) {
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) != null)
  7. sum += a.value;
  8. }
  9. }
  10. return sum;
  11. }

计数size的时候,会将baseCount的值+遍历所有的CounterCell里的值

resizeSteam(int n)获取当前扩容的线程数

  1. static final int resizeStamp(int n) {
  2. return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
  3. }

helpTransfer(Node[] tab, Node f)协助扩容

  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2. Node<K,V>[] nextTab; int sc;
  3. if (tab != null && (f instanceof ForwardingNode) &&
  4. (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  5. int rs = resizeStamp(tab.length);
  6. while (nextTab == nextTable && table == tab &&
  7. (sc = sizeCtl) < 0) {
  8. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  9. sc == rs + MAX_RESIZERS || transferIndex <= 0)
  10. break;
  11. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//cas去确定扩容线程满了没
  12. transfer(tab, nextTab);
  13. break;
  14. }
  15. }
  16. return nextTab;
  17. }
  18. return table;
  19. }

做协助库容前的准备,cas去确定扩容线程是否满了

transfer(tab, nextTab)扩容或协助扩容的真正方法

  1. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2. int n = tab.length, stride;
  3. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
  4. stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围
  5. //新的table 未创建。创建新的table
  6. if (nextTab == null) { // initiating 初始化新的table
  7. try {
  8. @SuppressWarnings("unchecked")
  9. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  10. nextTab = nt;
  11. } catch (Throwable ex) { // try to cope with OOME
  12. sizeCtl = Integer.MAX_VALUE; //扩容失败 sizeCtl,该容器不再扩容
  13. return;
  14. }
  15. nextTable = nextTab;
  16. transferIndex = n;
  17. }
  18. int nextn = nextTab.length;
  19. //转发节点,占位用的。其他线程发现这个fwd占位符者跳过这个节点
  20. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  21. // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
  22. boolean advance = true;
  23. boolean finishing = false; // to ensure sweep before committing nextTab 在提交nextTab之前确保清除
  24. for (int i = 0, bound = 0;;) {//迭代去扩容
  25. Node<K,V> f; int fh;
  26. while (advance) {
  27. int nextIndex, nextBound;
  28. // 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
  29. // 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
  30. // 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。
  31. if (--i >= bound || finishing)
  32. advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
  33. // 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
  34. else if ((nextIndex = transferIndex) <= 0) {
  35. // 如果小于等于0,说明没有区间了 ,i 改成 -1,推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
  36. // 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
  37. i = -1;
  38. advance = false;
  39. }
  40. // CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
  41. else if (U.compareAndSwapInt
  42. (this, TRANSFERINDEX, nextIndex,
  43. nextBound = (nextIndex > stride ?
  44. nextIndex - stride : 0))) {
  45. bound = nextBound; // 这个值就是当前线程可以处理的最小当前区间最小下标
  46. i = nextIndex - 1;// 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
  47. advance = false; // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
  48. }
  49. }
  50. if (i < 0 || i >= n || i + n >= nextn) {
  51. int sc;
  52. if (finishing) {
  53. nextTable = null;
  54. table = nextTab;//结束扩容
  55. sizeCtl = (n << 1) - (n >>> 1);
  56. return;
  57. }
  58. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  59. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  60. return;
  61. finishing = advance = true;
  62. i = n; // recheck before commit
  63. }
  64. }
  65. else if ((f = tabAt(tab, i)) == null)
  66. advance = casTabAt(tab, i, null, fwd);
  67. else if ((fh = f.hash) == MOVED)
  68. advance = true; // already processed
  69. else {
  70. synchronized (f) {//扩容某个槽位
  71. if (tabAt(tab, i) == f) {
  72. Node<K,V> ln, hn;//定义一个高位,一个低位
  73. if (fh >= 0) {//链式
  74. int runBit = fh & n;
  75. Node<K,V> lastRun = f;
  76. for (Node<K,V> p = f.next; p != null; p = p.next) {//遍历
  77. int b = p.hash & n;
  78. if (b != runBit) {
  79. runBit = b;
  80. lastRun = p;//找出lastRun节点
  81. }
  82. }
  83. if (runBit == 0) {
  84. ln = lastRun;
  85. hn = null;
  86. }
  87. else {
  88. hn = lastRun;
  89. ln = null;
  90. }
  91. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  92. int ph = p.hash; K pk = p.key; V pv = p.val;
  93. if ((ph & n) == 0)
  94. ln = new Node<K,V>(ph, pk, pv, ln);//使用头插法放地位
  95. else
  96. hn = new Node<K,V>(ph, pk, pv, hn);//使用头插法放高位
  97. }
  98. setTabAt(nextTab, i, ln);//地位
  99. setTabAt(nextTab, i + n, hn);//高位
  100. setTabAt(tab, i, fwd);
  101. advance = true;
  102. }
  103. else if (f instanceof TreeBin) {//红黑树
  104. TreeBin<K,V> t = (TreeBin<K,V>)f;
  105. TreeNode<K,V> lo = null, loTail = null;
  106. TreeNode<K,V> hi = null, hiTail = null;
  107. int lc = 0, hc = 0;
  108. for (Node<K,V> e = t.first; e != null; e = e.next) {
  109. int h = e.hash;
  110. TreeNode<K,V> p = new TreeNode<K,V>
  111. (h, e.key, e.val, null, null);
  112. if ((h & n) == 0) {
  113. if ((p.prev = loTail) == null)
  114. lo = p;
  115. else
  116. loTail.next = p;
  117. loTail = p;
  118. ++lc;
  119. }
  120. else {
  121. if ((p.prev = hiTail) == null)
  122. hi = p;
  123. else
  124. hiTail.next = p;
  125. hiTail = p;
  126. ++hc;
  127. }
  128. }
  129. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) ://判断需不需要恢复链式
  130. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  131. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  132. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  133. setTabAt(nextTab, i, ln);
  134. setTabAt(nextTab, i + n, hn);
  135. setTabAt(tab, i, fwd);
  136. advance = true;
  137. }
  138. }
  139. }
  140. }
  141. }
  142. }

扩容前会细分一个线程扩容范围将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
并创建一个原来table两倍大的容器,并重原来容器的尾部开始向前扩容。
扩容某个槽位会先synchronized锁住槽位,防止并发。
对于链表会先定义高地位两个链表,遍历找到lastRun的节点。在重新计算p.hash & n计算新的hash槽位是在地位还是高位,再次遍历采用头插入法将将节点分高低链表。
低位的链表放在新table原来位置,高位的放在新table原来槽位*2的位置
红黑树也同样重新hash分高低位链表,如果单个链表的长度小于8,则恢复链式结构

get(k key)获取值

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());//获取hash
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {//(n-1)&h 的下标有值
  6. if ((eh = e.hash) == h) {
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))//根节点有值,直接取
  8. return e.val;
  9. }
  10. else if (eh < 0)//表示正在扩容,那么就从forwardingNode里查找
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) {//遍历查找
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }