Java ConcurrentHashMap
如何在高并发下提高系统吞吐是所有后端开发者追求的目标,Java并发的开创者Doug Lea在Java 7 ConcurrentHashMap的设计中给出了一些参考答案,这里总结了ConcurrentHashMap源码中影响并发性能的十个细节,有常见的自旋锁,CAS的使用,也有延迟写内存,volatile语义退化等不常见的技巧。
由于ConcurrentHashMap的内容比较多,而且Java 7和Java 8两个版本的实现相差比较大。
《阿里巴巴Java开发手册》的作者孤尽对ConcurrentHashMap的设计十分推崇,他说:“ConcurrentHashMap源码是学习Java代码开发规范的一个非常好的学习材料,我建议同学们可以时常去看一看,总会有新的收获的”,相信大家平常也能听到很多对于ConcurrentHashMap设计的溢美之词,在展开隐藏在ConcurrentHashMap所有小秘密之前,在大脑中首先要有这样的一幅图:
ConcurrentHashMap中十个提升性能的细节 - 图1
对于Java 7来说,这张图已经能完全把ConcurrentHashMap的架构说清楚了:

  1. ConcurrentHashMap是一个线程安全的Map实现,其读取不需要加锁,通过引入Segment,可以做到写入的时候加锁力度足够小
  2. 由于引入了SegmentConcurrentHashMap在读取和写入的时候需要需要做两次哈希,但这两次哈希换来的是更细力粒度的锁,也就意味着可以支持更高的并发
  3. 每个桶数组中的key-value对仍然以链表的形式存放在桶中,这一点和HashMap是一致的。

关于Java 7的ConcurrentHashMap的整体架构,用上面三两句话就可以概括,这张图应该很快就可以在大家的大脑中留下印象,接下来通过几个问题来引发思考:

  1. ConcurrentHashMap的哪些操作需要加锁?
  2. ConcurrentHashMap的无锁读是如何实现的?
  3. 在多线程的场景下调用size()方法获取ConcurrentHashMap的大小有什么挑战?ConcurrentHashMap是怎么解决的?
  4. 在有Segment存在的前提下,应该如何扩容的?

HashMap中最重要的点有四个:初始化,数据寻址-hash方法,数据存储-put方法,扩容-resize方法,对于ConcurrentHashMap来说,这四个操作依然是最重要的,但由于其引入了更复杂的数据结构,因此在调用size()查看整个ConcurrentHashMap的数量大小的时候也有不小的挑战,也会重点看下Doug Lea在size()方法中的设计。

初始化

  1. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
  2. if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
  3. throw new IllegalArgumentException();
  4. if (concurrencyLevel > MAX_SEGMENTS)
  5. concurrencyLevel = MAX_SEGMENTS;
  6. // Find power-of-two sizes best matching arguments
  7. int sshift = 0;
  8. int ssize = 1;
  9. // 保证ssize是大于concurrencyLevel的最小的2的整数次幂
  10. while (ssize < concurrencyLevel) {
  11. ++sshift;
  12. ssize <<= 1;
  13. }
  14. // 寻址需要两次哈希,哈希的高位用于确定segment,低位用户确定桶数组中的元素
  15. this.segmentShift = 32 - sshift;
  16. this.segmentMask = ssize - 1;
  17. if (initialCapacity > MAXIMUM_CAPACITY)
  18. initialCapacity = MAXIMUM_CAPACITY;
  19. int c = initialCapacity / ssize;
  20. if (c * ssize < initialCapacity)
  21. ++c;
  22. int cap = MIN_SEGMENT_TABLE_CAPACITY;
  23. while (cap < c)
  24. cap <<= 1;
  25. Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
  26. Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  27. UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
  28. this.segments = ss;
  29. }

初始化方法中做了三件重要的事:

  1. 确定了segments的数组的大小ssizessize根据入参concurrencyLevel确定,取大于concurrencyLevel的最小的2的整数次幂
  2. 确定哈希寻址时的偏移量,这个偏移量在确定元素在segment数组中的位置时会用到
  3. 初始化segment数组中的第一个元素,元素类型为HashEntry的数组,这个数组的长度为initialCapacity / ssize,即初始化大小除以segment数组的大小,segment数组中的其他元素在后续put操作时参考第一个已初始化的实例初始化

    1. static final class HashEntry<K,V> {
    2. final int hash;
    3. final K key;
    4. volatile V value;
    5. volatile HashEntry<K,V> next;
    6. HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
    7. this.hash = hash;
    8. this.key = key;
    9. this.value = value;
    10. this.next = next;
    11. }
    12. final void setNext(HashEntry<K,V> n) {
    13. UNSAFE.putOrderedObject(this, nextOffset, n);
    14. }
    15. }

    这里的HashEntryHashMap中的HashEntry作用是一样的,它是ConcurrentHashMap的数据项,这里要注意两个细节:

    细节一

    HashEntry的成员变量valuenext是被关键字volatile修饰的,也就是说所有线程都可以及时检查到其他线程对这两个变量的改变,因而可以在不加锁的情况下读取到这两个引用的最新值。

    细节二

    HashEntrysetNext方法中调用了UNSAFE.putOrderedObject,这个接口是属于sun安全库中的api,并不是J2SE的一部分,它的作用和volatile恰恰相反,调用这个api设值是使得volatile修饰的变量延迟写入主存,那到底是什么时候写入主存呢?
    JMM中有一条规定:
    对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行storewrite操作)
    后文在讲put方法的时候再详细看setNext的用法

    哈希

    由于引入了segment,因此不管是调用get方法读还是调用put方法写,都需要做两次哈希,还记得在初始化的时候系统做了一件重要的事:

  • 确定哈希寻址时的偏移量,这个偏移量在确定元素在segment数组中的位置时会用到

没错就是这段代码:

  1. this.segmentShift = 32 - sshift;

这里用32去减是因为int型的长度是32,有了segmentShiftConcurrentHashMap是如何做第一次哈希的呢?

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key);
  6. // 变量j代表着数据项处于segment数组中的第j项
  7. int j = (hash >>> segmentShift) & segmentMask;
  8. // 如果segment[j]为null,则下面的这个方法负责初始化之
  9. s = ensureSegment(j);
  10. return s.put(key, hash, value, false);
  11. }

put方法为例,变量j代表着数据项处于segment数组中的第j项。如下图所示假如segment数组的大小为2的n次方,则hash >>> segmentShift正好取了key的哈希值的高n位,再与掩码segmentMask相与相当与仍然用key的哈希的高位来确定数据项在segment数组中的位置。
ConcurrentHashMap中十个提升性能的细节 - 图2
hash方法与非线程安全的HashMap相似,这里不再细说。

细节三

在延迟初始化Segment数组时,作者采用了CAS避免了加锁,而且CAS可以保证最终的初始化只能被一个线程完成。在最终决定调用CAS进行初始化前又做了两次检查,第一次检查可以避免重复初始化tab数组,而第二次检查则可以避免重复初始化Segment对象,每一行代码都有详细的考虑。

  1. private Segment<K,V> ensureSegment(int k) {
  2. final Segment<K,V>[] ss = this.segments;
  3. long u = (k << SSHIFT) + SBASE; // raw offset 实际的字节偏移量
  4. Segment<K,V> seg;
  5. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  6. Segment<K,V> proto = ss[0]; // use segment 0 as prototype
  7. int cap = proto.table.length;
  8. float lf = proto.loadFactor;
  9. int threshold = (int)(cap * lf);
  10. HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
  11. if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck 再检查一次是否已经被初始化
  12. Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
  13. while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
  14. if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) // 使用 CAS 确保只被初始化一次
  15. break;
  16. }
  17. }
  18. }
  19. return seg;
  20. }

put方法

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
  3. V oldValue;
  4. try {
  5. HashEntry<K,V>[] tab = table;
  6. int index = (tab.length - 1) & hash;
  7. HashEntry<K,V> first = entryAt(tab, index);
  8. for (HashEntry<K,V> e = first;;) {
  9. if (e != null) {
  10. K k; // 如果找到key相同的数据项,则直接替换
  11. if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
  12. oldValue = e.value;
  13. if (!onlyIfAbsent) {
  14. e.value = value;
  15. ++modCount;
  16. }
  17. break;
  18. }
  19. e = e.next;
  20. }
  21. else {
  22. if (node != null)
  23. // node不为空说明已经在自旋等待时初始化了,注意调用的是setNext,不是直接操作next
  24. node.setNext(first);
  25. else
  26. // 否则,在这里新建一个HashEntry
  27. node = new HashEntry<K,V>(hash, key, value, first);
  28. int c = count + 1; // 先加1
  29. if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  30. rehash(node);
  31. else
  32. // 将新节点写入,注意这里调用的方法有门道
  33. setEntryAt(tab, index, node);
  34. ++modCount;
  35. count = c;
  36. oldValue = null;
  37. break;
  38. }
  39. }
  40. } finally {
  41. unlock();
  42. }
  43. return oldValue;
  44. }

这段代码在整个ConcurrentHashMap的设计中非常出彩,在这短短的40行代码中,Doug Lea就像一位神奇的魔术师,转眼间已经变换了好几种魔法,让人目瞪口呆,感叹其对并发的理解之深,让慢慢的解析Doug Lea在这段代码中使用的魔法:

细节四

CPU的调度是公平的,好不容易轮到的时间片如果因为获取不到锁就将本线程挂起无疑会降低本线程的效率,更何况挂起之后还要重新调度,切换上下文,又是一笔不小的开销。如果可以遇见其他线程占有锁的时间不会很长,采用自旋将会是一个比较好的选择,在这里面也有一个权衡,如果别的线程占有锁的时间过长,反而是挂起阻塞等待性能好一点,来看下ConcurrentHashMap的做法:

  1. private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
  2. HashEntry<K,V> first = entryForHash(this, hash);
  3. HashEntry<K,V> e = first;
  4. HashEntry<K,V> node = null;
  5. int retries = -1; // negative while locating node
  6. while (!tryLock()) { // 自旋等待
  7. HashEntry<K,V> f; // to recheck first below
  8. if (retries < 0) {
  9. if (e == null) { // 这个桶中还没有写入k-v项
  10. if (node == null) // speculatively create node 直接创建一个新的节点
  11. node = new HashEntry<K,V>(hash, key, value, null);
  12. retries = 0;
  13. }
  14. // key值相等,直接跳出去尝试获取锁
  15. else if (key.equals(e.key))
  16. retries = 0;
  17. else // 遍历链表
  18. e = e.next;
  19. }
  20. else if (++retries > MAX_SCAN_RETRIES) {
  21. // 自旋等待超过一定次数之后只能挂起线程,阻塞等待了
  22. lock();
  23. break;
  24. }
  25. else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
  26. // 如果头节点改变了,则重置次数,继续自旋等待
  27. e = first = f;
  28. retries = -1;
  29. }
  30. }
  31. return node;
  32. }

ConcurrentHashMap的策略是自旋MAX_SCAN_RETRIES次,如果还没有获取到锁则调用lock挂起阻塞等待,当然如果其他线程采用头插法改变了链表的头结点,则重置自旋等待次数。

细节五

要知道,如果要从编码的角度提升系统的并发度,一个黄金法则就是减少并发临界区的大小。在scanAndLockForPut这个方法的设计上,就是在自旋的过程中初始化了一个HashEntry,这样做的好处就是线程在拿到锁之后不用初始化HashEntry了,占有锁的时间相应减小,进而提升性能。

细节六

put方法的开头,有这么一行不起眼的代码:

  1. HashEntry<K,V>[] tab = table;

看起来好像就是简单的临时变量赋值,其实大有来头,看一下table的声明:

  1. transient volatile HashEntry<K,V>[] table;

table变量被关键字volatile修饰,CPU在处理volatile修饰的变量的时候会有下面的行为:

嗅探

每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里
因此直接读取这类变量的读取和写入比普通变量的性能消耗更大,因此在put方法的开头将table变量赋值给一个普通的本地变量目的是为了消除volatile带来的性能损耗。这里就有另外一个问题:那这样做会不会导致table的语义改变,让别的线程读取不到最新的值呢?别着急,接着看。

细节七

注意put方法中的这个方法:entryAt():

  1. static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
  2. return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)i << TSHIFT) + TBASE);
  3. }

这个方法的底层会调用UNSAFE.getObjectVolatile,这个方法的目的就是对于普通变量读取也能像volatile修饰的变量那样读取到最新的值,在前文中分析过,由于变量tab现在是一个普通的临时变量,如果直接调用tab[i]不一定能拿到最新的首节点的。兜兜转换不是回到了原点么,为啥不刚开始就操作volatile变量呢,费了这老大劲。继续往下看。

细节八

put方法的实现中,如果链表中没有key值相等的数据项,则会把新的数据项插入到链表头写入到数组中,其中调用的方法是:

  1. static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) {
  2. UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
  3. }

putOrderedObject这个接口写入的数据不会马上被其他线程获取到,而是在put方法最后调用unclock后才会对其他线程可见,参见对JMM的描述:
对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行storewrite操作)
这样的好处有两个,第一是性能,因为在持有锁的临界区不需要有同步主存的操作,因此持有锁的时间更短。第二是保证了数据的一致性,在put操作的finally语句执行完之前,put新增的数据是不对其他线程展示的,这是ConcurrentHashMap实现无锁读的关键原因。
在这里稍微总结一下put方法里面最重要的三个细节,首先将volatile变量转为普通变量提升性能,因为在put中需要读取到最新的数据,因此接下来调用UNSAFE.getObjectVolatile获取到最新的头结点,但是通过调用UNSAFE.putOrderedObject让变量写入主存的时间延迟到put方法的结尾,一来缩小临界区提升性能,而来也能保证其他线程读取到的是完整数据。

细节九

如果put真的需要往链表头插入数据项,那也得注意了,ConcurrentHashMap相应的语句是:

  1. node.setNext(first);

看下setNext的具体实现:

  1. final void setNext(HashEntry<K,V> n) {
  2. UNSAFE.putOrderedObject(this, nextOffset, n);
  3. }

因为next变量是用volatile关键字修饰的,这里调用UNSAFE.putOrderedObject相当于是改变了volatile的语义,这里面的考量有两个,第一个仍然是性能,这样的实现性能明显更高,这一点前文已经详细的分析过,第二点是考虑了语义的一致性,对于put方法来说因为其调用的是UNSAFE.getObjectVolatile,仍然能获取到最新的数据,对于get方法,在put方法未结束之前,是不希望不完整的数据被其他线程通过get方法读取的,这也是合理的。

resize扩容

  1. private void rehash(HashEntry<K,V> node) {
  2. HashEntry<K,V>[] oldTable = table;
  3. int oldCapacity = oldTable.length;
  4. int newCapacity = oldCapacity << 1;
  5. threshold = (int)(newCapacity * loadFactor);
  6. HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
  7. int sizeMask = newCapacity - 1;
  8. for (int i = 0; i < oldCapacity ; i++) {
  9. HashEntry<K,V> e = oldTable[i];
  10. if (e != null) {
  11. HashEntry<K,V> next = e.next;
  12. int idx = e.hash & sizeMask;
  13. if (next == null) // Single node on list 只有一个节点,简单处理
  14. newTable[idx] = e;
  15. else {
  16. HashEntry<K,V> lastRun = e;
  17. int lastIdx = idx;
  18. // 保证下文中newTable[k]不会为null
  19. for (HashEntry<K,V> last = next;
  20. last != null;
  21. last = last.next) {
  22. int k = last.hash & sizeMask;
  23. if (k != lastIdx) {
  24. lastIdx = k;
  25. lastRun = last;
  26. }
  27. }
  28. newTable[lastIdx] = lastRun;
  29. // Clone remaining nodes 对标记之前的不能重用的节点进行复制,再重新添加到新数组对应的hash桶中去
  30. for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
  31. V v = p.value;
  32. int h = p.hash;
  33. int k = h & sizeMask;
  34. HashEntry<K,V> n = newTable[k];
  35. newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
  36. }
  37. }
  38. }
  39. }
  40. int nodeIndex = node.hash & sizeMask; // add the new node 部分的put功能,把新节点添加到链表的最前面
  41. node.setNext(newTable[nodeIndex]);
  42. newTable[nodeIndex] = node;
  43. table = newTable;
  44. }

在整个桶数组长度为2的正整数幂的情况下,扩容前同一个桶中的元素在扩容后只会分布在两个桶中,其中一个桶的下标保持不变,称之为旧桶,另一个桶的下标为旧桶下标加上旧的容量,称之为新桶,其实第一个for循环的目的就是在一个链表中找到最后一个应该移到新桶的数据项,直接移到新桶中,这样做是为了保证后面调用HashEntry<K,V> n = newTable[k];的时候不会读取到null。第二个for就比较简单了,将所有的数据项移到新的桶数组中,当所有的操作完成之后才将newTable赋值给table
rehash方法中是没有加锁的,并不是说调用这个方法不需要加锁,是在外层加了锁,这一点需要注意。

size方法

之前在分析HashMap方法的时候并没有去说size方法,因为在单线程环境下这个方法可以使用一个全局的变量解决,同样的方案当然也可以在多线程场景下使用,不过要在多线程环境下读取全局变量又会陷入到无尽的“锁”中,这是大家不愿意看到的,那ConcurrentHashMap是如何解决这个问题的呢:

  1. public int size() {
  2. final Segment<K,V>[] segments = this.segments;
  3. int size;
  4. boolean overflow; // true if size overflows 32 bits
  5. long sum; // sum of modCounts
  6. long last = 0L; // previous sum
  7. int retries = -1; // first iteration isn't retry
  8. try {
  9. for (;;) {
  10. if (retries++ == RETRIES_BEFORE_LOCK) {
  11. for (int j = 0; j < segments.length; ++j)
  12. ensureSegment(j).lock(); // force creation
  13. }
  14. sum = 0L;
  15. size = 0;
  16. overflow = false;
  17. for (int j = 0; j < segments.length; ++j) {
  18. Segment<K,V> seg = segmentAt(segments, j);
  19. if (seg != null) {
  20. sum += seg.modCount;
  21. int c = seg.count;
  22. if (c < 0 || (size += c) < 0)
  23. overflow = true;
  24. }
  25. }
  26. if (sum == last)
  27. break;
  28. last = sum;
  29. }
  30. } finally {
  31. if (retries > RETRIES_BEFORE_LOCK) {
  32. for (int j = 0; j < segments.length; ++j)
  33. segmentAt(segments, j).unlock();
  34. }
  35. }
  36. return overflow ? Integer.MAX_VALUE : size;
  37. }

在前面介绍put方法时选择忽略了一个小小的成员变量modCount,这个变量在这里大显身手,它的主要作用就是记录整个Segment中写入操作的次数,因为写入操作是会影响整个ConcurrentHashMap的大小的。
因为在读取ConcurrentHashMap大小的时候需要保证读到的是最新的值,因此其调用了UNSAFE.getObjectVolatile这个方法,虽然这个方法的性能比普通变量要差,但是比起全局加锁,可好多了。

  1. static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
  2. long u = (j << SSHIFT) + SBASE; // 计算实际的字节偏移量
  3. return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
  4. }

细节十

size方法的设计上,ConcurrentHashMap先尝试无锁的方法,如果两次遍历所有segment数组的时候整个ConcurrentHashMap没有发生写入操作,则直接返回每个segment数组的size()之和,否则重新遍历,如果写入操作频繁,则不得已加锁处理,这里的加锁相当于是一个全局的锁,因为对segment数组的每一个元素都加了锁。那如何判断整个ConcurrentHashMap的写入是否频繁呢?就看无锁重试的次数,当无锁重试的次数超过阈值的话就全局加锁处理。

总结

在看完ConcurrentHashMap中的这些细节之后尝试回答一下文章开头提出来的问题:

  1. ConcurrentHashMap的哪些操作需要加锁?

答:只有写入操作才需要加锁,读取操作不需要加锁

  1. ConcurrentHashMap的无锁读是如何实现的?

答:首先HashEntry中的valuenext都是有volatile修饰的,其次在写入操作的时候通过调用UNSAFE库延迟同步了主存,保证了数据的一致性

  1. 在多线程的场景下调用size()方法获取ConcurrentHashMap的大小有什么挑战?ConcurrentHashMap是怎么解决的?

答:size()具有全局的语义,如何能保证在不加全局锁的情况下读取到全局状态的值是一个很大的挑战,ConcurrentHashMap通过查看两次无锁读中间是否发生了写入操作来决定读取到的size()是否可信,如果写入操作频繁,则再退化为全局加锁读取。

  1. 在有Segment存在的前提下,是如何扩容的?

答:segment数组的大小在一开始初始化的时候就已经决定了,扩容主要扩的是HashEntry数组,基本的思路与HashTable一致,但这是一个线程不安全方法,调用之前需要加锁。