前有HashMap作为基于哈希表实现的用于映射(键值对)处理的容器,在介绍HashMap或者看HashMap源码的时候也知道HashMap不是线程安全的。因此引入了ConcurrentHashMap,它也是基于哈希表的,但是它是线程安全的并发容器,在java.util.concurrent包下。

image.png

属性

哈希表中真正存放数据的结点Node,和HashMap中的区别在于加了 volatile 关键字确定其可见性。

  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V val;
  5. volatile Node<K,V> next;
  6. Node(int hash, K key, V val, Node<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.val = val;
  10. this.next = next;
  11. }
  12. }

ConcurrentHashMap中同样是使用的结点数组存储哈希表,区别在于加了 volatile 关键字确定其可见性。

  1. private static final float LOAD_FACTOR = 0.75f;
  2. static final int TREEIFY_THRESHOLD = 8;
  3. transient volatile Node<K,V>[] table;
  4. private transient volatile Node<K,V>[] nextTable;

get方法

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode());
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) {
  6. if ((eh = e.hash) == h) {
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0)
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) {
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }

这个方法没什么特别的,根据计算出的hashcode寻址,如果就在桶上那么直接返回值,如果是红黑树就按照树的方式获取值,不是就按照链表的方式遍历获取值。该方法没有涉及到并发的设计。

put方法

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. /** Implementation for put and putIfAbsent */
  5. final V putVal(K key, V value, boolean onlyIfAbsent) {
  6. if (key == null || value == null) throw new NullPointerException();
  7. //计算hashcode
  8. int hash = spread(key.hashCode());
  9. int binCount = 0;
  10. for (Node<K,V>[] tab = table;;) {
  11. Node<K,V> f; int n, i, fh;
  12. //2. 判断是否需要初始化
  13. if (tab == null || (n = tab.length) == 0)
  14. tab = initTable();
  15. //f为当前key定位出的Node,如果为空表示当前位置可以写入数据,利用CAS尝试写入
  16. else if ((f = tabAxt(tab, i = (n - 1) & hash)) == null) {
  17. if (casTabAt(tab, i, null,
  18. new Node<K,V>(hash, key, value, null)))
  19. break; // no lock when adding to empty bin
  20. }
  21. //需要扩容 当前位置hashcode==MOVED==-1
  22. else if ((fh = f.hash) == MOVED)
  23. tab = helpTransfer(tab, f);
  24. else {
  25. V oldVal = null;
  26. //都不满足,在synchronized锁里写入数据,进行了并发优化的地方
  27. synchronized (f) {
  28. if (tabAt(tab, i) == f) {
  29. if (fh >= 0) {
  30. binCount = 1;
  31. for (Node<K,V> e = f;; ++binCount) {
  32. K ek;
  33. if (e.hash == hash &&
  34. ((ek = e.key) == key ||
  35. (ek != null && key.equals(ek)))) {
  36. oldVal = e.val;
  37. if (!onlyIfAbsent)
  38. e.val = value;
  39. break;
  40. }
  41. Node<K,V> pred = e;
  42. if ((e = e.next) == null) {
  43. pred.next = new Node<K,V>(hash, key,
  44. value, null);
  45. break;
  46. }
  47. }
  48. }
  49. else if (f instanceof TreeBin) {
  50. Node<K,V> p;
  51. binCount = 2;
  52. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  53. value)) != null) {
  54. oldVal = p.val;
  55. if (!onlyIfAbsent)
  56. p.val = value;
  57. }
  58. }
  59. }
  60. }
  61. if (binCount != 0) {
  62. //和原hashmap一样,转为红黑树
  63. if (binCount >= TREEIFY_THRESHOLD)
  64. treeifyBin(tab, i);
  65. if (oldVal != null)
  66. return oldVal;
  67. break;
  68. }
  69. }
  70. }
  71. addCount(1L, binCount);
  72. return null;
  73. }
  • 根据 key 计算出 hashcode 。
  • 判断是否需要进行初始化。
  • f 即为当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
  • 如果当前位置的 hashcode == MOVED == -1,则需要进行扩容。
  • 如果都不满足,则利用 synchronized 锁写入数据。
  • 如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树。

CAS,乐观锁,在这里使用了乐观锁的原理来实现数据写入

JDK1.7 与 1.8的区别

首先是结构不同

在1.7里
ConcurrentHashMap - 图2

在1.8里
ConcurrentHashMap - 图3

两种结构的调整主要解决了1.7版本中对于查询遍历链表效率太低的问题。在1.8引入了红黑树,当链表长度超过8时,即转换为红黑树存储信息,提高了遍历的速率。

其二

在JDK1.7里,ConcurrentHashMap的put方法里主要使用了ReentrantLock来实现并发阻塞,对HashEntry,即结点加锁,在做完put操作之后再释放该锁,做到并发同步。

  1. //jdk1.7
  2. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  3. HashEntry<K,V> node = tryLock() ? null :
  4. scanAndLockForPut(key, hash, value);
  5. V oldValue;
  6. try {
  7. HashEntry<K,V>[] tab = table;
  8. int index = (tab.length - 1) & hash;
  9. HashEntry<K,V> first = entryAt(tab, index);
  10. for (HashEntry<K,V> e = first;;) {
  11. if (e != null) {
  12. K k;
  13. if ((k = e.key) == key ||
  14. (e.hash == hash && key.equals(k))) {
  15. oldValue = e.value;
  16. if (!onlyIfAbsent) {
  17. e.value = value;
  18. ++modCount;
  19. }
  20. break;
  21. }
  22. e = e.next;
  23. }
  24. else {
  25. if (node != null)
  26. node.setNext(first);
  27. else
  28. node = new HashEntry<K,V>(hash, key, value, first);
  29. int c = count + 1;
  30. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  31. rehash(node);
  32. else
  33. setEntryAt(tab, index, node);
  34. ++modCount;
  35. count = c;
  36. oldValue = null;
  37. break;
  38. }
  39. }
  40. } finally {
  41. unlock();
  42. }
  43. return oldValue;
  44. }

步骤:

  • 首先第一步的时候会尝试获取锁,如果获取失败肯定就有其他线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。
    • 尝试自旋获取锁。
    • 如果重试的次数达到了 MAX_SCAN_RETRIES 则改为阻塞锁获取,保证能获取成功
  • 将当前 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry。
  • 遍历该 HashEntry,如果不为空则判断传入的 key 和当前遍历的 key 是否相等,相等则覆盖旧的 value
  • 不为空则需要新建一个 HashEntry 并加入到 Segment 中,同时会先判断是否需要扩容
  • 最后会解除在 1 中所获取当前 Segment 的锁

get方法非常高效,整个过程都不需要加锁。

1.8 在 1.7 的数据结构上做了大的改动,采用红黑树之后可以保证查询效率(O(logn)),甚至取消了 ReentrantLock 改为了 synchronized,这样可以看出在新版的 JDK 中对 synchronized 优化是很到位的。