JUC提供了一组针对List、Set、Map集合的线程安全版本,相对于粗暴使用synchronized关键字来说,性能更高,控制也更精细化。

1. ConcurrentHashMap

1.1 为什么HashMap线程不安全

  • HashMap底层使用链表(1.8后还有红黑树)来存储值,在对key做hash运算后,若产生碰撞,会把hash相同的值放进链表存储(可以称之为哈希桶)。
  • HashMap本身有存储容量上限,默认是16,扩容因数是0.75,当前已用容量达到75%的时候会触发扩容,扩容会对所有的key-value进行rehash,重新计算hash值后再放入新的链表中。
  • 假如A线程put进了一组数据,其中有a→b的链表结构,但在put过程中触发扩容,rehash后,新的链表结构可能是b→a。
  • 此时线程B闯入,也put进数据,并产生a→b的链表结构。
  • 这样就形成了a→b→a的循环链表,一旦我们从该map里取出不存在的key(遍历所有key,迟早会找到这个循环链表),或者刚好查值查到了该循环链表,那么就会陷入死循环。
  • 综上所述,HashMap是线程不安全的,原因就是put元素的过程没有满足原子性和可见性,我们可以直接使用synchronized关键字修饰来确保线程安全,但这样做就会严重影响性能(HashTable就是这么实现的,所以特别慢)。

    1.2 实现原理

    HashTable虽然保证线程安全,但本质是锁住了整张哈希表,因此影响了性能。

    JDK1.7

    JDK1.7的思路是将链表拆分成多个Segment,分段加锁,这样性能就会好一些。
    image.png

  • 由图可知,只要线程对map的操作分布在不同的segment上,就不会产生线程安全问题。

  • 但segment一旦初始化就不可改变,上图有16个segment,那最多就只能支持16个线程同时访问。
  • segment数组不可扩容,但segment对象内部真正存放元素的链表可以扩容。
  • 1.7一定程度改善了HashTable的问题,但如果某个链表长度特别大,超过8时,就会影响查询元素的性能。
  • put 操作的线程安全性:
    • 初始化槽,使用了 CAS 来初始化 Segment 中的数组。
    • 添加节点到链表的操作是插入到表头的,所以如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。
    • 但另一个问题就是如果 get 操作在 put 之后会读取不到表头的值,解决这个问题依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    • 扩容问题:扩容是新创建了数组,然后进行迁移数据,最后将新table赋值给旧table。所以,如果 get 操作此时也在进行,那么也没关系,但如果 get 先操作,那么就是在旧的 table 上做查询操作,可能会引起脏读;而如果 put 先行,那么 put 操作的可见性保证就是依赖 table 上的 volatile 关键字。
  • remove 操作的线程安全性:
    • get 操作需要遍历链表,但是 remove 操作会”破坏”链表。
    • 如果get 操作已经查询到了被 remove 破坏的节点了(get先于remove),那么这里不存在任何问题。
    • 但如果 remove 先于 get 破坏了一个节点,分两种情况考虑:
      • 如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能保证数组内部元素操作的可见性,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。
      • 如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。
  • 实际使用中,segment机制的性能依旧不太好,而且不适合超高并发情况(因为线程数量受限)。

    JDK1.8

    JDK1.8吸取了1.7的教训,摒弃了限制繁多的segment,继续使用和HashMap类似的数组+链表+红黑树来改善遍历效率,再通过CAS操作来解决高并发问题(其实这也符合哈希表读取场景大于写入场景的情况,因此借助乐观锁机制来解决线程安全问题)。
    image.png

    1.3 put过程分析

  • 首先,初始化一个合适大小的数组initTable,数组内部存储Entry键值对,然后会设置 sizeCtl。

    • 这个过程的并发问题是通过对 sizeCtl 进行一个 CAS 操作来控制的。
  • treeifyBin,这个操作要么扩容数组,要么将数组转为红黑树,具体根据数组长度决定,超过8的倍数就会触发构建红黑树。
  • tryPresize,这里是做翻倍扩容操作的,扩容后数组容量为原来的 2 倍。
  • transfer,这里是在扩容后做数据迁移,扩容操作往往会伴随多次迁移(此处的迁移不涉及多线程,由扩容操作独立执行)。
  • 但put方法也会触发transfer,这里就会涉及到多线程,ConcurrentHashMap的机制是将迁移任务拆分成一堆的小任务包,分别给不同的线程去执行(同一个线程可以分到多个任务包),这个过程也会使用CAS操作sizeCtl来确保线程安全。

    1.4 get过程分析

  • 计算 hash 值

  • 根据 hash 值找到数组对应位置: (n - 1) & h
  • 根据该位置处结点性质进行相应查找

    • 如果该位置为 null,那么直接返回 null 就可以了
    • 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
    • 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树
    • 如果以上 3 条都不满足,那就是链表,进行遍历比对即可

      1.5 Map集合总结

  • HashTable:使用了synchronized关键字对put等操作进行加锁

  • ConcurrentHashMap JDK1.7:使用分段锁机制实现
  • ConcurrentHashMap JDK1.8:则使用数组+链表+红黑树数据结构和CAS原子操作实现

    2. CopyOnWriteArrayList

    CopyOnWriteArrayList是ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的拷贝来实现的,COW(Copy-on-Write,写时复制)模式的体现。
    相对于普通的ArrayList,该类的成员属性有所不同:

  • 内部使用了ReentrantLock可重入锁来保证本身的线程安全性

  • 针对数组使用了volatile关键字
  • 针对数组内部存储的具体元素,使用反射机制和CAS操作来确保原子性

    2.1 核心函数

    add()

    此函数用于将指定元素添加到此列表的尾部,处理流程如下:
  1. 首先获取锁(保证多线程的安全访问),获取当前的Object数组,获取Object数组的长度为length。
  2. 然后根据老数组复制一个长度为length+1的新数组(此时,末尾元素为null)。
  3. 最后将下标为length的数组元素(也就是末尾元素)设置为元素e,再将新数组赋值给老数组,释放锁,返回。这样就完成了元素的添加。

    addIfAbsent()

    该函数用于添加尚未存在的元素(如果已存在,就不添加直接返回),可以保证多线程环境下不会重复添加元素:

  4. 获取锁,获取当前数组为current,current长度为len,判断数组之前的快照snapshot是否等于当前数组current,若不相等,则进入步骤2;否则,进入步骤4

  5. 不相等,表示在snapshot与current之间,有其他线程对数组进行了修改(如进行了add、set、remove等操作),获取snapshot与current之间较小者的长度,对current进行遍历操作,若遍历过程发现snapshot与current的元素不相等并且current的元素与指定元素相等(可能进行了set操作),进入步骤5,否则,进入步骤3
  6. 在当前数组current中索引指定元素,若能够找到,进入步骤5,否则,进入步骤4
  7. 复制当前数组current为新数组,长度为len+1,此时新数组末尾元素为null。再把末尾元素设为指定元素e,再把新数组赋值给current,进入步骤5
  8. 释放锁,返回。

    set()

    此函数可以用指定的元素替代此list指定位置上的元素,也是基于数组的复制来实现的,不再赘述。

    remove()

    此函数用于移除此列表指定位置上的元素:

  9. 获取锁,获取数组elements,数组长度为length,获取索引的值elements[index],计算需要移动的元素个数(length - index - 1),若个数为0,则表示移除的是数组的最后一个元素,复制elements数组,复制长度为length-1,然后设置数组,进入步骤3;否则,进入步骤2

  10. 先复制index索引前的元素,再复制index索引后的元素,然后设置数组。
  11. 释放锁,返回旧值。

    2.2 使用场景与缺陷

    CopyOnWriteArrayList 有几个缺点:
  • 由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc
  • 不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求
  • CopyOnWriteArrayList合适读多写少的场景,不过仍需慎用,因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的web应用中,这种操作容易OOM。

    为什么并发安全且性能比Vector好?

    Vector对单独的add,remove等方法都是在方法上加了synchronized;并且如果一个线程A调用size时,另一个线程B 执行了remove,然后size的值就不是最新的,然后线程A调用remove就会越界(这时就需要再加一个Synchronized)。这样就导致有了双重锁,效率大大降低,于是vector废弃了。

    3. ConcurrentLinkedQueue

  • ConcurerntLinkedQueue一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。

  • 队列的头部是队列中存在时间最长的元素。
  • 队列的尾部是队列中存在时间最短的元素。
  • 新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。
  • 当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue是一个恰当的选择,此队列不允许出现null元素。

线程安全的集合类 - 图3

3.1 源码分析

类关系和成员变量

  • ConcurrentLinkedQueue继承了抽象类AbstractQueue,AbstractQueue定义了对队列的基本操作;同时实现了Queue接口,Queue定义了对队列的基本操作,同时,还实现了Serializable接口,表示可以被序列化。
  • 包含了一个核心内部类Node,通过使用volatile关键字保证字段可见性。

    • 使用Unsafe类和反射机制来保障CAS操作元素,以保证原子性

      1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, Serializable {
      2. private static class Node<E> {
      3. // 当前元素
      4. volatile E item;
      5. // next元素
      6. volatile Node<E> next;
      7. }
      8. // 使用Unsafe类支撑CAS操作
      9. private static final sun.misc.Unsafe UNSAFE;
      10. // item域的偏移量
      11. private static final long itemOffset;
      12. // next域的偏移量
      13. private static final long nextOffset;
      14. }

      核心函数

      offer()

      offer函数用于将指定元素插入此队列的尾部。下面模拟offer函数的操作,队列状态的变化(假设单线程添加元素,连续添加10、20两个元素):
      线程安全的集合类 - 图4

  • 若ConcurrentLinkedQueue的初始状态如上图所示,即队列为空。单线程添加元素,此时,添加元素10,则状态如下所示

线程安全的集合类 - 图5

  • 如上图所示,添加元素10后,tail没有变化,还是指向之前的结点,继续添加元素20,则状态如下所示

线程安全的集合类 - 图6

  • 如上图所示,添加元素20后,tail指向了最新添加的结点。

    poll()

    此函数用于获取并移除此队列的头,如果此队列为空,则返回null。下面模拟poll函数的操作,队列状态的变化(假设单线程操作,状态为之前offer10、20后的状态,poll两次):
    线程安全的集合类 - 图7

  • 在poll操作后,队列的状态如下图所示:

线程安全的集合类 - 图8

  • 如上图可知,poll操作后,head改变了,并且head所指向的结点的item变为了null。再进行一次poll操作,队列的状态如下图所示:

线程安全的集合类 - 图9

  • 由于是最后一个元素,头尾节点指针不再变化,只是元素为null了

    remove()

    此函数用于从队列中移除指定元素的单个实例(如果存在),其中,会调用到first函数和succ函数,first用来找到第一个存活元素,succ用于获取下一个结点,下面是模拟的remove操作:
    线程安全的集合类 - 图10

  • remove 10后,head移动到旧的head结点的下一个结点,并会把其元素设为null

线程安全的集合类 - 图11
线程安全的集合类 - 图12

  • remove 20后,head继续移动,并把元素设为null

    size()

    获取队列的长度,null元素不会被统计在内

    3.2 HOPS(延迟更新的策略)的设计

    通过上面对offer和poll方法的分析,我们发现tail和head是延迟更新的,两者更新触发时机为:

  • tail更新触发时机:当tail节点的next节点不为null的时候,会执行定位队列真正尾节点的操作,找到队尾节点并完成插入之后才会通过casTail()更新tail;

  • 当tail节点的下一个节点为null的时候,只插入节点不更新tail。
  • head更新触发时机:
    • 当head指向的节点的item域为null的时候,会执行定位队列真正头节点的操作,找到队头节点并完成删除之后才会通过updateHead()更新head;
    • 当head指向的节点的item域不为null的时候,只删除节点不更新head。 从上面更新时的状态图可以看出,head和tail的更新是“延迟的”。

那么这样设计的意图是什么呢? 如果让tail永远作为队列的队尾节点,实现的代码量会更少,而且逻辑更易懂。但是,这样做有一个缺点,如果大量的入队操作,每次都要执行CAS更新tail,汇总起来对性能也会是大大的损耗。如果能减少CAS更新的操作,无疑可以大大提升入队的操作效率。

所以JUC包作者doug lea大师每间隔1次(tail和队尾节点的距离为1)进行才利用CAS更新tail。对head的更新也是同样的道理,虽然这样设计会多出一步在循环中定位队尾节点的操作,但总体来说读的效率要远远高于写的性能,因此多出来的在循环中定位尾节点的操作性能损耗相对CAS而言是很小的。

3.3 适用场景

ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的逻辑好设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其使用的改良算法和对哨兵的处理。整体思路都是比较严谨的,需要使用无锁队列的时候,这就是个不错的参考。

4. Blocking队列

BlockingQueue

BlockingQueue是一个接口,通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:
线程安全的集合类 - 图13

  • 一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
  • 负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列

    核心方法

    BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 特定值 阻塞 超时
插入 add(o) offer(o) put(o) offer(o, timeout, timeunit)
移除 remove(o) poll(o) take(o) poll(timeout, timeunit)
检查 element(o) peek(o)

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

  • 无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

  • 可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。
    • 但是这么干效率并不高(基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此尽量不要用这一类的方法,除非你确实不得不那么做。

      BlockingDeque

      相对BlockingQueue 队列来说,BlockingDeque是一个双端队列,允许同时在队列两端插入、获取元素。如果生产者线程需要在队列的两端都可以插入数据,消费者线程需要在队列的两端都可以移除数据,这个时候可以使用 BlockingDeque:
      线程安全的集合类 - 图14

      核心方法

      BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛异常 特定值 阻塞 超时
插入 addFirst(o) offerFirst(o) putFirst(o) offerFirst(o, timeout, timeunit)
移除 removeFirst(o) pollFirst(o) takeFirst(o) pollFirst(timeout, timeunit)
检查 getFirst(o) peekFirst(o)

四组不同的行为方式解释:

  • 抛异常: 如果试图的操作无法立即执行,抛一个异常。
  • 特定值: 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时: 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

    其他队列

    | 名称 | 特点 | | :—-: | :—-: | | 延迟队列 DelayQueue | 内部的元素可以一个过期时间,到期后自动出队 | | 链阻塞队列 LinkedBlockingQueue | 底层使用链表结构的队列 | | 优先级阻塞队列PriorityBlockingQueue | 可手动排序的队列 | | 同步队列 SynchronousQueue | 内部同时只能够容纳单个元素的队列 |