ConcurrentHashMap的面试问题:

  1. ConcurrentHashMap和HashMap的区别是什么?为什么ConcurrentHashMap线程安全
  2. Hash扩容为什么要扩大两倍

JDK1.7

68747470733a2f2f63732d6e6f7465732d313235363130393739362e636f732e61702d6775616e677a686f752e6d7971636c6f75642e636f6d2f696d6167652d32303139313230393030313033383032342e706e67.png

java7的ConcurrentHashMap采用了分段锁(Segment),每个段维护了几个HashEntry,多个线程可以访问不同段的HashEntry,从而使并发度提高,并发度就是Segment的个数。

默认创建16个Segment,并发度默认为16。

在执行 size 操作时,需要遍历所有 Segment 然后把 count 累计起来。

ConcurrentHashMap 在执行 size 操作时先尝试不加锁,如果连续两次不加锁操作得到的结果一致,那么可以认为这个结果是正确的。

尝试次数使用 RETRIES_BEFORE_LOCK 定义,该值为 2,retries 初始值为 -1,因此尝试次数为 3。

如果尝试的次数超过 3 次,就需要对每个 Segment 加锁。

JDK1.8

java8_concurrenthashmap.png

Java8的ConcurrentHashMap相对于java7,不再是之前的Segment数组+HashEntry数组+链表,而是Node数组+链表/红黑树。当链表达到一定长度时,链表会转换为红黑树。

初始化initTable()

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. //若sizeCtl < 0表示其他线程执行CAS成功,正在初始化或者正在扩容
  5. if ((sc = sizeCtl) < 0)
  6. Thread.yield(); // lost initialization race; just spin
  7. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  8. try {
  9. if ((tab = table) == null || tab.length == 0) {
  10. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  11. @SuppressWarnings("unchecked")
  12. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  13. table = tab = nt;
  14. //1-0.25 = 0.75,设置容量的0.75倍为阈值
  15. sc = n - (n >>> 2);
  16. }
  17. } finally {
  18. //将阈值赋给sizeCtl
  19. sizeCtl = sc;
  20. }
  21. break;
  22. }
  23. }
  24. return tab;
  25. }

sizeCtl :默认为0,用来控制table的初始化和扩容操作
-1 代表table正在初始化
-N 表示有N-1个线程正在进行扩容操作
其余情况:
1、如果table未初始化,表示table需要初始化的大小。
2、如果table初始化完成,表示table扩容的阈值,默认是table大小的0.75倍

put方法

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. /** Implementation for put and putIfAbsent */
  5. final V putVal(K key, V value, boolean onlyIfAbsent) {
  6. if (key == null || value == null) throw new NullPointerException();
  7. int hash = spread(key.hashCode());
  8. int binCount = 0;
  9. for (Node<K,V>[] tab = table;;) {
  10. Node<K,V> f; int n, i, fh;
  11. //判断是否初始化,没有就去初始化
  12. if (tab == null || (n = tab.length) == 0)
  13. tab = initTable();
  14. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  15. //如果桶为空,不加锁插入,成功break跳出
  16. if (casTabAt(tab, i, null,
  17. new Node<K,V>(hash, key, value, null)))
  18. break; // no lock when adding to empty bin
  19. }
  20. else if ((fh = f.hash) == MOVED)
  21. tab = helpTransfer(tab, f);
  22. else {
  23. V oldVal = null;
  24. synchronized (f) {
  25. if (tabAt(tab, i) == f) {
  26. if (fh >= 0) {
  27. binCount = 1;
  28. for (Node<K,V> e = f;; ++binCount) {
  29. K ek;
  30. if (e.hash == hash &&
  31. ((ek = e.key) == key ||
  32. (ek != null && key.equals(ek)))) {
  33. oldVal = e.val;
  34. if (!onlyIfAbsent)
  35. e.val = value;
  36. break;
  37. }
  38. Node<K,V> pred = e;
  39. if ((e = e.next) == null) {
  40. pred.next = new Node<K,V>(hash, key,
  41. value, null);
  42. break;
  43. }
  44. }
  45. }
  46. else if (f instanceof TreeBin) {
  47. Node<K,V> p;
  48. binCount = 2;
  49. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  50. value)) != null) {
  51. oldVal = p.val;
  52. if (!onlyIfAbsent)
  53. p.val = value;
  54. }
  55. }
  56. }
  57. }
  58. if (binCount != 0) {
  59. if (binCount >= TREEIFY_THRESHOLD)
  60. treeifyBin(tab, i);
  61. if (oldVal != null)
  62. return oldVal;
  63. break;
  64. }
  65. }
  66. }
  67. addCount(1L, binCount);
  68. return null;
  69. }

步骤大概分为:

  1. 根据key先计算出hashcode
  2. 根据表是否为空,表长是否为0判断是否进行初始化。
  3. 根据hash定位Node节点,如果节点为空,则用CAS尝试写入。
  4. 若hashcode==-1的话,表示map正在扩容,该线程调用helpTransfer()帮助扩容。
  5. 若以上都不满足,则用synchronize锁住这个Node节点。
  6. 之后判断是否是链表,是的话遍历链表,遍历的过程中如果有节点key和待加入的key相同,则覆盖,遍历到尾部则说明无覆盖情况,直接插入。
  7. 不是链表若是红黑树则对树做一次put操作。
  8. 之后检查map大小是否超过阈值,根据具体情况调用transfer来扩容。

get方法

get逻辑相对简单而且连CAS都不用,不贴源码了,直接写逻辑

  1. 根据key计算hash
  2. 根据hash确定头结点,头结点存在则进入下一步
  3. key的hash相同,key值也相同,直接返回元素
  4. 若头结点hash值小于0,表示正在扩容,或是红黑树,调用find查找
  5. 如果是链表,就遍历查找