5.1 Map

5.1.1 HashMap底层原理

  1. final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
  2. boolean evict) {
  3. //判断table是否为空,如果是空的就创建一个table,并获取他的长度
  4. Node<K,V>[] tab; Node<K,V> p; int n, i;
  5. if ((tab = table) == null || (n = tab.length) == 0)
  6. n = (tab = resize()).length;
  7. //如果计算出来的索引位置之前没有放过数据,就直接放入
  8. if ((p = tab[i = (n - 1) & hash]) == null)
  9. tab[i] = newNode(hash, key, value, null);
  10. else {
  11. //进入这里说明索引位置已经放入过数据了
  12. Node<K,V> e; K k;
  13. //判断put的数据和之前的数据是否重复
  14. if (p.hash == hash &&
  15. ((k = p.key) == key || (key != null && key.equals(k)))) //key的地址或key的equals()只要有一个相等就认为key重复了,就直接覆盖原来key的value
  16. e = p;
  17. //判断是否是红黑树,如果是红黑树就直接插入树中
  18. else if (p instanceof TreeNode)
  19. e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
  20. else {
  21. //如果不是红黑树,就遍历每个节点,判断链表长度是否大于8,如果大于就转换为红黑树
  22. for (int binCount = 0; ; ++binCount) {
  23. if ((e = p.next) == null) {
  24. p.next = newNode(hash, key, value, null);
  25. if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
  26. treeifyBin(tab, hash);
  27. break;
  28. }
  29. //判断索引每个元素的key是否可要插入的key相同,如果相同就直接覆盖
  30. if (e.hash == hash &&
  31. ((k = e.key) == key || (key != null && key.equals(k))))
  32. break;
  33. p = e;
  34. }
  35. }
  36. //如果e不是null,说明没有迭代到最后就跳出了循环,说明链表中有相同的key,因此只需要将value覆盖,并将oldValue返回即可
  37. if (e != null) { // existing mapping for key
  38. V oldValue = e.value;
  39. if (!onlyIfAbsent || oldValue == null)
  40. e.value = value;
  41. afterNodeAccess(e);
  42. return oldValue;
  43. }
  44. }
  45. //说明没有key相同,因此要插入一个key-value,并记录内部结构变化次数
  46. ++modCount;
  47. if (++size > threshold)
  48. resize();
  49. afterNodeInsertion(evict);
  50. return null;
  51. }

HashMap存储的是key-value的键值对,允许key为null,也允许value为null。HashMap内部为数组+链表的结构,

  1. static final int tableSizeFor(int cap) {
  2. int n = cap - 1;
  3. n |= n >>> 1;
  4. n |= n >>> 2;
  5. n |= n >>> 4;
  6. n |= n >>> 8;
  7. n |= n >>> 16;
  8. return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
  9. }

返回 传入值的最小的2的n次方的值 比如 7 -> 8 12->16

  1. static final int hash(Object key) {
  2. int h;
  3. //h >>> 16,表示无符号右移16位,高位补0,任何数跟0异或都是其本身,因此key的hash值高16位不变
  4. return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); //
  5. }
  1. int n = 8;
  2. for(int j = 1; j < 100; j++) {
  3. int num = new Random().nextInt(100000000);
  4. Object o = (Object)(num);
  5. if(j % 8 == 0) {
  6. n = n + 8;
  7. }
  8. int h;
  9. System.err.println((n - 1) & ((h = o.hashCode()) ^ (h >>> 16)));
  10. }

上面代码是hashmap 获取下标的原理,n是map的大小,数组的下标为 (n - 1) & hashcode,就是数据大小和hashcode做&运算,得到的大小就是 ** 0-数组大小。
为了降低hash的冲突,尽量让hash分散和数组下标尽量取决于hashcode:
1:hashcode 前后16位做 异或 运算,降低hashcode 相同
2:让数组大小为 2 n -1 。就是让2进制的数字后面位数都是1。
当扩容后 扩容后的数组大小是之前的两倍 也就是二进制的数值左移一位,也就是多了一个1 ,那么在做
(n - 1) & hashcode 的时候 ,要么hashCode的 值 运算的结果 比 要么是之前的下标 i 要么是 之前的 i + n


会根据key的hashCode值来确定数组的索引(确认放在哪个桶里),如果遇到索引相同的key,桶的大小是2,如果一个key的hashCode是7,一个key的hashCode也是7,那么他们就会被分到一个桶中(hash冲突),如果发生hash冲突,HashMap会将同一个桶中的数据以链表的形式存储,但是如果发生hash冲突的概率比较高,就会导致同一个桶中的链表长度过长,遍历效率降低,所以在JDK1.8中如果链表长度到
达阀值(默认是8),就会将链表转换成红黑二叉树。**

两个key的hashCode可能会定位到一个桶中,这时就发生了hash冲突,如果HashMap的hash算法越散列,那么发生hash冲突的概率越低,如果数组越大,那么发生hash冲突的概率也会越低,但是数组越大带来的空间开销越多,但是遍历速度越快,这就要在空间和时间上进行权衡,这就要看看HashMap的扩容机制

threshold=负载因子 * length ,
大于 threshold 会进行扩容
也就是说数组长度固定以后, 如果负载因子越大,所能容纳的元素个数越多,如果超过这个值就会进行扩容(默认是扩容为原来的2倍),0.75这个值是权衡过空间和时间得出的,建议大家不要随意修改,如果在一些特殊情况下,比如空间比较多,但要求速度比较快,这时候就可以把扩容因子调小以较少hash冲突的概率。相反就增大扩容因子(这个值可以大于1)。
size就是HashMap中键值对的总个数。还有一个字段是modCount,记录是发生内部结构变化的次数,如果put值,但是put的值是覆盖原有的值,这样是不算内部结构变化的。

因为HashMap扩容每次都是扩容为原来的2倍,所以length总是2的次方,这是非常规的设置,常规设置是把桶的大小设置为素数,因为素数发生hash冲突的概率要小于合数,比如HashTable的默认值设置为11,就是桶的大小为素数的应用(HashTable扩容后不能保证是素数)。HashMap采用这种设置是为了在取模和扩容的时候做出优化。

hashMap是通过key的hashCode的高16位和低16位异或后和桶的数量取模得到索引位置,即key.hashcode()^(hashcode>>>16)%length,当length是2^n时,h&(length-1)运算等价于h%length,而&操作比%效率更高。而采用高16位和低16位进行异或,也可以让所有的位数都参与越算,使得在length比较小的时候也可以做到尽量的散列。

在扩容的时候,如果length每次是2^n,那么重新计算出来的索引只有两种情况,一种是 old索引+16,另一种是索引不变,所以就不需要每次都重新计算索引。

使用对象做为map的key的时候 要注意需要重新 hashcode 和 equal 方法

  1. public static void main(String[] args) {
  2. Map<Person,String> map = new HashMap<>(8);
  3. Person gege = new Person();
  4. gege.setAge(3);
  5. gege.setName("gege");
  6. map.put(gege,"哥哥");
  7. Person didi = new Person();
  8. didi.setAge(1);
  9. didi.setName("didi");
  10. map.put(didi,"弟弟");
  11. System.out.println(map.get(gege));
  12. System.out.println(map.get(didi));
  13. // 加入这个map传递给另一个方法,此时这个方法想拿到一个哥哥
  14. // new一个哥哥出来去get
  15. Person dagege = new Person();
  16. dagege.setName("gege");
  17. dagege.setAge(3);
  18. System.out.println(map.get(dagege));
  19. System.out.println(dagege.hashCode());
  20. System.out.println(gege.hashCode());
  21. }

不重写的时候 当去获取的时候 ,重新new一个对象 ,去当做key的时候,由于两个hashcode 不一样 更加不会equals 所以,获取不到之前的对象。

5.1.2 TreeMap底层原理

  1. public V put(K key, V value) {
  2. Entry<K,V> t = root; //获取根节点
  3. if (t == null) { // 如果根节点为空,则该元素置为根节点
  4. compare(key, key); // type (and possibly null) check
  5. root = new Entry<>(key, value, null);
  6. size = 1;
  7. modCount++;
  8. return null;
  9. }
  10. int cmp;
  11. Entry<K,V> parent;
  12. // split comparator and comparable paths
  13. Comparator<? super K> cpr = comparator; // 比较器对象
  14. // 如果比较器对象不为空,也就是自定义了比较器
  15. if (cpr != null) {
  16. do { // 循环比较并确定元素应插入的位置(也就是找到该元素的父节点)
  17. parent = t;
  18. // 调用比较器对象的compare()方法,该方法返回一个整数
  19. cmp = cpr.compare(key, t.key);
  20. if (cmp < 0) // 待插入元素的key"小于"当前位置元素的key,则查询左子树
  21. t = t.left;
  22. else if (cmp > 0)// 待插入元素的key"大于"当前位置元素的key,则查询右子树
  23. t = t.right;
  24. else
  25. return t.setValue(value); // "相等"则替换其value。
  26. } while (t != null);
  27. }
  28. else { // 如果比较器对象为空,使用默认的比较机制
  29. if (key == null)
  30. throw new NullPointerException();
  31. @SuppressWarnings("unchecked")
  32. Comparable<? super K> k = (Comparable<? super K>) key; // 取出比较器对象
  33. do { // 同样是循环比较并确定元素应插入的位置(也就是找到该元素的父节点)
  34. parent = t;
  35. cmp = k.compareTo(t.key);// 同样调用比较方法并返回一个整数
  36. if (cmp < 0) // 待插入元素的key"小于"当前位置元素的key,则查询左子树
  37. t = t.left;
  38. else if (cmp > 0) // 待插入元素的key"大于"当前位置元素的key,则查询右子树
  39. t = t.right;
  40. else
  41. return t.setValue(value); // "相等"则替换其value。
  42. } while (t != null);
  43. }
  44. Entry<K,V> e = new Entry<>(key, value, parent); // 根据key找到父节点后新建一个节点
  45. if (cmp < 0) // 根据比较的结果来确定放在左子树还是右子树
  46. parent.left = e;
  47. else
  48. parent.right = e;
  49. fixAfterInsertion(e);
  50. size++; // 集合大小+1
  51. modCount++; // 集合结构被修改次数+1
  52. return null;
  53. }

TreeMap 底层是红黑树结构,可以排序,可以默认排序,或者自定义排序规则。
插入的数据按照红黑树的结构存储。

5.1.3 LinkedHashMap底层原理

linkedHashMap底层调用的是hashmap的方法,ListedHashMap 重写了 newNode

  1. tab[i] = newNode(hash, key, value, null);
  1. Node<K,V> newNode(int hash, K key, V value, Node<K,V> e) {
  2. LinkedHashMap.Entry<K,V> p =
  3. new LinkedHashMap.Entry<K,V>(hash, key, value, e);
  4. linkNodeLast(p);
  5. return p;
  6. }
  7. private void linkNodeLast(LinkedHashMap.Entry<K,V> p) {
  8. LinkedHashMap.Entry<K,V> last = tail;
  9. tail = p;
  10. if (last == null)
  11. head = p;
  12. else {
  13. p.before = last;
  14. last.after = p;
  15. }
  16. }
  17. static class Entry<K,V> extends HashMap.Node<K,V> {
  18. Entry<K,V> before, after;
  19. Entry(int hash, K key, V value, Node<K,V> next) {
  20. super(hash, key, value, next);
  21. }
  22. }

可以看到 Entry有两个指针 前 和 后,就是存放在数组中的Node的节点中都可以找到他前后的节点。
每次newNode 的时候 都会调用 linkNodeLast 方法,这时会记录head的值,记录head的值,就是可以在便利的时候依次输出每个node,因为每个node都会存前一个before和后一个after

网上很多说是

  1. void afterNodeAccess(Node<K,V> e) { // move node to last
  2. LinkedHashMap.Entry<K,V> last;
  3. if (accessOrder && (last = tail) != e) {
  4. LinkedHashMap.Entry<K,V> p =
  5. (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
  6. p.after = null;
  7. if (b == null)
  8. head = a;
  9. else
  10. b.after = a;
  11. if (a != null)
  12. a.before = b;
  13. else
  14. last = b;
  15. if (last == null)
  16. head = p;
  17. else {
  18. p.before = last;
  19. last.after = p;
  20. }
  21. tail = p;
  22. ++modCount;
  23. }
  24. }

网上说的是在hashmap 提供的回调函数会调用 linkedHashMap的的上面这个方法,这个是LinkedHashMap 排序的关键,因为这个函数是在链表中排序的,也就是hash冲突的时候的排序,所以这个不是真正排序的关键代码。有待认证。

  1. void afterNodeAccess(Node<K,V> e) { // move node to last
  2. LinkedHashMap.Entry<K,V> last;
  3. if (accessOrder && (last = tail) != e) {
  4. LinkedHashMap.Entry<K,V> p =
  5. (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
  6. p.after = null;
  7. if (b == null)
  8. head = a;
  9. else
  10. b.after = a;
  11. if (a != null)
  12. a.before = b;
  13. else
  14. last = b;
  15. if (last == null)
  16. head = p;
  17. else {
  18. p.before = last;
  19. last.after = p;
  20. }
  21. tail = p;
  22. ++modCount;
  23. }
  24. }

上面的 方法是把这次入参的数据 ,放到链表的尾部。

5.1.4 CurrentHashMap底层原理

HashMap是工作中使用频度非常高的一个K-V存储容器。在多线程环境下,使用HashMap是不安全的,可能产生各种非期望的结果。
针对HashMap在多线程环境下不安全这个问题,HashMap的作者认为这并不是bug,而是应该使用线程安全的HashMap。
目前有如下一些方式可以获得线程安全的HashMap:

  • Collections.synchronizedMap
  • HashTable
  • ConcurrentHashMap

其中,前两种方式由于全局锁的问题,存在很严重的性能问题。所以,著名的并发编程大师Doug Lea在JDK1.5的java.util.concurrent包下面添加了一大堆并发工具。其中就包含ConcurrentHashMap这个线程安全的HashMap。
本文就来简单介绍一下ConcurrentHashMap的实现原理。
PS:基于JDK8

0 ConcurrentHashMap在JDK7中的回顾

ConcurrentHashMap在JDK7和JDK8中的实现方式上有较大的不同。首先我们先来大概回顾一下ConcurrentHashMap在JDK7中的原理是怎样的。

0.1 分段锁技术

针对HashTable会锁整个hash表的问题,ConcurrentHashMap提出了分段锁的解决方案。
分段锁的思想就是:锁的时候不锁整个hash表,而是只锁一部分。
如何实现呢?这就用到了ConcurrentHashMap中最关键的Segment。
ConcurrentHashMap中维护着一个Segment数组,每个Segment可以看做是一个HashMap。
而Segment本身继承了ReentrantLock,它本身就是一个锁。
在Segment中通过HashEntry数组来维护其内部的hash表。
每个HashEntry就代表了map中的一个K-V,用HashEntry可以组成一个链表结构,通过next字段引用到其下一个元素。
上述内容在源码中的表示如下:

  1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
  2. implements ConcurrentMap<K, V>, Serializable {
  3. // ... 省略 ...
  4. /**
  5. * The segments, each of which is a specialized hash table.
  6. */
  7. final Segment<K,V>[] segments;
  8. // ... 省略 ...
  9. /**
  10. * Segment是ConcurrentHashMap的静态内部类
  11. *
  12. * Segments are specialized versions of hash tables. This
  13. * subclasses from ReentrantLock opportunistically, just to
  14. * simplify some locking and avoid separate construction.
  15. */
  16. static final class Segment<K,V> extends ReentrantLock implements Serializable {
  17. // ... 省略 ...
  18. /**
  19. * The per-segment table. Elements are accessed via
  20. * entryAt/setEntryAt providing volatile semantics.
  21. */
  22. transient volatile HashEntry<K,V>[] table;
  23. // ... 省略 ...
  24. }
  25. // ... 省略 ...
  26. /**
  27. * ConcurrentHashMap list entry. Note that this is never exported
  28. * out as a user-visible Map.Entry.
  29. */
  30. static final class HashEntry<K,V> {
  31. final int hash;
  32. final K key;
  33. volatile V value;
  34. volatile HashEntry<K,V> next;
  35. // ... 省略 ...
  36. }
  37. }

所以,JDK7中,ConcurrentHashMap的整体结构可以描述为下图这样子。
java-集合 - 图1
由上图可见,只要我们的hash值足够分散,那么每次put的时候就会put到不同的segment中去。 而segment自己本身就是一个锁,put的时候,当前segment会将自己锁住,此时其他线程无法操作这个segment, 但不会影响到其他segment的操作。这个就是锁分段带来的好处。

0.2 线程安全的put

ConcurrentHashMap的put方法源码如下:

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key);
  6. int j = (hash >>> segmentShift) & segmentMask;
  7. // 根据key的hash定位出一个segment,如果指定index的segment还没初始化,则调用ensureSegment方法初始化
  8. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  9. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  10. s = ensureSegment(j);
  11. // 调用segment的put方法
  12. return s.put(key, hash, value, false);
  13. }

最终会调用segment的put方法,将元素put到HashEntry数组中,这里的注释中只给出锁相关的说明

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. // 因为segment本身就是一个锁
  3. // 这里调用tryLock尝试获取锁
  4. // 如果获取成功,那么其他线程都无法再修改这个segment
  5. // 如果获取失败,会调用scanAndLockForPut方法根据key和hash尝试找到这个node,如果不存在,则创建一个node并返回,如果存在则返回null
  6. // 查看scanAndLockForPut源码会发现他在查找的过程中会尝试获取锁,在多核CPU环境下,会尝试64次tryLock(),如果64次还没获取到,会直接调用lock()
  7. // 也就是说这一步一定会获取到锁
  8. HashEntry<K,V> node = tryLock() ? null :
  9. scanAndLockForPut(key, hash, value);
  10. V oldValue;
  11. try {
  12. HashEntry<K,V>[] tab = table;
  13. int index = (tab.length - 1) & hash;
  14. HashEntry<K,V> first = entryAt(tab, index);
  15. for (HashEntry<K,V> e = first;;) {
  16. if (e != null) {
  17. K k;
  18. if ((k = e.key) == key ||
  19. (e.hash == hash && key.equals(k))) {
  20. oldValue = e.value;
  21. if (!onlyIfAbsent) {
  22. e.value = value;
  23. ++modCount;
  24. }
  25. break;
  26. }
  27. e = e.next;
  28. }
  29. else {
  30. if (node != null)
  31. node.setNext(first);
  32. else
  33. node = new HashEntry<K,V>(hash, key, value, first);
  34. int c = count + 1;
  35. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  36. // 扩容
  37. rehash(node);
  38. else
  39. setEntryAt(tab, index, node);
  40. ++modCount;
  41. count = c;
  42. oldValue = null;
  43. break;
  44. }
  45. }
  46. } finally {
  47. // 释放锁
  48. unlock();
  49. }
  50. return oldValue;
  51. }

0.3 线程安全的扩容(Rehash)

HashMap的线程安全问题大部分出在扩容(rehash)的过程中。
ConcurrentHashMap的扩容只针对每个segment中的HashEntry数组进行扩容。
由上述put的源码可知,ConcurrentHashMap在rehash的时候是有锁的,所以在rehash的过程中,其他线程无法对segment的hash表做操作,这就保证了线程安全。

0.4 JDK8中ConcurrentHashMap的初始化

以无参数构造函数为例,来看一下ConcurrentHashMap类初始化的时候会做些什么。

  1. ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

首先会执行静态代码块和初始化类变量。 主要会初始化以下这些类变量:

  1. // Unsafe mechanics
  2. private static final sun.misc.Unsafe U;
  3. private static final long SIZECTL;
  4. private static final long TRANSFERINDEX;
  5. private static final long BASECOUNT;
  6. private static final long CELLSBUSY;
  7. private static final long CELLVALUE;
  8. private static final long ABASE;
  9. private static final int ASHIFT;
  10. static {
  11. try {
  12. U = sun.misc.Unsafe.getUnsafe();
  13. Class<?> k = ConcurrentHashMap.class;
  14. SIZECTL = U.objectFieldOffset
  15. (k.getDeclaredField("sizeCtl"));
  16. TRANSFERINDEX = U.objectFieldOffset
  17. (k.getDeclaredField("transferIndex"));
  18. BASECOUNT = U.objectFieldOffset
  19. (k.getDeclaredField("baseCount"));
  20. CELLSBUSY = U.objectFieldOffset
  21. (k.getDeclaredField("cellsBusy"));
  22. Class<?> ck = CounterCell.class;
  23. CELLVALUE = U.objectFieldOffset
  24. (ck.getDeclaredField("value"));
  25. Class<?> ak = Node[].class;
  26. ABASE = U.arrayBaseOffset(ak);
  27. int scale = U.arrayIndexScale(ak);
  28. if ((scale & (scale - 1)) != 0)
  29. throw new Error("data type scale not a power of two");
  30. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
  31. } catch (Exception e) {
  32. throw new Error(e);
  33. }
  34. }

这里用到了Unsafe类,其中objectFieldOffset方法用于获取指定Field(例如sizeCtl)在内存中的偏移量。
获取的这个偏移量主要用于干啥呢?不着急,在下文的分析中,遇到的时候再研究就好。

PS:关于Unsafe的介绍和使用,可以查看笔者的另一篇文章 Unsafe类的介绍和使用

0.5 内部数据结构

先来从源码角度看一下JDK8中是怎么定义的存储结构。

  1. /**
  2. * The array of bins. Lazily initialized upon first insertion.
  3. * Size is always a power of two. Accessed directly by iterators.
  4. *
  5. * hash表,在第一次put数据的时候才初始化,他的大小总是2的倍数。
  6. */
  7. transient volatile Node<K,V>[] table;
  8. /**
  9. * 用来存储一个键值对
  10. *
  11. * Key-value entry. This class is never exported out as a
  12. * user-mutable Map.Entry (i.e., one supporting setValue; see
  13. * MapEntry below), but can be used for read-only traversals used
  14. * in bulk tasks. Subclasses of Node with a negative hash field
  15. * are special, and contain null keys and values (but are never
  16. * exported). Otherwise, keys and vals are never null.
  17. */
  18. static class Node<K,V> implements Map.Entry<K,V> {
  19. final int hash;
  20. final K key;
  21. volatile V val;
  22. volatile Node<K,V> next;
  23. }

可以发现,JDK8与JDK7的实现由较大的不同,JDK8中不在使用Segment的概念,他更像HashMap的实现方式。

PS:关于HashMap的原理,可以参考笔者的另一篇文章 HashMap原理及内部存储结构

这个结构可以通过下图描述出来
java-集合 - 图2

0.6 线程安全的hash表初始化

由上文可知ConcurrentHashMap是用table这个成员变量来持有hash表的。
table的初始化采用了延迟初始化策略,他会在第一次执行put的时候初始化table。
put方法源码如下(省略了暂时不相关的代码):

  1. /**
  2. * Maps the specified key to the specified value in this table.
  3. * Neither the key nor the value can be null.
  4. *
  5. * <p>The value can be retrieved by calling the {@code get} method
  6. * with a key that is equal to the original key.
  7. *
  8. * @param key key with which the specified value is to be associated
  9. * @param value value to be associated with the specified key
  10. * @return the previous value associated with {@code key}, or
  11. * {@code null} if there was no mapping for {@code key}
  12. * @throws NullPointerException if the specified key or value is null
  13. */
  14. public V put(K key, V value) {
  15. return putVal(key, value, false);
  16. }
  17. /** Implementation for put and putIfAbsent */
  18. final V putVal(K key, V value, boolean onlyIfAbsent) {
  19. if (key == null || value == null) throw new NullPointerException();
  20. // 计算key的hash值
  21. int hash = spread(key.hashCode());
  22. int binCount = 0;
  23. for (Node<K,V>[] tab = table;;) {
  24. Node<K,V> f; int n, i, fh;
  25. // 如果table是空,初始化之
  26. if (tab == null || (n = tab.length) == 0)
  27. tab = initTable();
  28. // 省略...
  29. }
  30. // 省略...
  31. }

initTable源码如下

  1. /**
  2. * Initializes table, using the size recorded in sizeCtl.
  3. */
  4. private final Node<K,V>[] initTable() {
  5. Node<K,V>[] tab; int sc;
  6. // #1
  7. while ((tab = table) == null || tab.length == 0) {
  8. // sizeCtl的默认值是0,所以最先走到这的线程会进入到下面的else if判断中
  9. // #2
  10. if ((sc = sizeCtl) < 0)
  11. Thread.yield(); // lost initialization race; just spin
  12. // 尝试原子性的将指定对象(this)的内存偏移量为SIZECTL的int变量值从sc更新为-1
  13. // 也就是将成员变量sizeCtl的值改为-1
  14. // #3
  15. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  16. try {
  17. // 双重检查,原因会在下文分析
  18. // #4
  19. if ((tab = table) == null || tab.length == 0) {
  20. int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默认初始容量为16
  21. @SuppressWarnings("unchecked")
  22. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  23. // #5
  24. table = tab = nt; // 创建hash表,并赋值给成员变量table
  25. sc = n - (n >>> 2);
  26. }
  27. } finally {
  28. // #6
  29. sizeCtl = sc;
  30. }
  31. break;
  32. }
  33. }
  34. return tab;
  35. }

成员变量sizeCtl在ConcurrentHashMap中的其中一个作用相当于HashMap中的threshold,当hash表中元素个数超过sizeCtl时,触发扩容; 他的另一个作用类似于一个标识,例如,当他等于-1的时候,说明已经有某一线程在执行hash表的初始化了,一个小于-1的值表示某一线程正在对hash表执行resize。
这个方法首先判断sizeCtl是否小于0,如果小于0,直接将当前线程变为就绪状态的线程。
当sizeCtl大于等于0时,当前线程会尝试通过CAS的方式将sizeCtl的值修改为-1。修改失败的线程会进入下一轮循环,判断sizeCtl<0了,被yield住;修改成功的线程会继续执行下面的初始化代码。
在new Node[]之前,要再检查一遍table是否为空,这里做双重检查的原因在于,如果另一个线程执行完#1代码后挂起,此时另一个初始化的线程执行完了#6的代码,此时sizeCtl是一个大于0的值,那么再切回这个线程执行的时候,是有可能重复初始化的。关于这个问题会在下图的并发场景中说明。
然后初始化hash表,并重新计算sizeCtl的值,最终返回初始化好的hash表。
下图详细说明了几种可能导致重复初始化hash表的并发场景,我们假设Thread2最终成功初始化hash表。

  • Thread1模拟的是CAS更新sizeCtl变量的并发场景
  • Thread2模拟的是table的双重检查的必要性

java-集合 - 图3
由上图可以看出,在Thread1中如果不对sizeCtl的值更新做并发控制,Thread1是有可能走到new Node[]这一步的。 在Thread3中,如果不做双重判断,Thread3也会走到new Node[]这一步。

0.7 线程安全的put

put操作可分为以下两类

  • 当前hash表对应当前key的index上没有元素时
  • 当前hash表对应当前key的index上已经存在元素时(hash碰撞)

    0.8 hash表上没有元素时

    对应源码如下

    1. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    2. if (casTabAt(tab, i, null,
    3. new Node<K,V>(hash, key, value, null)))
    4. break; // no lock when adding to empty bin
    5. }
    6. static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    7. return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    8. }
    9. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
    10. Node<K,V> c, Node<K,V> v) {
    11. return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    12. }

    tabAt方法通过Unsafe.getObjectVolatile()的方式获取数组对应index上的元素,getObjectVolatile作用于对应的内存偏移量上,是具备volatile内存语义的。
    如果获取的是空,尝试用cas的方式在数组的指定index上创建一个新的Node。

    0.9 hash碰撞时

    对应源码如下

    1. else {
    2. V oldVal = null;
    3. // 锁f是在4.1中通过tabAt方法获取的
    4. // 也就是说,当发生hash碰撞时,会以链表的头结点作为锁
    5. synchronized (f) {
    6. // 这个检查的原因在于:
    7. // tab引用的是成员变量table,table在发生了rehash之后,原来index上的Node可能会变
    8. // 这里就是为了确保在put的过程中,没有收到rehash的影响,指定index上的Node仍然是f
    9. // 如果不是f,那这个锁就没有意义了
    10. if (tabAt(tab, i) == f) {
    11. // 确保put没有发生在扩容的过程中,fh=-1时表示正在扩容
    12. if (fh >= 0) {
    13. binCount = 1;
    14. for (Node<K,V> e = f;; ++binCount) {
    15. K ek;
    16. if (e.hash == hash &&
    17. ((ek = e.key) == key ||
    18. (ek != null && key.equals(ek)))) {
    19. oldVal = e.val;
    20. if (!onlyIfAbsent)
    21. e.val = value;
    22. break;
    23. }
    24. Node<K,V> pred = e;
    25. if ((e = e.next) == null) {
    26. // 在链表后面追加元素
    27. pred.next = new Node<K,V>(hash, key,
    28. value, null);
    29. break;
    30. }
    31. }
    32. }
    33. else if (f instanceof TreeBin) {
    34. Node<K,V> p;
    35. binCount = 2;
    36. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    37. value)) != null) {
    38. oldVal = p.val;
    39. if (!onlyIfAbsent)
    40. p.val = value;
    41. }
    42. }
    43. }
    44. }
    45. if (binCount != 0) {
    46. // 如果链表长度超过8个,将链表转换为红黑树,与HashMap相同,相对于JDK7来说,优化了查找效率
    47. if (binCount >= TREEIFY_THRESHOLD)
    48. treeifyBin(tab, i);
    49. if (oldVal != null)
    50. return oldVal;
    51. break;
    52. }
    53. }

    不同于JDK7中segment的概念,JDK8中直接用链表的头节点做为锁。 JDK7中,HashMap在多线程并发put的情况下可能会形成环形链表,ConcurrentHashMap通过这个锁的方式,使同一时间只有有一个线程对某一链表执行put,解决了并发问题。

    1.0 线程安全的扩容

    put方法的最后一步是统计hash表中元素的个数,如果超过sizeCtl的值,触发扩容。
    扩容的代码略长,可大致看一下里面的中文注释,再参考下面的分析。 其实我们主要的目的是弄明白ConcurrentHashMap是如何解决HashMap的并发问题的。 带着这个问题来看源码就好。关于HashMap存在的问题,参考本文一开始说的笔者的另一篇文章即可。
    其实HashMap的并发问题多半是由于put和扩容并发导致的。
    这里我们就来看一下ConcurrentHashMap是如何解决的。
    扩容涉及的代码如下:

    1. /**
    2. * The array of bins. Lazily initialized upon first insertion.
    3. * Size is always a power of two. Accessed directly by iterators.
    4. * 业务中使用的hash表
    5. */
    6. transient volatile Node<K,V>[] table;
    7. /**
    8. * The next table to use; non-null only while resizing.
    9. * 扩容时才使用的hash表,扩容完成后赋值给table,并将nextTable重置为null。
    10. */
    11. private transient volatile Node<K,V>[] nextTable;
    12. /**
    13. * Adds to count, and if table is too small and not already
    14. * resizing, initiates transfer. If already resizing, helps
    15. * perform transfer if work is available. Rechecks occupancy
    16. * after a transfer to see if another resize is already needed
    17. * because resizings are lagging additions.
    18. *
    19. * @param x the count to add
    20. * @param check if <0, don't check resize, if <= 1 only check if uncontended
    21. */
    22. private final void addCount(long x, int check) {
    23. // ----- 计算键值对的个数 start -----
    24. CounterCell[] as; long b, s;
    25. if ((as = counterCells) != null ||
    26. !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    27. CounterCell a; long v; int m;
    28. boolean uncontended = true;
    29. if (as == null || (m = as.length - 1) < 0 ||
    30. (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    31. !(uncontended =
    32. U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
    33. fullAddCount(x, uncontended);
    34. return;
    35. }
    36. if (check <= 1)
    37. return;
    38. s = sumCount();
    39. }
    40. // ----- 计算键值对的个数 end -----
    41. // ----- 判断是否需要扩容 start -----
    42. if (check >= 0) {
    43. Node<K,V>[] tab, nt; int n, sc;
    44. // 当上面计算出来的键值对个数超出sizeCtl时,触发扩容,调用核心方法transfer
    45. while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
    46. (n = tab.length) < MAXIMUM_CAPACITY) {
    47. int rs = resizeStamp(n);
    48. if (sc < 0) {
    49. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
    50. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
    51. transferIndex <= 0)
    52. break;
    53. // 如果有已经在执行的扩容操作,nextTable是正在扩容中的新的hash表
    54. // 如果并发扩容,transfer直接使用正在扩容的新hash表,保证了不会出现hash表覆盖的情况
    55. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
    56. transfer(tab, nt);
    57. }
    58. // 更新sizeCtl的值,更新成功后为负数,扩容开始
    59. // 此时没有并发扩容的情况,transfer中会new一个新的hash表来扩容
    60. else if (U.compareAndSwapInt(this, SIZECTL, sc,
    61. (rs << RESIZE_STAMP_SHIFT) + 2))
    62. transfer(tab, null);
    63. s = sumCount();
    64. }
    65. }
    66. // ----- 判断是否需要扩容 end -----
    67. }
    68. /**
    69. * Moves and/or copies the nodes in each bin to new table. See
    70. * above for explanation.
    71. */
    72. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    73. int n = tab.length, stride;
    74. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    75. stride = MIN_TRANSFER_STRIDE; // subdivide range
    76. if (nextTab == null) { // initiating
    77. try {
    78. @SuppressWarnings("unchecked")
    79. // 初始化新的hash表,大小为之前的2倍,并赋值给成员变量nextTable
    80. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
    81. nextTab = nt;
    82. } catch (Throwable ex) { // try to cope with OOME
    83. sizeCtl = Integer.MAX_VALUE;
    84. return;
    85. }
    86. nextTable = nextTab;
    87. transferIndex = n;
    88. }
    89. int nextn = nextTab.length;
    90. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    91. boolean advance = true;
    92. boolean finishing = false; // to ensure sweep before committing nextTab
    93. for (int i = 0, bound = 0;;) {
    94. Node<K,V> f; int fh;
    95. while (advance) {
    96. int nextIndex, nextBound;
    97. if (--i >= bound || finishing)
    98. advance = false;
    99. else if ((nextIndex = transferIndex) <= 0) {
    100. i = -1;
    101. advance = false;
    102. }
    103. else if (U.compareAndSwapInt
    104. (this, TRANSFERINDEX, nextIndex,
    105. nextBound = (nextIndex > stride ?
    106. nextIndex - stride : 0))) {
    107. bound = nextBound;
    108. i = nextIndex - 1;
    109. advance = false;
    110. }
    111. }
    112. if (i < 0 || i >= n || i + n >= nextn) {
    113. int sc;
    114. // 扩容完成时,将成员变量nextTable置为null,并将table替换为rehash后的nextTable
    115. if (finishing) {
    116. nextTable = null;
    117. table = nextTab;
    118. sizeCtl = (n << 1) - (n >>> 1);
    119. return;
    120. }
    121. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
    122. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
    123. return;
    124. finishing = advance = true;
    125. i = n; // recheck before commit
    126. }
    127. }
    128. else if ((f = tabAt(tab, i)) == null)
    129. advance = casTabAt(tab, i, null, fwd);
    130. else if ((fh = f.hash) == MOVED)
    131. advance = true; // already processed
    132. else {
    133. // 接下来是遍历每个链表,对每个链表的元素进行rehash
    134. // 仍然用头结点作为锁,所以在扩容的时候,无法对这个链表执行put操作
    135. synchronized (f) {
    136. if (tabAt(tab, i) == f) {
    137. Node<K,V> ln, hn;
    138. if (fh >= 0) {
    139. int runBit = fh & n;
    140. Node<K,V> lastRun = f;
    141. for (Node<K,V> p = f.next; p != null; p = p.next) {
    142. int b = p.hash & n;
    143. if (b != runBit) {
    144. runBit = b;
    145. lastRun = p;
    146. }
    147. }
    148. if (runBit == 0) {
    149. ln = lastRun;
    150. hn = null;
    151. }
    152. else {
    153. hn = lastRun;
    154. ln = null;
    155. }
    156. for (Node<K,V> p = f; p != lastRun; p = p.next) {
    157. int ph = p.hash; K pk = p.key; V pv = p.val;
    158. if ((ph & n) == 0)
    159. ln = new Node<K,V>(ph, pk, pv, ln);
    160. else
    161. hn = new Node<K,V>(ph, pk, pv, hn);
    162. }
    163. // setTabAt方法调用了Unsafe.putObjectVolatile来完成hash表元素的替换,具备volatile内存语义
    164. setTabAt(nextTab, i, ln);
    165. setTabAt(nextTab, i + n, hn);
    166. setTabAt(tab, i, fwd);
    167. advance = true;
    168. }
    169. else if (f instanceof TreeBin) {
    170. TreeBin<K,V> t = (TreeBin<K,V>)f;
    171. TreeNode<K,V> lo = null, loTail = null;
    172. TreeNode<K,V> hi = null, hiTail = null;
    173. int lc = 0, hc = 0;
    174. for (Node<K,V> e = t.first; e != null; e = e.next) {
    175. int h = e.hash;
    176. TreeNode<K,V> p = new TreeNode<K,V>
    177. (h, e.key, e.val, null, null);
    178. if ((h & n) == 0) {
    179. if ((p.prev = loTail) == null)
    180. lo = p;
    181. else
    182. loTail.next = p;
    183. loTail = p;
    184. ++lc;
    185. }
    186. else {
    187. if ((p.prev = hiTail) == null)
    188. hi = p;
    189. else
    190. hiTail.next = p;
    191. hiTail = p;
    192. ++hc;
    193. }
    194. }
    195. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
    196. (hc != 0) ? new TreeBin<K,V>(lo) : t;
    197. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
    198. (lc != 0) ? new TreeBin<K,V>(hi) : t;
    199. setTabAt(nextTab, i, ln);
    200. setTabAt(nextTab, i + n, hn);
    201. setTabAt(tab, i, fwd);
    202. advance = true;
    203. }
    204. }
    205. }
    206. }
    207. }
    208. }

    根据上述代码,对ConcurrentHashMap是如何解决HashMap并发问题这一疑问进行简要说明。

  • 首先new一个新的hash表(nextTable)出来,大小是原来的2倍。后面的rehash都是针对这个新的hash表操作,不涉及原hash表(table)。

  • 然后会对原hash表(table)中的每个链表进行rehash,此时会尝试获取头节点的锁。这一步就保证了在rehash的过程中不能对这个链表执行put操作。
  • 通过sizeCtl控制,使扩容过程中不会new出多个新hash表来。
  • 最后,将所有键值对重新rehash到新表(nextTable)中后,用nextTable将table替换。这就避免了HashMap中get和扩容并发时,可能get到null的问题。
  • 在整个过程中,共享变量的存储和读取全部通过volatile或CAS的方式,保证了线程安全。

    1.1 总结

    多线程环境下,对共享变量的操作一定要小心。要充分从Java内存模型的角度考虑问题。
    ConcurrentHashMap中大量的用到了Unsafe类的方法,我们自己虽然也能拿到Unsafe的实例,但在生产中不建议这么做。 多数情况下,我们可以通过并发包中提供的工具来实现,例如Atomic包下面的可以用来实现CAS操作,lock包下可以用来实现锁相关的操作。
    善用线程安全的容器工具,例如ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue等,因为我们在工作中无法像ConcurrentHashMap这样通过Unsafe的getObjectVolatile和setObjectVolatile原子性的更新数组中的元素,所以这些并发工具是很重要的。

5.2 Set

5.2.1 HashSet底层原理

  1. public HashSet() {
  2. map = new HashMap<>();
  3. }
  4. public HashSet(Collection<? extends E> c) {
  5. map = new HashMap<>(Math.max((int) (c.size()/.75f) + 1, 16));
  6. addAll(c);
  7. }
  8. public HashSet(int initialCapacity, float loadFactor) {
  9. map = new HashMap<>(initialCapacity, loadFactor);
  10. }
  11. //add 方法 新增数据
  12. public boolean add(E e) {
  13. return map.put(e, PRESENT)==null;
  14. }

HashSet 默认的构造方法 都是由HashMap 实现,而新增方法是调用 HashMap的put方法,key 是HashSet 值 , val 是 默认的new Object(); 因为使用的HashMap的 key 作为 存储数据的方式,所以 HashSet是不会重复的;

5.3 List

5.3.1ArrayList底层原理

ArrayList底层就是new Object数组实现,调用add方法如下

  1. public boolean add(E e) {
  2. ensureCapacityInternal(size + 1); // Increments modCount!! 扩容
  3. elementData[size++] = e; 把地size个位置存放传入的数据,然后size++
  4. return true;
  5. }

扩容方法

  1. private void grow(int minCapacity) { //minCapacity 原数组长度+1
  2. // overflow-conscious code
  3. int oldCapacity = elementData.length;
  4. int newCapacity = oldCapacity + (oldCapacity >> 1); //扩容机制 原数组长度*1.5 取整
  5. if (newCapacity - minCapacity < 0)
  6. newCapacity = minCapacity; //取原数组长度*1.5和 +1的最大值
  7. if (newCapacity - MAX_ARRAY_SIZE > 0)
  8. newCapacity = hugeCapacity(minCapacity);
  9. // minCapacity is usually close to size, so this is a win:
  10. elementData = Arrays.copyOf(elementData, newCapacity);
  11. }