8.1 并发容器概览

  • ConcurrentHashMap :线程安全的HashMap
  • CopyOnWriteArrayList :线程安全的List
  • BlockingQueue :这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道
  • ConcurrentLinkedQueue :高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的LinkedList
  • ConcurrentSkipListMap :是一个Map ,使用跳表的数据结构进行快速查找

    8.2 Vector和Hashtable

    Vector、Hashtable是使用synchronized修饰的,这两个容器类是过时的古老的,性能比较差。

    8.3 同步的HashMap和ArrayList

  • 虽然这两个类不是线程安全的,但是可以用Collections.synchronizedList(new ArrayList ())和Collections.synchronizedMap(new HashMap ()使之变成线程安全的

  • 分析源码

其实就是对ArrayList和HashMap对象进行包装,有点类似于装饰器模式。
ConcurrentHashMapFCopyOnWriteArrayList

  • 取代同步的HashMap和同步的ArrayList (时代巨轮滚滚向前)
  • 绝大多数并发情况下, ConcurrentHashMap和CopyOnWriteArrayListI的性能都更好

8.4 ConcurrentHashMap概览

1.磨刀不误砍柴工: Map简介
2.为什么需要ConcurrentHashMap
3.九层之台,起于累土、罗马不是一天建成的: HashMap的分析
4. JDK1.7JConcurrentHashMap实现和分析
5. JDK1.8JConcurrentHashMap实现和源码分析
6.对比JDK1.7和1.8的优缺点 ,为什么要把1.7的结构改成1.8的结构?
7.组合操作: ConcurrentHashMap也不是线程安全的?
8.实际生产案例分享

8.5 Map接口的实现

  • HashMap
  • Hashtable
  • LinkedHashMap
  • TreeMap

Map的继承体系

image.png

以上四种实现,都要求保证key是不可变对象,hash值不变。

image.png

代码演示Map基本用法:

8.6 调整JDK版本

为什么需要ConcurrentHashMap?

  • 为什么不用Collections.synchronizedMap?
  • 为什么HashMap是线程不安全的?

为什么HashMap是线程不安全的?

  • 同时put碰撞导致数据丢失
  • 同时put扩容导致数据丢失
  • 死循环造成CPU100%(JDK1.7时,才会出现,本质上是因为扩容的时候,形成了循环链表,其实是把HashMap多线程操作了,本身没什么意义)

死循环造成的CPU100%
彩蛋:调试技巧——如何修改JDK版本,从8到7
彩蛋:调试技巧——多线程配合,模拟真实场景

调整jdk的方法:一个是project,一个是moudule的dependences选项卡设置。
image.png

image.png

8.7 经典问题分析

演示HashMap JDK7 CPU 百分之百,本质上是因为多线程操作HashMap扩容时候产生循环链表,Oracle认为这不是JDK得问题,因为错误的使用了HashMap。

8.8 线程独立调试

可以通过断点进行选择,做多线程调试。
image.png

两个线程同时在这里断点,然后一起去扩容,就会发生堆溢出。

image.png

这个问题只会在JDK1.7出现,CoolShell做过相关的分析。

8.9 HashMap具体问题分析

具体流程分析(仅供有兴趣的同学参考,若无兴趣则可以跳过,因为实际意义不大):
https://coolshell.cn/articles/9606.html
https://www.jianshu.com/p/1e9cf0ac07f4
https://www.jianshu.com/p/619a8efcf589
https://blog.csdn.net/loveliness_peri/article/details/81092360
https://cloud.tencent.com/developer/article/1120823
https://www.cnblogs.com/developer_chan/p/10450908.html

8.10 HashMap 结构图和特点

8.10.1 JDK1.7结构

image.png

8.10.2 JDK1.8结构

image.png

image.png

8.10.3 红黑树

红黑树特性

  • 对二叉查找树BST的一种平衡策略,O(logN) vs O(N)
  • 会自动平衡,防止极端不平衡从而影响查找效率的情况发生。

二叉树结构图

image.png

  • 每个节点要么是红色,要么是黑色,但根节点永远是黑色的
  • 红色节点不能连续(也即是,红色节点的孩子和父亲都不能是红色)
  • 从任一节点到其子树中每个叶子节点的路径都包含相同数量黑色节点
  • 所有的叶节点都是黑色的

8.10.4 HashMap关于并发的特点

1、非线程安全
2、迭代时不允许修改内容
3、只读的并发是安全的
4、如果一定要把HashMap用在并发环境,用Collections.synchronizedMap(new HashMap())

8.11 CurrentHashMap相关结构图

8.11.1 JDK1.7结构

image.png

image.png

  • Java 7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法
  • 每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高了并发效率
  • ConcurrentHashMap默认有16个Segments (也就是一个segments可以负责多个桶节点),所以最多可以同时支持16个线程并发写(操作分别分布在不同的Segment上)。这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的

8.11.1 JDK1.8 结构

红黑树将查找时间复杂度从O(n)提升到O(logn)
image.png

8.12 ConcurrentHashMap 源码分析

8.12 .1 JDK1.7源码

Segment继承自ReentrantLock,每个Segment上锁。
image.png

8.12.2 JDK1.8 源码解析

JDK8使用了CAS+synchronized,可以针对Node进行加锁,如果经过hash计算找到元素应该存放的位置,如果桶上的位置为空,则使用CAS进行比较替换,如果处于MOVED状态,则帮助进行扩容,否则说明要保存到链表或者红黑树上,则将当前桶的节点通过synchronized进行锁定,之后继续变量链表或者红黑树,对key值进行比对,如果存在key就将value进行替换,如果不存在,则创建一个新的节点保存到链表或红黑树种。
image.png
当桶节点下元素多余8个,并且容器容量大于64,则将链表转成红黑树
image.png

putVal流程

  • 判断key value不为空
  • 计算hash值
  • 根据对应位置节点的类型,来赋值,或者helpTransfer ,或者增长链表,或者给红黑树增加节点
  • 检查满足阈值就“红黑树化”
  • 返回oldVal

get流程

  • 计算hash值
  • 找到对应的位置,根据情况进行:
  • 直接取值
  • 红黑树里找值
  • 遍历链表取值
  • 返回找到的结果

8.13不同版本的对比

8.13.1 数据结构的优化

原来分段锁只有16个,初始化之后不能扩容,一个分段锁可以控制桶上的几个节点,而1.8之后,每个桶上的Node节点都能上锁,锁的粒度更小,性能更高。

8.13.2 Hash碰撞

拉链法变成链表和红黑树,查找效率更高

8.13.3 保证并发安全

Segment锁和CAS+synchronized,都能保证线程安全,但CAS+synchronized性能更好。

8.13.4 查询复杂度

红黑树查询效率更高。

8.13.5 为什么超过8要转为红黑树呢?

因为红黑树每个节点占用的空间是链表的两倍,做出了概率计算,当随着元素增加,数组不断扩容,当hash碰撞达到8次,形成8个链表节点,概率千万分之一,这个在源码注释当中有体现,所以超过8转为红黑树。绝大多数情况下,都不会出现红黑树。

8.14 组合操作的问题

ConcurrentHashMap也不是线程安全的?

  1. public class OptionsNotSafe implements Runnable {
  2. private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
  3. public static void main(String[] args) throws InterruptedException {
  4. scores.put("小明", 0);
  5. Thread t1 = new Thread(new OptionsNotSafe());
  6. Thread t2 = new Thread(new OptionsNotSafe());
  7. t1.start();
  8. t2.start();
  9. t1.join();
  10. t2.join();
  11. System.out.println(scores.get("小明"));
  12. }
  13. @Override
  14. public void run() {
  15. for (int i = 0; i < 1000; i++) {
  16. Integer score = scores.get("小明");
  17. Integer newScore = score + 1;
  18. scores.put("小明", newScore);
  19. }
  20. }
  21. }

执行结果并不是期待的1123,因为ConcurrentHashMap解决的容器本身的线程安全问题,保证多线程put value不会出现覆盖等线程安全问题。
image.png

解决这个问题,可以加synchronized,但是这样做不好,因为ConcurrentHash好不容易保证了线程安全,可以使用replace方法,这个方法有三个参数,可以返回是否替换成功。

  1. public class OptionsNotSafe implements Runnable {
  2. private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
  3. public static void main(String[] args) throws InterruptedException {
  4. scores.put("小明", 0);
  5. Thread t1 = new Thread(new OptionsNotSafe());
  6. Thread t2 = new Thread(new OptionsNotSafe());
  7. t1.start();
  8. t2.start();
  9. t1.join();
  10. t2.join();
  11. System.out.println(scores.get("小明"));
  12. }
  13. @Override
  14. public void run() {
  15. for (int i = 0; i < 1000; i++) {
  16. Integer score = scores.get("小明");
  17. while (!scores.replace("小明", score, ++score)) {
  18. score = scores.get("小明");
  19. }
  20. }
  21. }
  22. }

正确的输出了2000
image.png

还有一个比较常用的组合操作方法,putIfAbsent,如果不存在就设置进去,存在就返回当前值

if (!scores.containsKey("小明")) {
    scores.put("小明", 100);
} else {
    return scores.get("小明");
}

8.15 实际生产案例

在平时开发中,要时刻注意线程安全问题,如下一个场景,需要司机根据一些题目进行回答,才能取得营业资质,将这些题目信息都保存在JVM内存中,保存在一个线程不安全的一个map中,返回给司机需要对题目进行乱序,当多个司机多个请求多个线程去处理乱序,发现返回给司机的问题有重复的,可以用ConcurrentHashMap来解决这个问题。

8.16 CopyOnWriteArrayList的读写规则

8.16.1 诞生原因

  • 代替Vector和SynchronizedList ,就和ConcurrentHashMap代替SynchronizedMap的原因一样
  • Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低并且迭代时无法编
  • Copy-On-Write并发容器还包括CopyOnWriteArraySet ,用来替代同步Set

8.16.2 适用场景

读操作可以尽可能地快,而写即使慢一些也没有太大关系
读多写少:黑名单,每日更新;监听器:迭代操作远多余修改操作.

8.16.3 读写规则

回顾读写锁:读读共享、其 他都互斥(写写互斥、读写互斥、写读互斥)
读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。

它可以在迭代过程中,去修改这个容器的内容,ArrayList是做不到的。

8.17 CopyOnWriteArrayList数据过期问题

  • CopyOnWrite的含义
  • 创建新副本、读写分离
  • “不可变” 原理
  • 迭代的时候

8.18 CopyOnWriteArrayList缺点分析

8.18.1 存在问题

数据一致性问题 : CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上能读到,请不要使用CopyOnWrite容器。
内存占用问题:因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。

8.18.2 源码分析

首先ArrayList是不支持在迭代的过程当中,进行元素的修改,会抛出并发修改异常——java.util.ConcurrentModificationException,如下方代码

public static void main(String[] args) {
    List<String> names = new ArrayList<>();
    names.add("wzy");
    names.add("lisi");
    names.add("zhaoliu");
    Iterator<String> namesIterator = names.iterator();
    while (namesIterator.hasNext()) {
        System.out.println(namesIterator.next());
        names.add("bob");
    }
}

执行结果
image.png
分析源码查看原因,在创建迭代器对象的时候就记录了ArrayList的修改次数。
image.png
next方法被调用的时候,就回去检查这个修改次数。
image.png
检查当前ArrayList的修改次数是否和创建迭代器时的修改次数相等,因为每次做修改操作的时候,ArrayList的修改次数都会改变,如果不相等,则抛出异常。
image.png

接下来再看CopyOnWriteArrayList,同样的代码逻辑。

public static void main(String[] args) {
    List<String> names = new CopyOnWriteArrayList<>();
    names.add("wzy");
    names.add("lisi");
    names.add("zhaoliu");
    Iterator<String> namesIterator = names.iterator();
    while (namesIterator.hasNext()) {
        System.out.println(namesIterator.next());
        names.add("bob");
    }
    System.out.println("最终集合内容:" + names);
}

执行结果,并没有抛出异常,也可以成功的向集合里面添加元素 ,但是在遍历元素的时候,却没办法将新添加的bob元素输出,可以看到最终是显示添加成功的,迭代器无法获取到最新的元素。
image.png

导致这种问题出现的原因,正是CopyOnWriteArrayList的写时复制机制导致的,下面通过分析源码来进行解决。

正如CopyOnWriteArrayList的名字,本质底层会基于数组的。
image.png
内部还创建了ReentrantLock锁。
image.png
先来查看add方法,由于是写方法,通过lock进行加锁,将当前数组元素拷贝出来一份,在这个数组上进行修改,修改完成之后,将新的数组赋值给成员变量,这样就完成了,写时复制机制。
image.png
通过get方法去获取,获取指定位置元素,继续调用getArray()方法。
image.png
直接通过数组的索引去过去,不加锁,因为每次修改都是在副本上修改,直接返回当前数组的数据即可。
image.png

至于为什么迭代器为什么读到滞后的数据,CopyOnWriteArrayList内部实现了COWIterator迭代器,在创建迭代器的时候,就把当前数组对象传递到对象内部作为成员变量。
image.png
相当于拿到的是数组的快照,迭代器在迭代的过程中,数组元素变化,迭代器是感知不到的,当然迭代过程中修改也不会报错。
image.png

8.19 并发队列介绍

8.19.1 为什么要使用队列

  • 用队列可以在线程间传递数据:生产者消费者模式、银行转账
  • 考虑锁等线程安全问题的重任从“你”转移到了“队列” 上

8.19.2 并发队列简介

  • Queue
  • BlockingQueue

各并发队列关系图
image.png

8.20 绘制漂亮的UML图

通过IDEA可以生成UML图。
image.png
效果
image.png
还可以添加类进来,Add Class to Diagram。
image.png
比如添加了ArraryBlockingQueue,就可以看到他们之间的关系。
image.png

8.21 阻塞队列BlockingQueue

8.21.1 什么是阻塞队列

  • 阻塞队列是具有阻塞功能的队列,所以它首先是一个队列,其次是具有阻塞功能。
  • 通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的

阻塞队列示意图,是一个典型的生产消费者模型。

image.png

阻塞功能:最有特色的两个带有阻塞功能的方法是

  • take()方法:获取并移除队列的头结点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里有数据
  • put()方法:插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间

take和put原理图
image.png

image.png

  • 是否有界(容量有多大) :这是一个非常重要的属性,无界队列意味着里面可以容纳非常多( Integer.MAX_ _VALUE ,约为2的31次,是非常大的一个数,可以近似认为是无限容量)
  • 阻塞队列和线程池的关系:阻塞队列是线程池的重要组成部分

BlockingQueue主要方法

  • put,take(会阻塞住)
  • add,remove, element(失败直接抛异常)
  • offer,poll,peek(会返回true,false,poll取出会删除,peek不会删除)

    8.22 代码演示,源码分析

    使用案例:有10个面试者, 一共只有1个面试官,大厅里有3个位子供面
    试者休息,每个人的面试时间是10秒,模拟所有人面试的场景

8.23其他BlockingQueue类型介绍

8.23.1 LinkedBlockingQueue

  • 无界
  • 容量Integer.MAX _VALUE
  • 内部结构: Node、两把锁。分析put方法

8.23.2 PriorityBlockingQueue

  • 支持优先级
  • 自然顺序(而不是先进先出)
  • 无界队列
  • PriorityQueue的线程安全版本

8.23.3 SynchronousQueue

  • 它的容量为0
  • 需要注意的是 , SynchronousQueue的容量不是1而是0 ,因为SynchronousQueue不需要去持有元素,它所做的就是直接传递( direct handoff )
  • 效率很高

原理图
image.png

  • SynchronousQueue注意点
  • SynchronousQueue没有 peek等函数,因为peek的含义是取出头结点,但SynchronousQueue的容量是0 ,所以连头结点都没有,也就没有peek方法。同理,没有iterate相关方法
  • 是一个极好的用来直接传递的并发数据结构
  • SynchronousQyeue是线程池Executors.newCachedThreadPool()使用的阻塞队列

    8.23.4 DelayQueue

  • 延迟队列,根据延迟时间排序

  • 元素需 要实现Delayed接口,规定排序规则

    8.23.5 非阻塞并发队列

  • 并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能) , 适合用在对性能要求较高的并发场景。用的相对比较少一些

  • 看源码的offer方法的CAS思想,内有p.casNext方法,用了UNSAFE.compareAndSwapObject

如何选择适合自己的队列?

  • 边界
  • 空间
  • 吞吐量