• JUC(java.util.concurrent)
    • 进程和线程
      • 进程:后台运行的程序(我们打开的一个软件,就是进程)
      • 线程:轻量级的进程,并且一个进程包含多个线程(同在一个软件内,同时运行窗口,就是线程)
    • 并发和并行
      • 并发:同时访问某个东西,就是并发
      • 并行:一起做某些事情,就是并行
  • JUC下的三个包
    • java.util.concurrent
      • java.util.concurrent.atomic
      • java.util.concurrent.locks

        JUC多线程以及并发包

        1. 谈谈你对volatile的理解

        volatile是java虚拟机提供的轻量级的同步机制;volatile主要有以下三大特性:保证可见性、不保证原子性、禁止指令重排序;

        1.1 JMM

        JMM(Java Memory Model,Java内存模型),本身是一种抽象的概念 并不真实存在,它描述的是一组规则或规范。通过规范定制了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。JMM有三大特性:可见性、原子性、有序性
        JMM关于同步规定:
  1. 线程解锁前,必须把共享变量的值刷新回主内存
  2. 线程加锁前,必须读取主内存的最新值到自己的工作内存
  3. 加锁解锁是同一把锁

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方成为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存是共享内存区域,所有线程都可访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到自己的工作空间,然后对变量进行操作,操作完成再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存储存着主内存中的变量副本拷贝,因此不同的线程无法访问对方的工作内存,因此线程间的通讯(传值) 必须通过主内存来完成,其简要访问过程如下图:
image.png
数据传输速率:硬盘 < 内存 < < cache < CPU
上面提到了两个概念:主内存 和 工作内存

  • 主内存:就是计算机的内存,也就是经常提到的8G内存,16G内存
  • 工作内存:但我们实例化 new student,那么 age = 25 也是存储在主内存中
    • 当同时有三个线程同时访问 student中的age变量时,那么每个线程都会拷贝一份,到各自的工作内存,从而实现了变量的拷贝

image.png

缓存一致性是什么?

为什么这里主线程中某个值被更改后,其它线程能马上知晓呢?其实这里是用到了总线嗅探技术
在说嗅探技术之前,首先谈谈缓存一致性的问题,就是当多个处理器运算任务都涉及到同一块主内存区域的时候,将可能导致各自的缓存数据不一。为了解决缓存一致性的问题,需要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议进行操作,这类协议主要有MSI、MESI等等。

MESI:

当CPU写数据时,如果发现操作的变量是共享变量,即在其它CPU中也存在该变量的副本,会发出信号通知其它CPU将该内存变量的缓存行设置为无效,因此当其它CPU读取这个变量的时,发现自己缓存该变量的缓存行是无效的,那么它就会从内存中重新读取。

总线探嗅技术

那么如何发现读取的数据缓存是否失效呢?这里是用到了总线嗅探技术,就是每个处理器通过嗅探在总线上传播的数据来检查自己缓存值是否过期了,当处理器发现自己的缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置为无效状态,当处理器对这个数据进行修改操作的时候,会重新从内存中把数据读取到处理器缓存中。

总线风暴

总线嗅探技术有哪些缺点?
由于Volatile的MESI缓存一致性协议,需要不断的从主内存嗅探和CAS循环,无效的交互会导致总线带宽达到峰值。因此不要大量使用volatile关键字,至于什么时候使用volatile、什么时候用锁以及Syschonized都是需要根据实际场景的。

1.2 保证可见性

可见性是指当一个线程在本地修改了主内存中的共享变量并写回主内存后,需要通知其他线程主内存中的共享变量已经被修改,并重新读取。
写个例子证明可见性:

  1. class MyNumber {
  2. public volatile Integer number = 0; // 如果不加volatile 那么写回主内存的时候其他线程是不可见的
  3. public void addTo100() {
  4. this.number = 100;
  5. }
  6. }
  7. public class VolatileDemo {
  8. public static void main(String[] args) {
  9. MyNumber mynumber = new MyNumber();
  10. new Thread(() -> {
  11. System.out.println(Thread.currentThread().getName() + "\t come in");
  12. try {
  13. TimeUnit.SECONDS.sleep(2);
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. mynumber.addTo100();
  18. System.out.println(Thread.currentThread().getName() + "\t 修改了number = " + mynumber.number);
  19. }, "Thread A").start();
  20. // 第二个线程是我们的main线程
  21. while (mynumber.number == 0) {
  22. // 如果number等于0,那么main线程就一直在这里等待
  23. }
  24. System.out.println(Thread.currentThread().getName() + "\t mission is over" + mynumber.number);
  25. }
  26. }

1.3 原子性

原子性表示什么意思?——不可分割,完整性,也就是说线程正在做某个具体业务室,中间不可以被加塞或分割。需要整体完整,要么同时成功,要么同时失败。
案例演示volatile不保证原子性:

  1. class MyNumber {
  2. public volatile Integer number = 0; // 如果不加volatile 那么写回主内存的时候其他线程是不可见的
  3. public void addTo100() {
  4. this.number = 100;
  5. }
  6. // 请注意,此时number加了volatile关键字修饰
  7. public void addPlusPlus() {
  8. number++;
  9. }
  10. }
  11. /**
  12. * 验证volatile的可见性
  13. * 如果int number = 0 没有添加volatile关键字,则没有可见性
  14. *
  15. * 验证volatile不保证原子性
  16. *
  17. */
  18. public class VolatileDemo {
  19. public static void main(String[] args) {
  20. MyNumber mynumber = new MyNumber();
  21. for (int i = 0; i < 20; i++) {
  22. new Thread(() -> {
  23. for (int j = 0; j < 1000; j++) {
  24. mynumber.addPlusPlus();
  25. }
  26. }, "Thread" + i).start();
  27. }
  28. // 需要等待上面20个线程全部计算完成后,再用main线程获取number的最终值
  29. // Thread.activeCount() 可以统计当前活跃线程数
  30. while (Thread.activeCount() > 2) {
  31. Thread.yield();
  32. }
  33. System.out.println(Thread.currentThread().getName() + "\t finally number value" + mynumber.number);
  34. }
  35. }

为什么volatile不保证原子性?

在多线程竞争的情况下,某个线程修改了本地变量值写入主内存之前可能会被挂起,同时另外一个线程将修改写入主内存(还没来得及通知其他线程时)当前线程紧接着也将自己的修改写入主内存,导致了数据的丢失。
因为有很多值在putfield这步写回去的时候可能线程的调度被挂起了,刚好也没有收到最新值的通知,有这么一个纳秒级别的时间差,一写就出现了写覆盖,就把人家的值覆盖掉了
image.png

如何解决原子性问题?

  1. 可以加synchronized来解决不保证原子性问题,但是不推荐使用;
  2. 可以使用java.util.concurrent.atomic包下的AtomicInteger(带原子性包装的整型类)来解决不保证原子性问题;(其通过CAS来保证原子性) ```java class MyNumber { public volatile Integer number = 0; // 如果不加volatile 那么写回主内存的时候其他线程是不可见的 public void addTo100() {

    1. this.number = 100;

    }

    // 请注意,此时number加了volatile关键字修饰 public void addPlusPlus() {

    1. number++;

    }

    // 通过原子性包装类来解决原子性问题 AtomicInteger atomicInteger = new AtomicInteger(); public void addMyAtomic() {

    1. atomicInteger.getAndIncrement();

    }

}

/**

  • 验证volatile的可见性
  • 如果int number = 0 没有添加volatile关键字,则没有可见性 *
  • 验证volatile不保证原子性
  • why-为什么不保证原子性 / public class VolatileDemo { public static void main(String[] args) {

    1. MyNumber mynumber = new MyNumber();
    2. for (int i = 0; i < 20; i++) {
    3. new Thread(() -> {
    4. for (int j = 0; j < 1000; j++) {
    5. mynumber.addPlusPlus();
    6. mynumber.addMyAtomic();
    7. }
    8. }, "Thread" + i).start();
    9. }
    10. // 需要等待上面20个线程全部计算完成后,再用main线程获取number的最终值
    11. // Thread.activeCount() 可以统计当前活跃线程数
    12. while (Thread.activeCount() > 2) {
    13. Thread.yield();
    14. }
    15. System.out.println(Thread.currentThread().getName() + "\t finally number value" + mynumber.number);
    16. System.out.println(Thread.currentThread().getName() + "\t finally atomicInteger value" + mynumber.atomicInteger);

    } } ```

    1.4 有序性

    指令重排序

    计算机在执行程序时,为了提高性能,编译器和处理器常常会做指令重排,一般分为以下3种:
    image.png
    通俗解释——先把会做的题目做了,再做不会做的题目(代码执行顺序跟写的顺序不同)
    单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致。(还是会重排,但不影响结果)
    处理器在进行重新排序是必须要考虑指令之间数据依赖性
    多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程使用的变量能否保持一致性是无法确定的,结果无法预测。 ```java 案例1: public void mySort(){ int x=11;//语句1 int y=12;//语句2 x=x+5;//语句3 y=x*x;//语句4 } 实际执行顺序可能为: 1234 2134 1324 问题:请问语句4 可以重排后变成第一条码? 存在数据的依赖性 没办法排到第一个。

案例2: int a, b, x, y = 0; 线程1进行操作: x = a, b = 1; 线程2进行操作: y = b, a = 2; 最终结果 x = 0, y = 0 如果编译器对这段代码进行执行重排优化后,可能出现下列情况: 线程1:b = 1, x = a; 线程2:a = 2, y = b; 最终结果 x = 2, y = 1 这也就说明在多线程环境下,由于编译器优化重排的存在,两个线程使用的变量能否保持一致是无法确定的.

案例3: public class ReSortSeqDemo { int a = 0; boolean flag = true; public void method1() { a = 1; // 语句1 flag = true; // 语句2 } public void method2() { if (flag) { a = a + 5; // 语句3 System.out.println(“**retValue:” + a); } } } 多线程分别调用method1和method2的情况下,method1指令重排可能会导致语句2先执行, 此时其他的线程执行method2,得到的结果为5,而实际上我们想要的结果是6,因此需要静止指令重排

  1. <a name="oLz3M"></a>
  2. #### 小结
  3. volatile实现禁止指令重排优化,从而避免多线程环境下程序出现乱序执行的现象<br />先了解一个概念,内存屏障(Memory Barrier)又称内存栅栏,是一个CPU指令,它的作用有两个:
  4. - 一是保证特定操作的执行顺序。
  5. - 二是保证某些变量的内存可见性(利用该特性实现volatile的内存可见性)。
  6. 由于编译器和处理器都能执行指令重排优化。如果在指令间插入一条Memory Barrier则会告诉编译器和CPU,不管什么指令都不能和这条Memory Barrier指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后的指令执行重排序优化。内存屏障另外一个作用是强制刷出各种CPU的缓存数据,因此任何CPU上的线程都能读取到这些数据的最新版本。**内存屏障就是一段与平台相关的代码,Java中的内存屏障代码都在Unsafe类中定义,共包含三个方法:LoadFence()、storeFence()、fullFence()。**<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/22423156/1649424375640-1e2aab2e-2480-4cfa-a8b3-fd0cf7851ffb.png#clientId=u9bdb8641-81b7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=485&id=u9ae7aa07&name=image.png&originHeight=485&originWidth=1214&originalType=binary&ratio=1&rotation=0&showTitle=false&size=267304&status=done&style=none&taskId=ud5f90562-f790-4678-925b-097829a38fd&title=&width=1214)
  7. <a name="eOcgV"></a>
  8. ### 线程安全性得到满足
  9. 工作内存与主内存同步延迟现象导致的可见性问题<br />可以使用synchronized或volatile关键字解决,它们都可以使一个线程修改后的变量立即对其他线程可见。<br />对于指令重排导致的可见性问题和有序性问题<br />可以利用volatile关键字解决,因为volatile的另外一个作用就是禁止重排序优化。
  10. <a name="Tx1rl"></a>
  11. ### 1.5 你在哪些地方使用到了volatile
  12. <a name="rnpKL"></a>
  13. #### 单例模式DCL(Double Check Lock)分析
  14. ```java
  15. public class SingletonDemo {
  16. // private static SingletonDemo instance = null;
  17. // 如果在高并发多线程的版本里面,那么此时的单例模式最终的写法就是volatile + DCL
  18. private static volatile SingletonDemo instance = null;
  19. private SingletonDemo() {
  20. System.out.println(Thread.currentThread().getName() + "\t 构造方法");
  21. }
  22. // DCL双捡锁机制
  23. public static SingletonDemo getInstance() {
  24. if (instance == null) {
  25. synchronized (SingletonDemo.class) {
  26. if (instance == null) instance = new SingletonDemo();
  27. }
  28. }
  29. return instance;
  30. }
  31. public static void main(String[] args) {
  32. for (int i = 0; i < 10; i++) {
  33. new Thread(() -> {
  34. SingletonDemo.getInstance();
  35. }, "Thread" + i).start();
  36. }
  37. }
  38. }

DCL机制不一定线程安全,原因是有指令重排序的存在,加入volatile可禁止指令重排序。
原因在于某一个线程在执行到第一次检测,读取到的instance不为null时,instance的引用对象可能没有完成初始化.
instance=new SingletonDem(); 可以分为以下步骤(伪代码)

memory=allocate();//1.分配对象内存空间
instance(memory);//2.初始化对象
instance=memory;//3.设置instance的指向刚分配的内存地址,此时instance!=null

步骤2和步骤3不存在数据依赖关系.而且无论重排前还是重排后程序执行的结果在单线程中并没有改变,因此这种重排优化是允许的.
memory=allocate();//1.分配对象内存空间
instance=memory;//3.设置instance的指向刚分配的内存地址,此时instance!=null 但对象还没有初始化完.
instance(memory);//2.初始化对象
但是指令重排只会保证串行语义的执行一致性(单线程) 并不会关心多线程间的语义一致性,所以当一条线程访问instance不为null时,由于instance实例未必完成初始化,也就造成了线程安全问题。

2. CAS

2.1 CAS是什么

  • 如果线程的期望值跟物理内存的真实值一样,就更新值到物理内存当中,并返回true;
  • 如果线程的期望值跟物理内存的真实值不一样,返回是false,那么本次修改失败,那么此时需要重新获得主物理内存的新值(自旋)

CAS的全称为Compare-And-Swap ,它是一条CPU并发原语。它的功能是判断内存某个位置的值是否为预期值,如果是则更新为新的值,这个过程是原子的。
CAS并发原语体现在Java语言中就是sun.misc.Unsafe类的各个方法。调用UnSafe类中的CAS方法,JVM会帮我们实现出CAS汇编指令,这是一种完全依赖于硬件的功能,通过它实现了原子操作,再次强调,由于CAS是一种系统原语,原语属于操作系统用于范畴,是由若干条指令组成,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致的问题,也就是说CAS是线程安全的

CASDemo

  1. public class CASDemo {
  2. public static void main(String[] args) {
  3. AtomicInteger atomicInteger = new AtomicInteger(5);
  4. // main do thing ...
  5. System.out.println(atomicInteger.compareAndSet(5, 2019) + "\t current data:" + atomicInteger.get());
  6. System.out.println(atomicInteger.compareAndSet(5, 1024) + "\t current data:" + atomicInteger.get());
  7. }
  8. }

image.png

2.2 CAS底层原理?Unsafe的理解?

CAS底层原理=自旋锁+Unsafe类
我们先看看atomicInteger.getAndIncrement()方法的源码,其使用的是unsafe类的getAndAddInt方法
image.png

Unsafe类

Unsafe类在jdk中的jre/lib/rt.jar包的sun.misc包下。
image.png
Unsafe是CAS的核心类 由于Java 方法无法直接访问底层 ,需要通过本地(native)方法来访问,UnSafe相当于一个后面,基于该类可以直接操作特额定的内存数据.UnSafe类在于sun.misc包中,其内部方法操作可以向C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于UNSafe类的方法。
注意UnSafe类中所有的方法都是native修饰的,也就是说UnSafe类中的方法都是直接调用操作底层资源执行响应的任务
为什么Atomic修饰的包装类,能够保证原子性,依靠的就是底层的unsafe类
变量ValueOffset,便是该变量在内存中的偏移地址,因为UnSafe就是根据内存偏移地址获取数据的;

详解CAS

变量value用volatile修饰,保证了多线程之间的可见性。
image.png
var5:就是我们从主内存中拷贝到工作内存中的值(每次都要从主内存拿到最新的值到自己的本地内存,然后执行compareAndSwapInt()再和主内存的值进行比较。因为线程不可以直接越过高速缓存,直接操作主内存,所以执行上述方法需要比较一次,再执行加1操作

  • var1 AtomicInteger对象本身.
  • var2 该对象值的引用地址
  • var4 需要变动的数值
  • var5 是用过var1 var2找出内存中真实的值
    • 用该对象当前的值与var5比较
    • 如果相同,更新var5+var4的值并且返回true
    • 如果不同,继续取值然后比较,直到更新完成。

假设线程A和线程B两个线程同时执行getAndAddInt操作(分别在不同的CPU上):

  1. AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存.
  2. 线程A通过getIntVolatile(var1,var2) 拿到value值3,这时线程A被挂起.
  3. 线程B也通过getIntVolatile(var1,var2) 拿到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存中的值也是3 成功修改内存的值为4 线程B打完收工 一切OK.
  4. 线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的数值和内存中的数字4不一致,说明该值已经被其他线程抢先一步修改了,那A线程修改失败,只能重新来一遍了.
  5. 线程A重新获取value值,因为变量value是volatile修饰,所以其他线程对他的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt方法进行比较替换,直到成功.

    底层汇编

    Unsafe类中的compareAndSwapInt,是一个本地方法,该方法的实现位于unsafe.cpp中 ```cpp UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv env,jobject unsafe , jobject obj, jlong offset ,jint e, jint x)) UnsafeWrapper(“Unsafe_CompareAndSwapInt”); oop p = JNIHandles::resolve(obj); jint addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic:.cmpxchg(x, addr, e)) == e; UNSAFE_END
  1. - 先想办法拿到变量value在内存中的地址
  2. - 通过Atomic::cmpxchg实现比较替换,其中参数X是即将更新的值,参数e是原内存的值
  3. <a name="HlDWA"></a>
  4. ### 2.3 CAS的缺点
  5. - 循环时间长开销大:上面getAndAddInt方法执行时有个do-while。如果CAS失败会一直进行尝试,如果CAS长时间一直不成功,可能会给CPU带来很大的开销。(怎么解决呢? LongAdder
  6. - 只能保证一个共享变量的原子操作
  7. - 当对一个共享变量执行操作时,我们可以通过循环CAS的方式来保证原子操作
  8. - 但是对于多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候只能用锁来保证原子性
  9. - ABA问题(引入版本号可以解决)
  10. <a name="zDgZ1"></a>
  11. ### 2.4 总结
  12. - CAScompareAndSwap,比较当前工作内存中的值和主物理内存中的值,如果相同则执行规定操作,否者继续比较直到主内存和工作内存的值一致为止
  13. - CAS3个操作数,内存值V,旧的预期值A,要修改的更新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否者什么都不做
  14. <a name="dSchN"></a>
  15. ### 2.5 LongAdder(CAS机制优化)
  16. LongAdderjava8为我们提供的新的类,跟AtomicLong有相同的效果。是对CAS机制的优化,因为非常搞高并发的请求下AtomicLong的性能是不能让人接受的。<br />如下AtomicLong incrementAndGet的代码,虽然AtomicLong使用CAS算法,但是CAS失败后还是通过无限循环的自旋锁不多的尝试,这就是高并发下CAS性能低下的原因所在。源码如下:
  17. ```java
  18. public final long incrementAndGet() {
  19. for (;;) {
  20. long current = get();
  21. long next = current + 1;
  22. if (compareAndSet(current, next))
  23. return next;
  24. }
  25. }

核心思想:将热点数据分离。
比如说它可以将AtomicLong内部的内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行计数,而最终的计数结果则为这个数组的求和累加,其中热点数据value会被分离成多个单元的cell,每个cell独自维护内部的值。当前对象的实际值由所有的cell累计合成,这样热点就进行了有效地分离,并提高了并行度。这相当于将AtomicLong的单点的更新压力分担到各个节点上。在低并发的时候通过对base的直接更新,可以保障和AtomicLong的性能基本一致。而在高并发的时候通过分散提高了性能。
image.pngimage.png
在LongAdder的底层实现中,首先有一个base值,刚开始多线程没有发生冲突进行数值累加时,都是对base进行累加的,比如刚开始累加成了base = 5。
接着如果发现并发更新的线程数量过多,在发生竞争的情况下,会有一个Cell数组用于将不同线程的操作离散到不同的节点上去 (会根据需要扩容,最大为CPU核)就会开始施行分段CAS的机制,也就是内部会搞一个Cell数组,每个数组是一个数值分段。
这样便可以让大量的线程分别去对不同Cell内部的value值进行CAS累加操作,这样就把CAS计算压力分散到了不同的Cell分段数值中了!而且他内部实现了自动分段迁移的机制,也就是如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作。
这样也解决了线程空旋转、自旋不停等待执行CAS操作的问题,让一个线程过来执行CAS时可以尽快的完成这个操作。最后,如果你要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回。
https://blog.csdn.net/jiangtianjiao/article/details/103844801
LongAdder维护了要给延迟初始化的原子性更新数组cells和一个基值变量base,数组的大小保持是2的N次方大小(最大为CPU线程数),数组表的下标使用每个线程的hashcode值的掩码表示,数组里面的变量实体是Cell类型。
Cell 类型是Atomic的一个改进,用来减少缓存的争用,对于大多数原子操作字节填充是浪费的,因为原子操作都是无规律的分散在内存中进行的,多个原子性操作彼此之间是没有接触的,但是原子性数组元素彼此相邻存放将能经常共享缓存行,也就是伪共享。所以这在性能上是一个提升。另外由于Cells占用内存是相对比较大的,所以一开始并不创建,而是在需要时候再创建,也就是惰性加载,当一开始没有空间时候,所有的更新都是操作base变量。

3. 原子类AtomicInteger的ABA问题,原子引用更新你知道吗?

从AtomicInteger引出下面的问题:
CAS -> Unsafe -> CAS底层思想 -> ABA -> 原子引用更新 -> 如何规避ABA问题

3.1 什么是ABA问题

假设现在有两个线程,分别是T1 和 T2,然后T1执行某个操作的时间为10秒,T2执行某个时间的操作是2秒,最开始AB两个线程,分别从主内存中获取A值,但是因为B的执行速度更快,他先把A的值改成B,然后在修改成A,然后执行完毕,T1线程在10秒后,执行完毕,判断内存中的值为A,并且和自己预期的值一样,它就认为没有人更改了主内存中的值,就快乐的修改成B,但是实际上 可能中间经历了 ABCDEFA 这个变换,也就是中间的值经历了狸猫换太子。
所以ABA问题就是,在进行获取主内存值的时候,该内存值在我们写入主内存的时候,已经被修改了N次,但是最终又改成原来的值了。(尽管CAS操作成功,但是不代表这个过程是没有问题的

3.2 原子引用

原子引用其实和原子包装类是差不多的概念,就是将一个java类,用原子引用类进行包装起来,那么这个类就具备了原子性

  1. public class AtomicReferenceDemo {
  2. public static void main(String[] args) {
  3. User z3 = new User("z3", 22);
  4. User li4 = new User("li4", 25);
  5. // 创建User的原子引用包装类
  6. AtomicReference<User> atomicReference = new AtomicReference<>();
  7. atomicReference.set(z3); // 现在主物理内存的共享变量,为z3
  8. // 调用原子引用的compareAndSet,期望值是z3,如果没有被修改就更改为li4
  9. System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
  10. // 比较并交换,现在主物理内存的值是li4了,但是预期为z3,因此交换失败
  11. System.out.println(atomicReference.compareAndSet(z3, li4) + "\t" + atomicReference.get().toString());
  12. }
  13. }

3.3 原子引用+时间戳(AtomicStampedReference) 解决ABA问题

新增一种机制,也就是修改版本号,类似于时间戳的概念
T1: 100 1 2019 2
T2: 100 1 101 2 100 3
如果T1修改的时候,版本号为2,落后于现在的版本号3,所以要重新获取最新值,这里就提出了一个使用时间戳版本号,来解决ABA问题的思路。那么就引入了AtomicStampedReference 带时间戳的原子引用包装类

AtomicReference模拟ABA问题

我们先写个程序来模拟一下ABA问题:

  1. public class ABADemo { // ABA问题的解决 AtomicStampedReference
  2. // 普通原子引用包装类
  3. static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
  4. public static void main(String[] args) {
  5. new Thread(() -> {
  6. atomicReference.compareAndSet(100, 101);
  7. atomicReference.compareAndSet(101, 100);
  8. }, "Thread 1").start();
  9. new Thread(() -> {
  10. try { // t2 暂停1s,保证t1完成ABA操作
  11. TimeUnit.SECONDS.sleep(1);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. // 把100 改成 101 然后在改成100,也就是ABA
  16. System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
  17. }, "Thread 2").start();
  18. }
  19. }

image.png上面输出了true,也就是ABA问题

AtomicStampReference解决ABA问题

时间戳原子引用,来这里应用于版本号的更新,也就是每次更新的时候,需要比较期望值和当前值,以及期望版本号和当前版本号

  1. public class ABADemo {
  2. // 普通原子引用包装类
  3. static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);
  4. // ABA问题的解决 AtomicStampedReference
  5. static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference<>(100, 1);
  6. public static void main(String[] args) {
  7. System.out.println("=============以下是ABA问题的产生=================");
  8. new Thread(() -> {
  9. // 先把100改成101,再改回来 完成ABA
  10. atomicReference.compareAndSet(100, 101);
  11. atomicReference.compareAndSet(101, 100);
  12. }, "Thread 1").start();
  13. new Thread(() -> {
  14. try { // t2 暂停1s,保证t1完成ABA操作
  15. TimeUnit.SECONDS.sleep(1);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. // 把100 改成 101 然后在改成100,也就是ABA
  20. System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());
  21. }, "Thread 2").start();
  22. try {
  23. TimeUnit.SECONDS.sleep(2);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println("=============以下是ABA问题的解决=================");
  28. new Thread(() -> {
  29. int stamp = stampedReference.getStamp(); // 获取初始版本号
  30. System.out.println(Thread.currentThread().getName() + "\t第1次版本号:" + stamp);
  31. // 暂停1s t3 保证t4能够获取到相同的初始版本号
  32. try {
  33. TimeUnit.SECONDS.sleep(1);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. // 先把100改成101,再改回来 完成ABA
  38. stampedReference.compareAndSet(100, 101, stampedReference.getStamp(),stampedReference.getStamp() + 1);
  39. System.out.println(Thread.currentThread().getName() + "\t第2次版本号:" + stampedReference.getStamp());
  40. stampedReference.compareAndSet(101, 100,stampedReference.getStamp(),stampedReference.getStamp() + 1);
  41. System.out.println(Thread.currentThread().getName() + "\t第3次版本号:" + stampedReference.getStamp());
  42. }, "Thread 3").start();
  43. new Thread(() -> {
  44. int stamp = stampedReference.getStamp(); // 获取初始版本号
  45. System.out.println(Thread.currentThread().getName() + "\t第1次版本号:" + stamp);
  46. // 暂停3s t4,保证t3完成一次ABA操作
  47. try {
  48. TimeUnit.SECONDS.sleep(3);
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. boolean res = stampedReference.compareAndSet(100, 2019, stamp,stamp + 1);
  53. System.out.println(Thread.currentThread().getName() + "\tt4修改:" + res + " 当前版本号:" + stampedReference.getStamp());
  54. System.out.println(Thread.currentThread().getName() + "\t当前最新值:" + stampedReference.getReference());
  55. }, "Thread 4").start();
  56. }
  57. }

image.png可以看到修改失败,ABA被解决了。

4. 集合类线程不安全问题

4.1 ArrayList不安全

  1. public class ContainerNotSafeDemo {
  2. public static void main(String[] args) {
  3. ArrayList<String> list = new ArrayList<>(); //底层Object[] elementData初始化为{}.并没创建长度为10的数组
  4. list.add("a"); // 第一次调用add()时,底层才创建了长度10的数组,并将数据123添加到elementData[0]
  5. for (int i = 0; i < 30; i++) {
  6. new Thread(() -> {
  7. list.add(UUID.randomUUID().toString().substring(0, 8));
  8. System.out.println(list);
  9. }, "Thread 1").start();
  10. }
  11. }
  12. }

故障现象:
image.png

解决方案

  1. 我们可以使用线程安全的集合类Vector,其add方法是同步方法(保证了数据一致性,但是访问性能下降);
  2. 使用Collections.synchronizedList(new ArrayList<>()) 创建一个线程安全的List (其实就是在add的时候使用了synchronized同步代码块);
  3. CopyOnWriteArrayList 写时复制ArrayList (多线程建议使用这个,lock加锁+写时复制)

我们可以看看CopyOnWriteArrayList中add方法的源码

  1. private transient volatile Object[] array;
  2. public boolean add(E e) {
  3. final ReentrantLock lock = this.lock;
  4. lock.lock();
  5. try {
  6. Object[] elements = getArray(); // 获取当前List中的所有元素
  7. int len = elements.length;
  8. Object[] newElements = Arrays.copyOf(elements, len + 1); // 拷贝旧元素到新的扩容的数组中
  9. newElements[len] = e; // 添加新的值
  10. setArray(newElements); // 更新
  11. return true;
  12. } finally {
  13. lock.unlock();
  14. }
  15. }

CopyOnWrite容器即写时复制的容器,往一个容器中添加元素的时候,不直接往当前容器Object[]添加,而是现将当前容器Object[]进行Copy,而是复制出一个新的容器Object[] newElements向新容器添加元素,添加之后,再将原容器的引用指向新的容器setArray(newElements);这样做的好处时可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

4.2 Set不安全

  1. public class SetNotSafeDemo {
  2. public static void main(String[] args) {
  3. Set<String> set = new HashSet<>();
  4. for (int i = 1; i <= 30; i++) {
  5. new Thread(() -> {
  6. set.add(UUID.randomUUID().toString().substring(0, 8));
  7. System.out.println(set);
  8. }, String.valueOf(i)).start();
  9. }
  10. }
  11. }

这里说一下HashSet的源码,HashSet其实就是一个HashMap,HashSet的中存的值是HashMap的key,HashMap中的Value是一个固定对象PRESENT
上面这段代码同样会出现java.util.ConcurrentModificationException异常;同样可以使用Collections.synchronizedSet()CopyOnWriteArraySet,其具体原理与List的一致(CopyOnWriteArraySet底层就是一个CopyOnWriteArrayList)。
image.png
image.png

4.3 Map不安全

  1. public class MapNotSafeDemo {
  2. public static void main(String[] args) {
  3. // HashMap<String, String> map = new HashMap<>();
  4. // HashMap<String, String> map1 = (HashMap<String, String>) Collections.synchronizedMap(new HashMap<String, String>());
  5. ConcurrentHashMap<String, String> map = new ConcurrentHashMap<String, String>();
  6. for (int i = 1; i <= 30; i++) {
  7. new Thread(() -> {
  8. map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));
  9. System.out.println(map);
  10. }, String.valueOf(i)).start();
  11. }
  12. }
  13. }

同样HashMap也是线程不安全的,可以使用集合工具类Collections.synchronizedMap(new HashMap())ConcurrentHashMap创建线程安全的HashMap;

ConcurrentHashMap底层原理

jdk 1.7 :
数据结构:内部主要是一个Segment数组,而数组的每一项又是一个HashEntry数组,元素都存在HashEntry数组里。因为每次锁定的是Segment对象,也就是整个HashEntry数组,所以又叫分段锁。
image.png
jdk 1.8 :
数据结构:与HashMap一样采用:数组+链表+红黑树
image.png
底层原理则是采用锁链表或者红黑树头结点,相比于HashTable的方法锁,力度更细,是对数组(table)中的桶(链表或者红黑树)的头结点进行锁定,这样锁定,只会影响数组(table)当前下标的数据,不会影响其他下标节点的操作,可以提高读写效率。putVal执行流程:

  1. 判断存储的key、value是否为空,若为空,则抛出异常
  2. 计算key的hash值,随后死循环(该循环可以确保成功插入,当满足适当条件时,会主动终止),判断table表为空或者长度为0,则初始化table表
  3. 根据hash值获取table中该下标对应的节点,如果该节点为空,则根据参数生成新的节点,并以CAS的方式进行更新,并终止死循环。
  4. 如果该节点的hash值是MOVED(-1),表示正在扩容,则辅助对该节点进行转移。
  5. 对数组(table)中的节点,即桶的头结点进行锁定,如果该节点的hash大于等于0,表示此桶是链表,然后对该桶进行遍历(死循环),寻找链表中与put的key的hash值相等,并且key相等的元素,然后进行值的替换,如果到链表尾部都没有符合条件的,就新建一个node,然后插入到该桶的尾部,并终止该循环遍历。
  6. 如果该节点的hash小于0,并且节点类型是TreeBin,则走红黑树的插入方式。
  7. 判断是否达到转化红黑树的阈值,如果达到阈值,则链表转化为红黑树。

    5. 锁

    5.1 公平锁/非公平锁

    公平锁:是指多个线程按照申请锁的顺序来获取锁类似排队打饭 先来后到公平锁,就是很公平,在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第1个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己;
    非公平锁:是指在多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取到锁,在高并发的情况下,有可能造成优先级反转或者饥饿现象非公平锁比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式。(并发包ReentrantLock的创建可以指定构造函数的boolean类型来得到公平锁或者非公平锁 默认是非公平锁)
    Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁 默认是非公平锁 非公平锁的优点在于吞吐量比公平锁大,对于synchronized而言 也是一种非公平锁。

    5.2 可重入锁(递归锁)

    可重入锁(也叫做递归锁),指的是同一线程外层函数获得锁之后﹐内层递归函数仍然能获取该锁的代码,在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。也即是说,线程可以进入任何一个它已经拥有的锁所同步着的代码块。(某个类中有两个非静态同步方法,可以在一个同步方法中调用另一个同步方法,也就是说拿到一个锁后,这个锁锁定的所有方法都能调用)
    ReentrantLock/synchronized就是一个典型的可重入锁,可重入锁最大的作用就是避免死锁。

    可重入锁例子

    ```java class Phone implements Runnable { public synchronized void sendSms() throws Exception {

    1. System.out.println(Thread.currentThread().getName() + "\tsendSms");
    2. sendEmail();

    }

    public synchronized void sendEmail() throws Exception {

    1. System.out.println(Thread.currentThread().getName() + "\tsendEmail");

    }

    ReentrantLock lock = new ReentrantLock();

    @Override public void run() {

    1. get();

    }

    private void get() {

    1. lock.lock();
    2. lock.lock(); // 可以重复锁定
    3. try {
    4. System.out.println(Thread.currentThread().getName() + "\tget");
    5. set();
    6. } finally {
    7. lock.unlock(); // 但是一定要解锁
    8. lock.unlock();
    9. }

    }

    private void set() {

    1. lock.lock();
    2. try {
    3. System.out.println(Thread.currentThread().getName() + "\tset");
    4. } finally {
    5. lock.unlock();
    6. }

    } }

/**

  • @author mrlinxi
  • @create 2022-04-10 19:49
  • 可重入锁(也叫做递归锁)
  • 指的是同一先生外层函数获得锁后,内层敌对函数任然能获取该锁的代码
  • 在同一线程外外层方法获取锁的时候,在进入内层方法会自动获取锁
  • 也就是说,线程可以进入任何一个它已经标记的锁所同步的代码块 */ public class ReenterLockDemo { public static void main(String[] args) {

    1. Phone phone = new Phone();
    2. new Thread(() -> {
    3. try {
    4. phone.sendSms();
    5. } catch (Exception e) {
    6. e.printStackTrace();
    7. }
    8. }, "t1").start();
    9. new Thread(() -> {
    10. try {
    11. phone.sendSms();
    12. } catch (Exception e) {
    13. e.printStackTrace();
    14. }
    15. }, "t2").start();
  1. Thread t3 = new Thread(phone);
  2. Thread t4 = new Thread(phone);
  3. t3.start();
  4. t4.start();
  5. }

}

  1. <a name="IkpeW"></a>
  2. ### 5.3 自旋锁
  3. 自旋锁:spinlock,是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。原来提到的比较并交换,底层使用的就是自旋,自旋就是多次尝试,多次访问,不会阻塞的状态就是自旋。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/22423156/1649438297763-913a7019-5369-4e3a-b424-62daaef50a89.png#clientId=u9bdb8641-81b7-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=389&id=Kax7l&name=image.png&originHeight=389&originWidth=700&originalType=binary&ratio=1&rotation=0&showTitle=false&size=38301&status=done&style=none&taskId=u4688f68e-0fae-4e60-91e8-de5b07c5e4f&title=&width=700)<br />优点:循环比较获取直到成功为止,没有类似于wait的阻塞<br />缺点:当不断自旋的线程越来越多的时候,会因为执行while循环不断的消耗CPU资源
  4. <a name="oSnbP"></a>
  5. #### 手写一个自旋锁
  6. 通过CAS操作完成自旋锁,A线程先进来调用myLock方法自己持有锁2秒,B随后进来发现当前有线程持有锁,不是null,所以只能通过自旋等待,直到A释放锁后B随后抢到
  7. ```java
  8. public class SpinLockDemo {
  9. // 现在的泛型装的是Thread,原子引用线程
  10. AtomicReference<Thread> atomicReference = new AtomicReference<>();
  11. public void myLock() {
  12. // 获取当前进来的线程
  13. Thread thread = Thread.currentThread();
  14. System.out.println(Thread.currentThread().getName() + "\t come in ");
  15. // 开始自旋,期望值是null,更新值是当前线程,如果是null,则更新为当前线程,否者自旋
  16. while(!atomicReference.compareAndSet(null, thread)) {
  17. System.out.println(Thread.currentThread().getName() + "\t 自旋");
  18. }
  19. }
  20. public void myUnLock() {
  21. // 获取当前进来的线程
  22. Thread thread = Thread.currentThread();
  23. // 自己用完了后,把atomicReference变成null
  24. atomicReference.compareAndSet(thread, null);
  25. System.out.println(Thread.currentThread().getName() + "\t invoked myUnlock()");
  26. }
  27. public static void main(String[] args) {
  28. SpinLockDemo spinLockDemo = new SpinLockDemo();
  29. new Thread(() -> {
  30. spinLockDemo.myLock();
  31. try {
  32. TimeUnit.SECONDS.sleep(2);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. spinLockDemo.myUnLock();
  37. }, "Thread 1").start();
  38. new Thread(() -> {
  39. spinLockDemo.myLock();
  40. spinLockDemo.myUnLock();
  41. }, "Thread 2").start();
  42. }
  43. }

image.png

5.4 独占锁(写)/共享锁(读)/互斥锁

独占锁:指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁;
共享锁:指该锁可被多个线程所持有。
对ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁。读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。

读写锁案例

  1. // 资源类
  2. class MyCache {
  3. private volatile HashMap<String, Object> map = new HashMap<>();
  4. ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  5. public void put(String key, Object value) {
  6. readWriteLock.writeLock().lock();
  7. try {
  8. System.out.println(Thread.currentThread().getName() + "\t正在写入" + key);
  9. try {
  10. TimeUnit.MILLISECONDS.sleep(300);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. map.put(key, value);
  15. System.out.println(Thread.currentThread().getName() + "\t正在完成");
  16. } catch (Exception e) {
  17. e.printStackTrace();
  18. } finally {
  19. readWriteLock.writeLock().unlock();
  20. }
  21. }
  22. public void get(String key) {
  23. readWriteLock.readLock().lock();
  24. try {
  25. System.out.println(Thread.currentThread().getName() + "\t正在读取");
  26. try {
  27. TimeUnit.MILLISECONDS.sleep(300);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. Object result = map.get(key);
  32. System.out.println(Thread.currentThread().getName() + "\t读取完成" + result);
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. } finally {
  36. readWriteLock.readLock().unlock();
  37. }
  38. }
  39. }
  40. /**
  41. * @author mrlinxi
  42. * @create 2022-04-10 20:35
  43. * 多个线程同时操作 一个资源类没有任何问题 所以为了满足并发量
  44. * * 读取共享资源应该可以同时进行
  45. * * 但是
  46. * * 如果有一个线程想去写共享资源来 就不应该有其他线程可以对资源进行读或写
  47. * * <p>
  48. * * 小总结:
  49. * * 读 读能共存
  50. * * 读 写不能共存
  51. * * 写 写不能共存
  52. * * 写操作 原子+独占 整个过程必须是一个完成的统一整体 中间不允许被分割 被打断
  53. */
  54. public class ReadWriteLockDemo {
  55. public static void main(String[] args) {
  56. MyCache myCache = new MyCache();
  57. for (int i = 0; i < 5; i++) {
  58. final int tempInt = i;
  59. new Thread(() -> {
  60. myCache.put(Thread.currentThread().getName(), tempInt);
  61. }, "Thread " + i).start();
  62. }
  63. for (int i = 0; i < 5; i++) {
  64. final int tempInt = i;
  65. new Thread(() -> {
  66. myCache.get(Thread.currentThread().getName());
  67. }, "Thread " + i).start();
  68. }
  69. }
  70. }

6. CountDownLatch/CyclicBarrier/Semaphore使用过吗?

6.1 CountDownLatch

让一些线程阻塞直到另外一些完成后才被唤醒,CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞.其他线程调用countDown方法计数器减1(调用countDown方法时线程不会阻塞),当计数器的值变为0,因调用await方法被阻塞的线程会被唤醒,继续执行。

关门案例

  1. public class CountDownLatchDemo {
  2. public static void main(String[] args) throws InterruptedException {
  3. CountDownLatch countDownLatch = new CountDownLatch(6);
  4. for (int i = 0; i < 6; i++) {
  5. final int tempInt = i;
  6. new Thread(() -> {
  7. System.out.println(Thread.currentThread().getName() + "\t" + tempInt + "走了");
  8. countDownLatch.countDown();
  9. }, "Thread" + i).start();
  10. }
  11. countDownLatch.await();
  12. System.out.println("锁门");
  13. }
  14. }

枚举的使用

  1. @Getter
  2. enum CountryEnum {
  3. ONE(1, "齐"),
  4. TWO(2, "楚"),
  5. THREE(3, "燕"),
  6. FOUR(4, "赵"),
  7. FIVE(5, "魏"),
  8. SIX(6, "韩");
  9. private Integer code;
  10. private String name;
  11. CountryEnum(Integer code, String name) {
  12. this.code = code;
  13. this.name = name;
  14. }
  15. public static CountryEnum forEach_CountryEnum(int index) {
  16. CountryEnum[] countryEnums = CountryEnum.values();
  17. for (CountryEnum countryEnum : countryEnums) {
  18. if (countryEnum.getCode().equals(index)) return countryEnum;
  19. }
  20. return null;
  21. }
  22. }
  23. /**
  24. * @author mrlinxi
  25. * @create 2022-04-10 21:02
  26. */
  27. public class CountDownLatchDemo {
  28. public static void main(String[] args) throws InterruptedException {
  29. CountDownLatch countDownLatch = new CountDownLatch(6);
  30. for (int i = 0; i < 6; i++) {
  31. final int tempInt = i;
  32. new Thread(() -> {
  33. System.out.println(Thread.currentThread().getName() + "\t" + tempInt + "国被灭");
  34. countDownLatch.countDown();
  35. }, CountryEnum.forEach_CountryEnum(i).getName()).start();
  36. }
  37. countDownLatch.await();
  38. System.out.println(Thread.currentThread().getName() + "**********秦国一统天下");
  39. }
  40. }

6.2 CyclicBarrier

CyclicBarrier的字面意思是可循环(Cyclic) 使用的屏障(barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫做同步点)时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

收集龙珠案例

  1. public class CyclicBarrierDemo {
  2. public static void main(String[] args) {
  3. CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> System.out.println("召唤神龙"));
  4. for (int i = 0; i < 7; i++) {
  5. final int temp = i + 1;
  6. new Thread(() -> {
  7. System.out.println(Thread.currentThread().getName()+"\t 收集到第"+ temp +"颗龙珠");
  8. try {
  9. cyclicBarrier.await();
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. } catch (BrokenBarrierException e) {
  13. e.printStackTrace();
  14. }
  15. }, "Thread" + i).start();
  16. }
  17. }
  18. }

6.3 Semaphore

信号量的主要用户两个目的,一个是用于多个共享资源的相互排斥使用,另一个用于并发资源数的控制。

抢车位案例

  1. public class SemaphoreDemo {
  2. public static void main(String[] args) {
  3. Semaphore semaphore = new Semaphore(3); // 3个车位
  4. for (int i = 0; i < 6; i++) {
  5. new Thread(() -> {
  6. try {
  7. semaphore.acquire();
  8. System.out.println(Thread.currentThread().getName() + "\t抢到了车位");
  9. try {
  10. TimeUnit.SECONDS.sleep(1);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. System.out.println(Thread.currentThread().getName() + "\t离开了了车位");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. } finally {
  18. semaphore.release();
  19. }
  20. }, "Thread" + i).start();
  21. }
  22. }
  23. }

7. 阻塞队列

阻塞队列是一个队列,在数据结构中起的作用如下图:
image.png
线程1往阻塞队列里添加元素,线程2从阻塞队列里移除元素
当队列是空的,从队列中获取元素的操作将会被阻塞;
当队列是满的,从队列中添加元素的操作将会被阻塞;
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素;
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增;

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤起
为什么需要BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了;
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

7.1 种类分析

  • ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为integer.MAX_VALUE)阻塞队列。
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
    • SynchronousQueue没有容量,与其他BlcokingQueue不同,SynchronousQueue是一个不存储元素的BlcokingQueue。每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
  • LinkedTransferQueue:由链表组成的无界阻塞队列。
  • LinkedBlockingDeque:由链表组成的双向阻塞队列。

image.png

7.2 BlockingQueue的核心方法

image.png

  • 抛出异常:当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full;当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
  • 特殊值:插入方法,成功ture失败false;移除方法,成功返回出队列的元素,队列里没有就返回null
  • 一直阻塞:当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出;当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出

    7.3 SynchronousQueueDemo

    SynchronousQueue没有容量,与其他BlcokingQueue不同,SynchronousQueue是一个不存储元素的BlcokingQueue。每个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

    1. public class SynchronousQueueDemo {
    2. public static void main(String[] args) {
    3. BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
    4. new Thread(() -> {
    5. try {
    6. System.out.println(Thread.currentThread().getName() + "\t put 1");
    7. blockingQueue.put("1");
    8. System.out.println(Thread.currentThread().getName() + "\t put 2");
    9. blockingQueue.put("2");
    10. System.out.println(Thread.currentThread().getName() + "\t put 3");
    11. blockingQueue.put("3");
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }, "Thread A").start();
    16. new Thread(() -> {
    17. try {
    18. try {
    19. TimeUnit.SECONDS.sleep(5);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
    24. try {
    25. TimeUnit.SECONDS.sleep(5);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
    30. try {
    31. TimeUnit.SECONDS.sleep(5);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
    36. } catch (InterruptedException e) {
    37. e.printStackTrace();
    38. }
    39. }, "Thread B").start();
    40. }
    41. }

    7.4 阻塞队列用在哪里

    阻塞队列可以用在:生产者消费者模式线程池以及消息中间件中; ```java class ShareData { private int number = 0; private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {

    1. lock.lock();
    2. try {
    3. while (number != 0) { // 1. 判断
    4. condition.await();
    5. }
    6. number++; // 2.干活
    7. System.out.println(Thread.currentThread().getName()+"\t"+number);
    8. condition.signalAll(); // 3. 通知
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. } finally {
    12. lock.unlock();
    13. }

    }

    public void decrement() throws InterruptedException {

    1. lock.lock();
    2. try {
    3. while (number == 0) { // 1. 判断
    4. condition.await();
    5. }
    6. number--; // 2.干活
    7. System.out.println(Thread.currentThread().getName()+"\t"+number);
    8. condition.signalAll(); // 3. 通知
    9. } catch (InterruptedException e) {
    10. e.printStackTrace();
    11. } finally {
    12. lock.unlock();
    13. }

    } }

/**

  • @author mrlinxi
  • @create 2022-04-11 0:08
  • 生产消费者模式传统版:一个初始值为零的变量,两个线程对齐交替操作,一个+1一个-1,来五轮
  • 1 线程 操作 资源类
  • 2 判断 干活 通知唤醒
  • 3 防止虚假唤醒机制 */ public class ProdConsumerTraditionDemo { public static void main(String[] args) {

    1. ShareData data = new ShareData();
    2. new Thread(() -> {
    3. for (int i = 0; i < 5; i++) {
    4. try {
    5. data.increment();
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. }
    10. }, "Thread A").start();
    11. new Thread(() -> {
    12. for (int i = 0; i < 5; i++) {
    13. try {
    14. data.decrement();
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. }
    19. }, "Thread B").start();

    } } java class MyResource { private volatile boolean FLAG = true; // 默认开启进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue blockingQueue = null;

    public MyResource(BlockingQueue blockingQueue) {

    1. this.blockingQueue = blockingQueue;
    2. System.out.println(blockingQueue.getClass().getName());

    }

    public void myProduct() throws Exception {

    1. String data = null;
    2. boolean retValue;
    3. while (FLAG) {
    4. data = atomicInteger.incrementAndGet() + "";
    5. retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
    6. if (retValue) {
    7. System.out.println(Thread.currentThread().getName() + "\t 插入阻塞队列" + data + "成功");
    8. } else {
    9. System.out.println(Thread.currentThread().getName() + "\t 插入阻塞队列" + data + "失败");
    10. }
    11. try {
    12. TimeUnit.SECONDS.sleep(1);
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. System.out.println(Thread.currentThread().getName() + "\t FLAG=false,生产工作结束");

    }

    public void myConsumer() throws Exception {

    1. String result = null;
    2. while (FLAG) {
    3. result = blockingQueue.poll(2L, TimeUnit.SECONDS);
    4. if (result == null || result.equalsIgnoreCase("")) {
    5. FLAG = false;
    6. System.out.println(Thread.currentThread().getName() + "\t 超过2s没有取到蛋糕,消费退出");
    7. break;
    8. }
    9. System.out.println(Thread.currentThread().getName() + "\t 消费阻塞队列" + result + "成功");
    10. }

    }

    public void stop() {

    1. this.FLAG = false;

    } }

/**

  • @author mrlinxi
  • @create 2022-04-11 0:14
  • volatile/CAS/AtomicInteger/BlockQueue/线程交互/原子引用
  • 生产消费者模式阻塞队列版:一个初始值为零的变量,两个线程对齐交替操作,一个+1一个-1,来五轮 */ public class ProdConsumerBlockQueueDemo { public static void main(String[] args) {

    1. MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
    2. new Thread(() -> {
    3. try {
    4. System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
    5. myResource.myProduct();
    6. } catch (Exception e) {
    7. e.printStackTrace();
    8. }
    9. }, "Thread Prod").start();
    10. new Thread(() -> {
    11. try {
    12. System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
    13. myResource.myConsumer();
    14. } catch (Exception e) {
    15. e.printStackTrace();
    16. }
    17. }, "Thread Consumer").start();
    18. try {
    19. TimeUnit.SECONDS.sleep(5);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. System.out.println("================================");
    24. System.out.println("5s时间到,停止生产");
    25. myResource.stop();

    } } ```

    8. synchronized和Lock的区别?用lock有什么好处?

    | synchronized | lock | | —- | —- | | 1、synchronized是JVM层面,它是JAVA的关键字 | 1、Lock是API层面的具体类,它是java5以后新出的一个类 | | 2、synchronized是不需要手动释放锁,synchronized代码执行完以后,系统会自动让线程释放对锁的占用 | 2、lock就需要手动去释放锁,若没有主动的去释放锁,就可能导致死锁的现象 | | 3、synchronized不能中断,除非是抛出了异常或者是正常执行完成 | 3、lock是可以中断的,主要是设置超时的方法
    - 设置超时方法 tryLock(long timeout, TimeUnit unit)
    - lockInterruptibly()放代码块中,调用interrupt()方法可中断
    | | 4、synchronized是非公平锁 | 4、lock默认是非公平锁,但是也支持公平锁 | | 5、synchronized不支持精确唤醒,只能随机唤醒或者是唤醒全部线程 | 5、lock可支持精确唤醒 |

synchronized在字节码中是monitorenter和monitorexit(monitorexit有两个,一个是正常退出一个是异常退出)

  1. /**
  2. * @author mrlinxi
  3. * @create 2022-04-11 0:32
  4. * 请您说说synchronized和Lock的区别?用lock有什么好处?
  5. * synchronized是JVM层面,它是JAVA的关键字
  6. * synchronized 是不需要手动释放锁,当synchronized代码执行完以后,系统会自动让线程释放对锁的占用
  7. * synchronized不能中断,除非是抛出了异常或者是正常执行完成
  8. * synchronized是非公平锁
  9. * synchronized不支持精确唤醒,只能随机唤醒或者是唤醒全部线程
  10. *
  11. * Lock是API层面的具体类,它是java5以后新出的一个类
  12. * lock就需要手动去释放锁,若没有主动的去释放锁,就可能导致死锁的现象
  13. * lock是可以中断的
  14. * 1. 设置超时方法 tryLock(long timeout, TimeUnit unit)
  15. * 2. lockInterruptibly()放代码块中,调用interrupt()方法可中断
  16. * lock默认是非公平锁,但是也支持公平锁
  17. * lock可支持精确唤醒
  18. *
  19. * 多线程之间按顺序调用,实现A->B->C 三个线程启动
  20. * AA打印5次, BB打印10次,CC打印15次
  21. * 紧接着
  22. * AA打印5次, BB打印10次,CC打印15次
  23. * 来十轮
  24. */
  25. class ShareResource{
  26. private int number = 1;
  27. private Lock lock = new ReentrantLock();
  28. private Condition c1 = lock.newCondition();
  29. private Condition c2 = lock.newCondition();
  30. private Condition c3 = lock.newCondition();
  31. //第一个线程
  32. public void print5(){
  33. lock.lock();
  34. try{
  35. //判断
  36. while (number!=1){ //说明是首次线程进来
  37. c1.await(); //进来等着
  38. }
  39. //2.干活
  40. for (int i = 1; i <=5 ; i++) {
  41. System.out.println(Thread.currentThread().getName()+"\t"+i);
  42. }
  43. //3.通知B线程
  44. number = 2;
  45. c2.signal();//通知b线程
  46. }catch (Exception e){
  47. e.printStackTrace();
  48. }finally {
  49. lock.unlock();
  50. }
  51. }
  52. //第二个线程
  53. public void print10(){
  54. lock.lock();
  55. try{
  56. //判断
  57. while (number!=2){ //说明是首次线程进来
  58. c2.await(); //进来等着
  59. }
  60. //2.干活
  61. for (int i = 1; i <=10 ; i++) {
  62. System.out.println(Thread.currentThread().getName()+"\t"+i);
  63. }
  64. //3.通知B线程
  65. number = 3;
  66. c3.signal();//通知b线程
  67. }catch (Exception e){
  68. e.printStackTrace();
  69. }finally {
  70. lock.unlock();
  71. }
  72. }
  73. //第3个线程
  74. public void print15(){
  75. lock.lock();
  76. try{
  77. //判断
  78. while (number!=3){ //说明是首次线程进来
  79. c3.await(); //进来等着
  80. }
  81. //2.干活
  82. for (int i = 1; i <=15 ; i++) {
  83. System.out.println(Thread.currentThread().getName()+"\t"+i);
  84. }
  85. //3.重新回去通知A线程
  86. number = 1;
  87. c1.signal();//通知A线程
  88. }catch (Exception e){
  89. e.printStackTrace();
  90. }finally {
  91. lock.unlock();
  92. }
  93. }
  94. }
  95. public class SyncAndReentrantLockDemo {
  96. public static void main(String[] args) {
  97. ShareResource shareResource = new ShareResource();
  98. new Thread(() -> {
  99. for (int i = 1; i <=10 ; i++) {
  100. shareResource.print5();
  101. }
  102. },"A").start();
  103. new Thread(() -> {
  104. for (int i = 1; i <=10 ; i++) {
  105. shareResource.print10();
  106. }
  107. },"B").start();
  108. new Thread(() -> {
  109. for (int i = 1; i <=10 ; i++) {
  110. shareResource.print15();
  111. }
  112. },"C").start();
  113. }
  114. }

9. Callable与Java线程池

获取多线程的方法,总共有四种:

  • 实现Runnable接口
  • 实现Callable接口
  • 实例化Thread类
  • 使用线程池获取

    9.1 Callable接口

    已经有Runnable接口,为什么还要出现Callable接口?请你谈谈它的诞生的前身背景?

  • 因为并发,异步导致Callable接口的出现;

  • 主要是用Callable,能够实现当多个任务执行当中,若有一个任务完成的耗时时间比较长,那么可以先将其他任务先完成,然后等待这个耗时比较长的任务结束以后一起进行总的计算

我们实现Callable接口,也需要实现call方法,但是这个时候我们还需要有返回值,这个Callable接口的应用场景一般就在于批处理业务,比如转账的时候,需要给一会返回结果的状态码回来,代表本次操作成功还是失败

  1. class MyCallable implements Callable<Integer> {
  2. @Override
  3. public Integer call() throws Exception {
  4. System.out.println("*****************come in callable");
  5. try {
  6. TimeUnit.SECONDS.sleep(2);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. return 1024;
  11. }
  12. }

如何启动Callable呢?Thread类只能传入Runnable,这里需要用到的是FutureTask类。他实现了Runnable接口,并且还需要传递一个实现Callable接口的类作为构造函数;随后再通过Thread(FutureTask).start()去启动这个线程(这是一个代理模式)。

原来我们的方式是main方法一条龙处理,在引入Callable后,对于执行比较久的线程,可以单独新开一个线程进行执行,最后在进行汇总输出;最后需要注意的是 要求获得Callable线程的计算结果,如果没有计算完成就去获取返回值,会导致阻塞,直到计算完成(所以我们建议将futureTask.get()放到最后)。
当然我们可以设置一个自旋的机制while (!futureTask.isDone())来判断是否执行完毕。

注意:同一个FutureTask只能被执行一次,也就是说使用两个Thread开启同一个FutureTask,只会运行一个

  1. public class SeasonCallableDemo {
  2. public static void main(String[] args) throws ExecutionException, InterruptedException {
  3. // 两个线程一个main。一个是FutureTask
  4. FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
  5. new Thread(futureTask, "Thread A").start();
  6. new Thread(futureTask, "Thread B").start(); // 注意,futureTask只会被执行一次
  7. System.out.println(Thread.currentThread().getName() + "***************");
  8. // futureTask.get(); // 获得callable线程的计算结果,如果没有计算完成,会导致阻塞,只到计算完成,建议放在最后
  9. while (!futureTask.isDone()) { // isDone futureTask是否执行完,这里的while有点自旋锁的感觉
  10. System.out.println("执行完毕+,result=" + futureTask.get()); // 执行完了立马取值
  11. }
  12. }
  13. }

Callable和Runnable的区别

  • Callable可以抛出异常,Runnable不行;
  • Callable支持泛型、可以有返回值,Runnable没有返回值;

    9.2 线程池

    为什么用线程池,有什么好处

    线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
    他的主要特点为:线程复用、控制最大并发数、管理线程
    第一:降低资源消耗,通过重复利用自己创建的线程降低线程创建和销毁造成的消耗;
    第二:提高响应速度。当任务到达时,任务可以不需要等到线程和粗昂就爱你就能立即执行;
    第三:提高线程的可管理性。线程是稀缺资源,如果无限的创建,不仅会消耗资源,还会较低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

    线程池的架构

    image.png
    线程池的底层就是ThreadPoolExecutor

    线程池的使用

  • Executors.newFixedThreadPool(int i) :创建一个拥有 i 个线程的线程池

    • 执行长期的任务,性能好很多
    • 创建一个定长线程池,可控制线程数最大并发数,超出的线程会在队列中等待
  • Executors.newSingleThreadExecutor():创建一个只有1个线程的 单线程池
    • 一个任务一个任务执行的场景
    • 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
  • Executors.newCacheThreadPool():创建一个可扩容的线程池
    • 执行很多短期异步的小程序或者负载教轻的服务器
    • 创建一个可缓存线程池,如果线程长度超过处理需要,可灵活回收空闲线程,如无可回收,则新建新线程
  • Executors.newScheduledThreadPool(int corePoolSize):线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池
  • Executors.newWorkStealingPool(int):java8新增,使用目前机器上可以的处理器作为他的并行级别

    1. public class MyThreadPoolDemo {
    2. public static void main(String[] args) {
    3. // System.out.println(Runtime.getRuntime().availableProcessors());
    4. // ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 固定数量的线程池 5个处理线程
    5. // ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor(); // 一池一个处理线程
    6. ExecutorService fixedThreadPool = Executors.newCachedThreadPool(); // 一池N个处理线程
    7. try {
    8. // 模拟10个用户来办理业务,每个用户就是一个来自外部的请求线程
    9. for (int i = 0; i < 10; i++) {
    10. fixedThreadPool.execute(() ->
    11. System.out.println(Thread.currentThread().getName() + "\t 办理业务"));
    12. // int finalI = i;
    13. // Future<Integer> submit = fixedThreadPool.submit(() -> {
    14. // System.out.println(Thread.currentThread().getName());
    15. // return finalI;
    16. // });
    17. // System.out.println(submit.get());
    18. }
    19. }catch (Exception e) {
    20. e.printStackTrace();
    21. }finally {
    22. fixedThreadPool.shutdown();
    23. }
    24. }
    25. }

    线程池7大参数

    image.png

  1. corePoolSize:线程池中的常驻核心线程数;
  2. maximumPoolSize:线程池中能够容纳同时执行的最大线程数,此值必须大于等于1;
  3. keepAliveTime:多余的空闲线程的存活时间当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁直到只剩下corePoolSize个线程为止;
  4. unit:keepAliveTime的单位 ;
  5. workQueue:任务队列(阻塞队列),被提交但尚未被执行的任务;(排号等位)
  6. threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程,一般默认的即可
  7. handler:拒绝策略,表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的runnable的策略;

    *线程池的底层工作原理

    image.png

  8. 在创建了线程池后开始等待请求;

  9. 当调用execute()方法添加一个请求任务时,线程池会做以下判断:
    1. 如果正在运行的线程数量小于corePoolSize的数量,那么立即创建线程运行当前任务;
    2. 如果正在运行的线程数量≥corePoolSize,那么会尝试将当前任务放入阻塞队列;
    3. 如果阻塞队列已满,且正在运行的线程数 < maxPoolSize,那么创建线程运行当前任务;
    4. 如果阻塞队列已满,且正在运行的线程数 = maxPoolSize,那么线程池会启动饱和拒绝策略来执行;
  10. 当线程池中的一个线程完成任务时,他会从阻塞队列中取下一个任务来执行;
  11. 当一个线程空闲超过keepAliveTime时,线程会进行判断:
    1. 如果当前启动线程数大于corePoolSize,则关闭当前线程;
    2. 最终会收缩到corePoolSize的大小;

      线程池的拒绝策略

      阻塞队列已经满了,同时线程池中的线程已经达到了maxPoolSize,此时无法处理新的任务;此时就需要通过拒绝策略来处理当前任务。JDK内置的拒绝策略分为四种(均实现了RejectExecutionHandler接口):
  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  • CallerRunsPolicy:”调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中尝试再次提交当前任务。
  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

    9.3 工作中必须自定义线程池

    为什么不使用自带的线程池

    根据阿里巴巴手册:并发控制这章

  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

    • 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
  • 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。Executors返回的线程池对象弊端如下:

    • FixedThreadPool和SingleThreadPool、ScheduledThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
    • CacheThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

      自定义线程池

      线程池中能够容纳的最大任务数 = maxPoolSize + 阻塞队列大小
      1. public class MyThreadPoolDemo {
      2. public static void main(String[] args) {
      3. ExecutorService threadPool = new ThreadPoolExecutor(2, 5,
      4. 1L, TimeUnit.SECONDS,
      5. new LinkedBlockingQueue<Runnable>(3),
      6. Executors.defaultThreadFactory(),
      7. new ThreadPoolExecutor.AbortPolicy());
      8. try {
      9. // 模拟10个用户来办理业务,每个用户就是一个来自外部的请求线程
      10. for (int i = 0; i < 10; i++) {
      11. threadPool.execute(() -> System.out.println(Thread.currentThread().getName() + "\t 办理业务")
      12. );
      13. }
      14. } catch (Exception e) {
      15. e.printStackTrace();
      16. } finally {
      17. threadPool.shutdown();
      18. }
      19. }
      20. }
      采用AbortPolicy拒绝策略:
      image.png
      采用CallerRunsPolicy拒绝策略:
      image.png
      采用DiscardOldPolicy和DiscardPolicy(都不会报异常):
      image.png

      线程池怎么配置合理的参数

      生产环境中如何配置 corePoolSize 和 maximumPoolSize这个是根据具体业务来配置的,分为CPU密集型和IO密集型。
  • CPU密集型

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程)。
CPU密集型任务配置尽可能少的线程数量一般公式: maxPoolSize = CPU核数+1介线程的线程池

  • IO密集型

IO密集型,即该任务需要大量的IO,即大量的阻塞。在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。IO密集型时,大部分线程都阻塞,故需要多配置线程数

  1. 由于IO密集型任务线程并不是一直在执行任务则应配置尽可能多的线程,如maxPoolSize = CPU核数*2
  2. 参考公式: maxPoolSize = CPU核数/(1- 阻塞系数) 阻塞系数在0.8~0.9之间 比如8核CPU:8/(1-0.9)= 80个线程数

    10. 死锁编码及定位分析

    10.1 什么是死锁

    死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果系统资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁。
    image.png
    产生死锁的主要原因:系统资源不足、进程运行推进的顺序不合适、资源分配不当

    10.2 *死锁产生的四个必要条件

  • 互斥条件:同一时间只有一个线程获取资源
    • 解决办法:把互斥的共享资源封装为可同时访问
  • 不可剥夺条件:线程占有的资源在释放前不会被其他线程抢占
    • 解决办法:如果线程不能获取到所需的全部资源便进入等待状态,等待期间他占有的资源将被隐式的释放重新加入到系统的资源列表中,可以被其他的进程使用,而等待的进程只有重新获得自己原有的资源以及新申请的资源才可以重新启动,执行
  • 请求与保持条件:线程在等待时,不会释放已占有资源
    • 解决办法:
      • 静态分配:每个进程在开始执行时就申请需要的全部资源;
      • 动态分配:每个进程在申请所需要的资源时他本身不占用系统资源
  • 循环等待条件:多个线程互相等待对方释放资源

    • 解决办法:采用资源有序分配,基本思想是将所有资源编号,紧缺资源采用较大编号,在申请资源时按照资源编号顺序进行,只有先获取小编号才能申请获取大编号资源。

      10.3 写个死锁案例

      ```java class HoldLockThread implements Runnable {

      private String lockA; private String lockB;

      public HoldLockThread(String lockA, String lockB) { this.lockA = lockA; this.lockB = lockB; }

      @Override public void run() { synchronized (lockA) {

      1. System.out.println(Thread.currentThread().getName() + "\t 自己持有:" + lockA
      2. + "\t 尝试获得:" + lockB);
      3. try {
      4. TimeUnit.SECONDS.sleep(1);
      5. } catch (InterruptedException e) {
      6. e.printStackTrace();
      7. }
      8. synchronized (lockB) {
      9. System.out.println(Thread.currentThread().getName() + "\t 自己持有:" + lockB
      10. + "\t 尝试获得:" + lockA);
      11. }

      } } }

/**

  • @author mrlinxi
  • @create 2022-04-12 23:47
  • 死锁演示
  • 死锁是指两个或两个以上的进程在执行过程中,
  • 因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去 */ public class DeadLockDemo { public static void main(String[] args) {
    1. String lockA = "lockA";
    2. String lockB = "lockB";
    3. new Thread(new HoldLockThread(lockA, lockB), "Thread A").start();
    4. new Thread(new HoldLockThread(lockB, lockA), "Thread B").start();
    } } ``` image.pngmain线程不会结束,产生了死锁。

    分析:我们创建了两个字符串”lockA” “lockB”,线程A中,lockA指向的是字符串”lockA”,lockB指向的是字符串”lockB”,线程A获取到了字符串”lockA”这把锁,sleep后尝试获取”lockB”这把锁; 线程B中,lockA指向的是字符串”lockB”,lockB指向的是字符串”lockA”,在线程A sleep的时候,线程B获取了字符串”lockB”这把锁,sleep后尝试获取字符串”lockA”这把锁。 此时线程A和线程B都在等待对方释放自己的锁,形成了死锁

如何排查是否产生了死锁?

我们来介绍jsp(命令定位进程编号)和jstack(找到死锁查看)
image.png

linux ps -ef | grep xxxx ls -l windows下的java运行程序 也有类似ps的查看进程的命令,但是目前我们需要查看的只是java的,所以jdk为我们提供了jsp jps = java ps jps -l (与linux的命令进行类比)

直接在IDEA Terminal下面执行jps -l
image.png
我们可以看到DeadLockDemo这个类一直在运行
然后使用jstack查看堆栈信息

jstack 23840 这个23840就是我们通过jps -l 查出来的进程号

image.png

Found one Java-level deadlock:

“Thread B”: waiting to lock monitor 0x00000162c0429d58 (object 0x00000007806d22f8, a java.lang.String), which is held by “Thread A” “Thread A”: waiting to lock monitor 0x00000162c042c3d8 (object 0x00000007806d2330, a java.lang.String), which is held by “Thread B”

Java stack information for the threads listed above:

“Thread B”: at com.Season2.juc.HoldLockThread.run(DeadLockDemo.java:27) - waiting to lock <0x00000007806d22f8> (a java.lang.String)

  1. - **locked <0x00000007806d2330> (a java.lang.String)**
  2. at java.lang.Thread.run(Thread.java:748)

“Thread A”: at com.Season2.juc.HoldLockThread.run(DeadLockDemo.java:27) - waiting to lock <0x00000007806d2330> (a java.lang.String) - locked <0x00000007806d22f8> (a java.lang.String) at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.

可以看到,输出了详细的死锁信息

JVM体系结构

1. JVM垃圾回收的时候如何确定垃圾?

简单来说就是内存中已经不再被使用的空间就是垃圾

1.1 什么是GCRoots?

为了解决引用计数的循环引用问题,Java使用了可达性分析算法。
image.png
所谓”GC Roots”或者说tracing GC的”根集合”就是一组必须活跃的引用
基本思路就是通过一系列名为”GC Roots”的对象作为起始点,从这个被称为GC Roots的对象开始向下搜索,如果一个对象到GC Roots没有任何引用链相连时,则说明此对象不可用。也即给定一个集合的引用作为根出发,通过引用关系遍历对象图,能被遍历到的(可到达的)对象就被判定为存活;没有被遍历到的就自然被判定为死亡。

1.2 可以成为GCRoots的对象

GCRoots是一个set,其中包含了如下对象:

  1. 虚拟机栈(栈帧中的局部变量区,也叫做局部变量表)中引用的对象
  2. 方法区中静态属性引用的对象
  3. 方法区中常量引用的对象
  4. 本地方法栈中JNI(Native方法)引用的对象

    2. JVM参数调优

    2.1 JVM参数类型

    标配参数和X参数

  • 标配参数
    • -version
    • -help
    • java -showversion
  • X参数
    • -Xint:解释执行
    • -Xcomp:第一次使用就编译成本地代码
    • -Xmixed:混合模式

image.png

XX参数(重点)

  • Boolean类型:-XX:+或者- 某个属性值 (+表示开启 -表示关闭)
  • key-value类型:-XX:属性key=属性value

    jinfo举例,如何查看当前运行程序的配置

    例子:如何查看一个正在运行中的Java程序,他的某个JVM参数是否开启?具体值是多少?
    先写一个程序让他一直睡
    1. public class HelloGC {
    2. public static void main(String[] args) {
    3. System.out.println("******HelloGC******");
    4. try {
    5. TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. }
    10. }
    可以使用如下命令查看他的默认参数:

    jps:查看java的后台进程 主要是获取java程序的进程号 jps -l jinfo:查看正在运行的java程序信息 jinfo -flag 配置项 进程编号 jstack:查看正在运行的java程序的栈信息(可以用来查看死锁) jstack 进程号

image.png
可以看到当前GC细节是不打印的,所以我们需要手动设置打印GC细节。
image.png
设置完成后记得重启Java程序,然后再次查看发现已经开启了打印GC细节。
同样可以设置是否使用串行垃圾收集器:-XX:+UseSerialGC
jinfo -flags java进程号 可查看当前进程的所有参数

Xms Xmx

image.png

2.2 查看JVM默认值

  • -XX:+PrintFlagsInitial:查看初始默认值
    • 用法:直接在cmd中输入
      • java -XX:+PrintFlagsInitial -version
      • java -XX:+PrintFlagsInitial
  • -XX:+PrintFlagsFinal:主要查看修改更新
    • 用法,跟上面的一样:
      • java -XX:+PirntFlagsFinal
      • java -XX:+PirntFlagsFinal -version

image.png
我们在查看配置的时候会发现有=号也有:=,=号表示没有被修改,:=表示被修改过了(可能是JVM自己改的)
image.png
使用 -XX:+PrintCommandLineFlags -version 打印出JVM的默认的简单初始化参数:
image.png

2.3 PrintFlagsFinal举例,运行Java命令的同时打印出参数

现在的情况是我既想运行java程序又想要打印输出参数:
java -XX:+PrintFlagsFinal -Xss128k HelloGC
image.png

2.4 JVM常用基本配置参数有哪些

image.png
通过代码查看堆内存

  1. public class HelloGC {
  2. public static void main(String[] args) {
  3. long totalMemory = Runtime.getRuntime().totalMemory(); //返回java虚拟机中的内存变量 默认 1/64
  4. long maxMemory = Runtime.getRuntime().maxMemory(); //返回java虚拟机试图使用的最大内存量 默认 1/4
  5. System.out.println("TOTAL_MEMORY(-Xms) = " + totalMemory + " (字节) 、" +(totalMemory / (double)1024 / 1024 + "MB"));
  6. System.out.println("MAX_MEMORY(-Xmx) = " + maxMemory + " (字节) 、" +(maxMemory / (double)1024 / 1024 + "MB"));
  7. }
  8. }

常用参数

  • -Xms:初始化堆内存,默认为物理内存的1/64,等价于 -XX:initialHeapSize
  • -Xmx:最大堆内存,默认为物理内存的1/4,等价于-XX:MaxHeapSize
  • -Xss:设计单个线程栈的大小,一般默认为512K~1024K,等价于 -XX:ThreadStackSize
    • 使用 jinfo -flag ThreadStackSize [java进程id] 会发现 -XX:ThreadStackSize = 0
    • 这个值的大小是取决于平台的
    • Linux/x64:1024KB
    • OS X:1024KB
    • Oracle Solaris:1024KB
    • Windows:取决于虚拟内存的大小
  • -Xmn:设置年轻代大小 (一般不用设置,默认即可) 等价于:-XX:NewSize
  • -XX:MetaspaceSize:设置元空间大小
    • 元空间的本质和永久代类似,都是对JVM规范中方法区的实现,不过元空间与永久代之间最大的区别在于:元空间并不在虚拟机中,而是使用本地内存,因此,默认情况下,元空间的大小仅受本地内存限制。
    • -Xms10m -Xmx10m -XX:MetaspaceSize=1024m -XX:+PrintFlagsFinal
    • 但是默认的元空间大小:只有20多M
    • 为了防止在频繁的实例化对象的时候,让元空间出现OOM,因此可以把元空间设置的大一些
  • -XX:+PrintGCDetails:输出详细GC收集日志信息
    • GC
    • Full GC

我们使用一段代码,制造出垃圾回收的过程,并让程序抛出OOM heap space
设置程序的JVM配置:-Xms 10m -Xmx 10m -XX:+PrintGCDetails
在代码中创建一个非常大的byte数组:

byte[] arr = new byte[50 1024 1024]; //创建一个 非常大空间的byte类型数组

运行结果,可以看到出现了GC和Full GC
image.png
GC发生在新生代,字段解释:

[GC (Allocation Failure) [PSYoungGen: 504K->488K(2560K)] 680K->688K(9728K), 0.0010533 secs] [Times: user=0.05 sys=0.00, real=0.00 secs]

image.png
Full GC发生在老年代,输出字段解释:
image.png