EJava 并发包 JUC

典型回答

Java提供了不同层面的线程安全支持。在传统集合框架内部,除了Hashtable等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),我们可以调用 Collections 工具类提供的包装方法,来获取一个同步的包装容器,但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。

更加普遍的选择是利用并发包提供的线程安全容器类,它提供了:

  1. 各种并发容器,比如 ConcurrentHashMap,CopyOnWriteArrayList
  2. 各种线程安全队列(Queue/ Deque),ArrayBlockingQueue SynchronousQueue
  3. 各种有序容器的线程安全版本等。

具体保证线程安全的方式,从简单的sychronize方式,到基于更加精细化的,比如基于分离锁实现的
ConcurrentHashMap等并发实现等。总体来说,并发包内提供的容器通用场景,远优于早期的简单同步实现。

考点分析

  • 理解基本的线程安全工具
  • 理解传统集合框架并发编程中 Map 存在的问题,清楚简单同步方式的不足
  • 梳理并发包内,尤其是 ConcurrentHashMap 采取了哪些方法来提高并发表现
  • 最好能够掌握 ConcurrentHashMap 自身的演进,目前的很多分析资料还是基于其早期版本。

知识扩展

为什么需要ConcurrentHashMap?
Hashtable 低效,HashMap 非线程安全;同步包装器的内部实现还是利用了’this’ 作为互斥的mutex,没有真正意义上的改进。只适合在非高度并发的场景下。

ConcurrentHashMap 分析

ConcurrentHashMap 的设计实现其实一直在演化

早期 ConcurrentHashMap,其实现是基于:

  • 分离锁,也就是将内部进行分段(Segment),里面则是 HashEntry 的数组,和 HashMap 类似,哈希相同的条目也是以链表形式存放。
  • HashEntry 内部使用 volatile 的 value 字段来保证可见性,也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的。

在进行并发写操作时:

  • ConcurrentHashMap 会获取再入锁,以保证数据一致性,Segment 本身就是基于 ReentrantLock 的扩展实现,所以,在并发修改期间,相应 Segment 是被锁定的。
  • 在最初阶段,进行重复性的扫描,以确定相应 key 值是否已经在数组里面,进而决定是更新还是放置操作,你可以在代码里看到相应的注释。重复扫描、检测冲突是 ConcurrentHashMap 的常见技巧。
  • 可能发生的扩容问题,在 ConcurrentHashMap 中同样存在。不过有一个明显区别,就是它进行的不是整体的扩容,而是单独对 Segment 进行扩容

Map 的 size 方法同样需要关注,它的实现涉及分离锁的一个副作用。
试想,如果不进行同步,简单的计算所有 Segment 的总值,可能会因为并发 put,导致结果不准确,但是直接锁定所有 Segment 进行计算,就会变得非常昂贵。
其实,分离锁也限制了 Map 的初始化等操作。
所以,ConcurrentHashMap 的实现是通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数 2),来试图获得可靠值。如果没有监控到发生变化(通过对比 Segment.modCount),就直接返回,否则获取锁进行操作。

在 Java 8 和之后的版本中,ConcurrentHashMap 发生了哪些变化呢?

  • 总体结构上,它的内部存储变得和我在专栏上一讲介绍的 HashMap 结构非常相似,同样是大的桶(bucket)数组,然后内部也是一个个所谓的链表结构(bin),同步的粒度要更细致一些。
  • 其内部仍然有 Segment 定义,但仅仅是为了保证序列化时的兼容性而已,不再有任何结构上的用处。
  • 因为不再使用 Segment,初始化操作大大简化,修改为 lazy-load 形式,这样可以有效避免初始开销,解决了老版本很多人抱怨的这一点。
  • 数据存储利用 volatile 来保证可见性。
  • 使用 CAS 等操作,在特定场景进行无锁并发操作。
  • 使用 Unsafe、LongAdder 之类底层手段,进行极端情况的优化。
  1. final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();
  2. int hash = spread(key.hashCode());
  3. int binCount = 0;
  4. for (Node<K,V>[] tab = table;;) {
  5. Node<K,V> f; int n, i, fh; K fk; V fv;
  6. if (tab == null || (n = tab.length) == 0)
  7. tab = initTable();
  8. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  9. // 利用CAS去进行无锁线程安全操作,如果bin是空的
  10. if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
  11. break;
  12. }
  13. else if ((fh = f.hash) == MOVED)
  14. tab = helpTransfer(tab, f);
  15. else if (onlyIfAbsent // 不加锁,进行检查
  16. && fh == hash
  17. && ((fk = f.key) == key || (fk != null && key.equals(fk)))
  18. && (fv = f.val) != null)
  19. return fv;
  20. else {
  21. V oldVal = null;
  22. synchronized (f) {
  23. // 细粒度的同步修改操作...
  24. }
  25. }
  26. // Bin超过阈值,进行树化
  27. if (binCount != 0) {
  28. if (binCount >= TREEIFY_THRESHOLD)
  29. treeifyBin(tab, i);
  30. if (oldVal != null)
  31. return oldVal;
  32. break;
  33. }
  34. }
  35. }
  36. addCount(1L, binCount);
  37. return null;
  38. }

初始化操作实现在 initTable 里面,这是一个典型的 CAS 使用场景,利用 volatile 的 sizeCtl 作为互斥手段:如果发现竞争性的初始化,就 spin 在那里,等待条件恢复;否则利用 CAS 设置排他标志。如果成功则进行初始化;否则重试。

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. // 如果发现冲突,进行spin等待
  5. if ((sc = sizeCtl) < 0)
  6. Thread.yield();
  7. // CAS成功返回true,则进入真正的初始化逻辑
  8. else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
  9. try {
  10. if ((tab = table) == null || tab.length == 0) {
  11. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  12. @SuppressWarnings("unchecked")
  13. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  14. table = tab = nt;
  15. sc = n - (n >>> 2);
  16. }
  17. } finally {
  18. sizeCtl = sc;
  19. }
  20. break;
  21. }
  22. }
  23. return tab;
  24. }

还是得配合视频资料在深入学习下,涉及到的东西有点多,目前难以融会贯通;