ConcurrentHashMap是线程安全的HashMap,和HashTable不同,ConcurrentHashMap通过锁分离技术,运行效率比HashTable高

一、JDK1.7版本的ConcurrentHashMap

926638-20170809132445011-2033999443.png
在JDK1.7中,ConcurrentHashMap通过Segement数组实现锁分离技术
其数组结构由原来的桶,增加了一个Segement数组
其内部结构为 Segement->table->link

  1. static final class Segment<K,V> extends ReentrantLock implements Serializable {
  2. transient volatile HashEntry<K,V>[] table;
  3. }

在Segement内部声明了table,通过位运算初始化Segement的大小,,默认是16个,其最大值为65536既2的16次方

  1. int sshift = 0;
  2. int ssize = 1;
  3. while (ssize < concurrencyLevel) {
  4. ++sshift;
  5. ssize <<= 1;
  6. }

二、Put(1.7)

  1. static class Segment<K,V> extends ReentrantLock implements Serializable {

当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒

二、Put(1.8)

在1.7版本中锁的粒度是Segement,1.8是table,降级了锁的粒度,在1.8中就不涉及到Segement了

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode());
  4. int binCount = 0;
  5. for (Node<K,V>[] tab = table;;) {
  6. Node<K,V> f; int n, i, fh;
  7. if (tab == null || (n = tab.length) == 0)
  8. tab = initTable();
  9. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  10. if (casTabAt(tab, i, null,
  11. new Node<K,V>(hash, key, value, null)))
  12. break; // no lock when adding to empty bin
  13. }
  14. else if ((fh = f.hash) == MOVED)
  15. tab = helpTransfer(tab, f);
  16. else {
  17. V oldVal = null;
  18. synchronized (f) {
  19. if (tabAt(tab, i) == f) {
  20. if (fh >= 0) {
  21. binCount = 1;
  22. for (Node<K,V> e = f;; ++binCount) {
  23. K ek;
  24. if (e.hash == hash &&
  25. ((ek = e.key) == key ||
  26. (ek != null && key.equals(ek)))) {
  27. oldVal = e.val;
  28. if (!onlyIfAbsent)
  29. e.val = value;
  30. break;
  31. }
  32. Node<K,V> pred = e;
  33. if ((e = e.next) == null) {
  34. pred.next = new Node<K,V>(hash, key,
  35. value, null);
  36. break;
  37. }
  38. }
  39. }
  40. else if (f instanceof TreeBin) {
  41. Node<K,V> p;
  42. binCount = 2;
  43. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  44. value)) != null) {
  45. oldVal = p.val;
  46. if (!onlyIfAbsent)
  47. p.val = value;
  48. }
  49. }
  50. }
  51. }
  52. if (binCount != 0) {
  53. if (binCount >= TREEIFY_THRESHOLD)
  54. treeifyBin(tab, i);
  55. if (oldVal != null)
  56. return oldVal;
  57. break;
  58. }
  59. }
  60. }
  61. addCount(1L, binCount);
  62. return null;
  63. }

①ConcurrentHashMap不允许空的Key和Value出现

  1. if (key == null || value == null) throw new NullPointerException();

②当table处无数据时,直接进行无锁插入

  1. Node<K,V> f; int n, i, fh;
  2. if (tab == null || (n = tab.length) == 0)
  3. tab = initTable();
  4. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  5. if (casTabAt(tab, i, null,
  6. new Node<K,V>(hash, key, value, null)))
  7. break; // no lock when adding to empty bin
  8. }

③如果在扩容,则等待扩容

  1. if ((fh = f.hash) == MOVED)
  2. tab = helpTransfer(tab, f);

④若有数据时,则进行加锁插入,链表长度超过8转换成红黑树

  1. V oldVal = null;
  2. synchronized (f) {
  3. if (tabAt(tab, i) == f) {
  4. if (fh >= 0) {
  5. binCount = 1;
  6. for (Node<K,V> e = f;; ++binCount) {
  7. K ek;
  8. if (e.hash == hash &&
  9. ((ek = e.key) == key ||
  10. (ek != null && key.equals(ek)))) {
  11. oldVal = e.val;
  12. if (!onlyIfAbsent)
  13. e.val = value;
  14. break;
  15. }
  16. Node<K,V> pred = e;
  17. if ((e = e.next) == null) {
  18. pred.next = new Node<K,V>(hash, key,
  19. value, null);
  20. break;
  21. }
  22. }
  23. }
  24. else if (f instanceof TreeBin) {
  25. Node<K,V> p;
  26. binCount = 2;
  27. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  28. value)) != null) {
  29. oldVal = p.val;
  30. if (!onlyIfAbsent)
  31. p.val = value;
  32. }
  33. }
  34. }

四、Get(1.7)

ConcurrentHashMap的get操作跟HashMap类似,只是ConcurrentHashMap第一次需要经过一次hash定位到Segment的位置,然后再hash定位到指定的HashEntry,遍历该HashEntry下的链表进行对比,成功就返回,不成功就返回null

五、Get(1.8)

  1. 计算hash值,定位到该table索引位置,如果是首节点符合就返回
  2. 如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回
  3. 以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null

    六、Size(1.7)

    在1.7中对Size的求法有两种方案

  4. 第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的

  5. 第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回

    七、Size(1.8)

    1. public int size() {
    2. long n = sumCount();
    3. return ((n < 0L) ? 0 :
    4. (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
    5. (int)n);
    6. }
    1. final long sumCount() {
    2. CounterCell[] as = counterCells; CounterCell a;
    3. long sum = baseCount;
    4. if (as != null) {
    5. for (int i = 0; i < as.length; ++i) {
    6. if ((a = as[i]) != null)
    7. sum += a.value;
    8. }
    9. }
    10. return sum;
    11. }
    Size在执行put的时候就已经计算好了