title: Java并发编程
date: 2021-11-25 19:31:20
tags: Java多线程
categories: Java
cover: imgcat/java.png


1. 多线程

1 多线程基础

从软件和硬件上实现多个进程并发执行的技术,具有多线程能力的计算机由于硬件的支持进而可以同一时间执行多个线程,提升程序运行效率

并发与并行

并发:同一时刻,多条指令在CPU上交替执行

并行:同一时刻,多条指令在CPU上同时执行

进程与线程

进程:进程指正在运行的程序。程序是由指令和数据组成的,指令需要运行,数据需要读写,必须将指令加载值CPU,数据加载至内存;指令运行过程中还需要用到磁盘,网络等设备。进程是用来加载指令,管理内存,管理IO的。

  1. 进程特性:
  • 独立性:进程是一个独立运行的基本单位,是用来分配系统资源和调度的单位
  • 动态性:进程的产生就是程序的一次执行,进程是伴随着程序执行动态产生,动态消亡的
  • 并发性:任何进程都可以与其他进程一起并发执行

线程:进程中的顺序控制流,是一条执行路径。一个进程之内可以分为多个线程,线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给CPU执行。Java中,线程为最小调度单位,进程作为资源分配的最小单位。Windows中进程是不活动的,只是作为线程的容器。

  • 单线程:如果一个进程只有一条执行路径,则称之为单线程程序
  • 多线程:如果一个进程由多条执行路径,则称之为多线程程序

进程与线程的对比

  • 进程基本是相互独立的,线程存在于进程内,属于进程的子集;进程拥有共享的资源(内存空间),由其内部的线程共享。
  • 进程间的通信较为复杂,同一台计算机的进程间通信称为IPC;不同计算机之间的通信需要通过网络,并且最受共同的网络协议,如HTTP协议。
  • 线程间的通信相对简单,因为线程间共享进程内的内存。多个线程可以访问同一个共享变量,线程更加的轻量,线程的上下文切换成本一般比进程的上下文切换低。

进程和线程的切换

上下文切换:系统内核为每一个进程维持一个上下文。上下文是内核重新启动一个被抢占的进程所需要的状态,包括通用目的寄存器、浮点寄存器、程序计数器、状态寄存器、用户栈、内核栈和各种内核数据结构。

进程切换与线程切换的区别:进程的切换会涉及到虚拟地址空间的切换,因为每个进程都有自己的虚拟空间地址,而线程是共享进程的虚拟空间地址,所以线程的切换不会设计到虚拟空间地址的切换

2 多线程的实现

1. 继承Thread类

  • 定义一个MyThread类继承Thread类,重写run方法,run方法里封装被线程执行的代码
  • 创建MyThread对象,调用start方法,Java虚拟机会调用此线程的run方法
  1. public class _01多线程实现方式一{
  2. static class MyThread extends Thread{
  3. @Override
  4. public void run(){
  5. for(int i = 0; i < 100; i++){
  6. System.out.println(getName() + ":" + i);
  7. }
  8. }
  9. }
  10. public static void main(String[] args){
  11. MyThread t1 = new MyThread();
  12. MyThread t2 = new MyThread();
  13. t1.start();
  14. t2.start();
  15. }
  16. }

2. 实现Runnable接口

  • 定义一个MyRunnable类实现Runnalbe接口,重写Runnable接口
  • 创建MyRunnable对象,将该对象作为参数传递给线程对象Thread
  1. public class _02多线程实现方式二{
  2. public static class MyRunnable implements Runnable{
  3. @Override
  4. public void run(){
  5. for(int i = 0; i < 100; i++){
  6. System.out.println(Thread.currentThread()
  7. .getName() + ":" + i);
  8. }
  9. }
  10. }
  11. public static void main(String[] args){
  12. MyRunnable mr = new MyRunnable();
  13. Thread t1 = new Thread(mr, "线程1");
  14. Thread t2 = new Thread(mr, "线程2");
  15. t1.start();
  16. t2.start();
  17. }
  18. }

3. 实现callable接口

  • 定义MyCallable实现Callable接口,重写call方法,call方法中封装线程执行的方法
  • 创建MyCallable对象,将其作为参数传入一个新建的FutureTask对象中
  • 使用FutureTask对象的get方法,可以获得线程执行完成返回的数据
  1. public class MyCallable implements Callable<String>{
  2. @Override
  3. public String call() throws Exception{
  4. for(int i = 0; i < 100; i++){
  5. System.out.println(Thread.currentThread()
  6. .getName() + ":" + i);
  7. return "线程执行完毕";
  8. }
  9. }
  10. public static void main(String[] args) throws Exception{
  11. MyCallable mc = new MyCallable();
  12. // FutureTask可以获得线程执行完毕的结果,也可以作为参数 传递给Thread
  13. FutureTask<String> ft1 = new FutureTask<>(mc);
  14. FutureTask<String> ft2 = new FutureTask<>(mc);
  15. Thread t1 = new Thread(ft1, "线程1");
  16. Thread t2 = new Thread(ft2, "线程2");
  17. t1.start();
  18. t2.start();
  19. System.out.println(ft1.get());
  20. System.out.println(ft2.get());
  21. }

3 线程运行原理

栈与栈帧

  • 每个线程启动后,Java虚拟机栈会为其分配一块栈内存
  • 每个栈由多个栈帧构成,对应着方法调用时占用的内存

线程上下文切换

  1. **Thread Context Switch:** 因为一些原因导致CPU不再执行当前的线程,转而去执行另一个线程的代码,原因如下:
  • CPU为线程分配的时间片用完
  • 垃圾回收线程或者有更高优先级的线程需要运行
  • 线程自身调用了sleep、yield、join、wait、park、synchronized、lock等方法

当线程上下文切换发生时,需要操作系统保存当前线程的状态,并恢复另一个线程的状态,即通过程序计数器PC中保存的jvm指令执行地址来恢复,PC是线程私有的

  • 保存的状态包括程序计数器、虚拟机栈的栈帧信息,局部变量表、操作数栈、方法返回地址等
  • 频繁的线程上下文切换会影响程序执行的性能

4 多线程方法

API

方法 说明
public void run() 线程启动后调用该方法
public void setName(String name) 给当前线程取名字
public void getName() 获取当前线程的名字
线程存在默认名称:子线程是Thread-索引,主线程是main
public void start() 启动一个新线程;Java虚拟机调用此线程的run方法
public static Thread currentThread() 获取当前线程对象,代码在哪个线程中执行
public static void sleep(long time) 让当前线程休眠多少毫秒再继续执行
Thread.sleep(0) : 让操作系统立刻重新进行一次cpu竞争
public static native void yield() 提示线程调度器让出当前线程对CPU的使用
public final int getPriority() 返回此线程的优先级
public final void setPriority(int priority) 更改此线程的优先级,常用1 5 10
public void interrupt() 中断这个线程,异常处理机制
public static boolean interrupted() 判断当前线程是否被打断,清除打断标记
public boolean isInterrupted() 判断当前线程是否被打断,不清除打断标记
public final void join() 等待这个线程结束
public final void join(long millis) 等待这个线程死亡millis毫秒,0意味着永远等待
public final native boolean isAlive() 线程是否存活(还没有运行完毕)
public final void setDaemon(boolean on) 将此线程标记为守护线程或用户线程

start run

被创建的Thread对象直接调用重写的run方法时, run方法是在主线程中被执行的,而不是在我们所创建的线程中执行。所以如果想要在所创建的线程中执行run方法,需要使用Thread对象的start方法。

sleep yield wait

sleep(): 用于使线程阻塞,线程从Running状态进入Timed Waiting状态(阻塞状态);使用interrupt方法可以打断正在运行的线程,sleep方法会抛出InterruptedException;睡眠结束的线程未必会立即执行,需要重新和其他线程竞争获取CPU时间片!

yield():用于退出当前线程,线程从Running状态进入Runnable状态;然后系统调度执行其他线程。

wait(): 锁对象的方法,用于等待其他线程传来的数据,线程处于阻塞状态。

join

join():用于等待某个线程结束,线程处于阻塞状态;如在主线程中调用t1.join(),主线程会等待t1线程结束才会继续执行。

interrupt

interrupt():用于打断处于阻塞状态的线程(sleep, wait, join)。

  • 如果一个线程在在运行中被打断,打断标记会被置为true。
  • 如果是打断因sleep wait join方法而被阻塞的线程,会将打断标记置为false

interrupt方法应用:优雅的打断其他线程

两阶段终止模式:见

线程优先级

线程调度方式

  • 分时调度模型:所线程轮流使用CPU的使用权,平均分配每个线程占用CPU的时间片
  • 抢占式调度模型:优先让优先级高的线程使用CPU,如果优先级相同,随机一个线程抢占时间片,优先级高的线程获得的CPU时间片多一些。Java使用的是抢占式调度模型

final int getPriority() : 返回此线程的优先级

final void setPriority(int newPriority): 更改此线程的优先级线程默认优先级是5;线程优先级的 范围是:1-10

守护线程

void setDaemon(boolean on): 将此线程标记为守护线程,当运行的线程都是守护线程 时,Java虚拟机将退出

5 线程状态

操作系统五种线程状态

线程五种状态.png

  • 【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联(例如线程调用了start方法)
  • 【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
  • 【运行状态】指获取了 CPU 时间片运行中的状态
    • 当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
  • 【阻塞状态】
    • 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入 【阻塞状态】
    • 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
    • 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们
  • 【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

  • 如果有大量处于阻塞状态的进程,进程可能会占用着物理内存空间,显然不是我们所希望的,毕竟物理内存空间是有限的,被阻塞状态的进程占用着物理内存就一种浪费物理内存的行为。所以,在虚拟内存管理的操作系统中,通常会把阻塞状态的进程的物理内存空间换出到硬盘,等需要再次运行的时候,再从硬盘换入到物理内存。那么,就需要一个新的状态,来描述进程没有占用实际的物理内存空间的情况,这个状态就是挂起状态。这跟阻塞状态是不一样,阻塞状态是等待某个事件的返回。

挂起状态可以分为两种:

  • 阻塞挂起状态:进程在外存(硬盘)并等待某个事件的出现;
  • 就绪挂起状态:进程在外存(硬盘),但只要进入内存,即刻立刻运行;

Java并发编程 - 图3

Java六种线程状态

线程由生到死的完整过程(生命周期):当线程被创建并启动以后,既不是一启动就进入了执行状态,也不是一直处于执行状态,在 API 中 java.lang.Thread.State 这个枚举中给出了六种线程状态:

线程状态 导致状态发生条件
NEW(新建) 线程刚被创建,但是并未启动,还没调用 start 方法,只有线程对象,没有线程特征
Runnable(可运行) 线程可以在 Java 虚拟机中运行的状态,可能正在运行自己代码,也可能没有,这取决于操作系统处理器,调用了 t.start() 方法:就绪(经典叫法)
Blocked(锁阻塞) 当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入 Blocked 状态;当该线程持有锁时,该线程将变成 Runnable 状态
Waiting(无限等待) 一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入 Waiting 状态,进入这个状态后不能自动唤醒,必须等待另一个线程调用 notify 或者 notifyAll 方法才能唤醒
Timed Waiting (计时等待) 有几个方法有超时参数,调用将进入 Timed Waiting 状态,这一状态将一直保持到超时期满或者接收到唤醒通知。带有超时参数的常用方法有 Thread.sleep 、Object.wait
Teminated(被终止) run 方法正常退出而死亡,或者因为没有捕获的异常终止了 run 方法而死亡

线程六种状态.png
ps: Java里的RUNNABLE状态中是包含阻塞状态的(不是指BLOCKED,它特指竞争锁失败的状态),如拷贝文件时使用阻塞IO,此时线程不使用CPU,但需要一直等待IO结束,此时线程就处于RUNNABLE的阻塞状态。

情况一:NEW –> RUNNABLE

  • 当调用了t.start()方法时,由 NEW –> RUNNABLE

情况二: RUNNABLE <–> WAITING

  • 当调用了t 线程用 synchronized(obj) 获取了对象锁后
    • 调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING
    • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
      • 竞争锁成功,t 线程从 WAITING –> RUNNABLE
      • 竞争锁失败,t 线程从 WAITING –> BLOCKED

情况三:RUNNABLE <–> WAITING

  • 当前线程
    调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING
    • 注意是当前线程在t 线程对象的监视器上等待
  • t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING –> RUNNABLE

情况四: RUNNABLE <–> WAITING

  • 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE

情况五: RUNNABLE <–> TIMED_WAITING

t 线程用 synchronized(obj) 获取了对象锁后

  • 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING
  • t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时
    • 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE
    • 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED

情况六:RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 t.join
    (long n
    ) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING
    • 注意是当前线程在t 线程对象的监视器上等待
  • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE

情况七:RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING
  • 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE

情况八:RUNNABLE <–> TIMED_WAITING

  • 当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING
  • 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE

情况九:RUNNABLE <–> BLOCKED

  • t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED

情况十: RUNNABLE <–> TERMINATED

  • 当前线程所有代码运行完毕,进入 TERMINATED

2.管程

1 线程安全问题

线程安全问题:根本原因是多线程的上下文切换,导致当前线程中的指令没有执行完就切换执行其他线程

  1. class _01ThreadSafty{
  2. private static int count = 0;
  3. public static void main(String[] args) throws InterruptedException{
  4. Thread t1 = new Thread(() -> {
  5. for(int i = 0; i < 5000; i++){
  6. count++;
  7. }
  8. });
  9. Thread t2 = new Thread(() -> {
  10. for(int i = 0; i < 5000; i++){
  11. count--;
  12. }
  13. });
  14. t1.start();
  15. t2.start();
  16. t1.join();
  17. t2.join();
  18. log.debug("count的值为{}", count);
  19. }
  20. }
  1. 打印信息:count的值不为0
  2. 16:46:12.488 h._01ThreadSafty02 [main] - count的值为77

这是因为count++自增与count—自减都不是原子操作
分成以下四个原子操作
image.png

  1. getstatic i // 获取静态变量i的值
  2. iconst_1 // 准备常量1
  3. iadd // 自增
  4. putstatic i // 将修改后的值存入静态变量i
  5. getstatic i // 获取静态变量i的值
  6. iconst_1 // 准备常量1
  7. isub // 自减
  8. putstatic i // 将修改后的值存入静态变量i
  • 可以看到count++ 和 count— 操作实际都是需要这个4个指令完成的,那么这里问题就来了!Java 的内存模型如下,完成静态变量的自增,自减需要在主存和工作内存中进行数据交换:

image.png
如果代码按照下面左图正常的顺序执行,count的值是不会计算出错的,而右图中两个线程间发生了上下文切换,两个线程操作共享变量i
导致i的计算出现了错误,产生了线程安全问题
未命名文件 (5).png

  1. 临界区
  • 多个线程访问共享资源时会出现问题
    • 多个线程对共享资源进行读操作时不会出现问题
    • 在多个线程对共享变量进行写操作时,可能会发生指令交错,就会出现线程安全问题
  • 一段代码如果存在对共享资源的多线程读写操作,这段代码称之为临界区 ```java static int counter = 0;

static void increment() // 临界区 {
counter++; }

static void decrement() // 临界区 { counter—; }

  1. 2. **竞态条件**
  2. - 多个线程在临界区执行,由于指令执行的顺序不确定造成运行结果不确定的问题,称为竞态条件
  3. <a name="8e0cac83"></a>
  4. ## 2 synchronized
  5. 为了避免临界区中竞态条件的发生,可以使用多种手段达到
  6. - **阻塞式解决方案:synchronized, lock (ReentrantLock)**
  7. - **非阻塞式解决方案:原子变量CAS**
  8. synchronized即俗称的**对象锁**,它采用**互斥**的方式使得同一时刻下至多有一个线程拥有对象锁,**其他线程若想获得对象锁就会被阻塞住**,这样就可以使得拥有锁的线程可以安全的执行临界区中的代码,**不用担心发生线程上下文切换**
  9. <a name="j0P3O"></a>
  10. ### 2.1 synchronized语法
  11. ```java
  12. synchronized(对象){
  13. // 临界区
  14. }

上述案例使用synchronized加锁,既可以解决线程安全问题

  1. class _01ThreadSafty{
  2. private static int count = 0;
  3. public static void main(String[] args) throws InterruptedException{
  4. Thread t1 = new Thread(() -> {
  5. synchronized (_01ThreadSafty.class){
  6. for(int i = 0; i < 5000; i++){
  7. count++;
  8. }
  9. }
  10. });
  11. Thread t2 = new Thread(() -> {
  12. synchronized (_01ThreadSafty.class){
  13. for(int i = 0; i < 5000; i++){
  14. count--;
  15. }
  16. }
  17. });
  18. t1.start();
  19. t2.start();
  20. t1.join();
  21. t2.join();
  22. log.debug("count的值为{}", count);
  23. }
  24. }
  1. 打印信息:count的值为0
  2. 18:09:03.621 h._01ThreadSafty [main] - count的值为0

2.2 synchronized原理

  • synchronized利用对象锁保证了临界区代码的原子性,临界区的代码在外界看来是不可分割的,不会被线程切换所打断

上述加锁代码执行逻辑
未命名文件 (6).png

  • 当多个线程对临界资源进行写操作时,或出现线程安全问题
  • 使用synchronized关键字,使得多个线程共有一个对象锁可以避免竞态条件的发生

    2.3 synchronized加在方法上

  1. synchronized加在实例方法上,锁对象就是实例对象

    1. public class Demo{
    2. // synchronized加在实例方法上
    3. public synchronized void test(){
    4. ...
    5. }
    6. // 等价于
    7. public void test(){
    8. synchronized(this){
    9. ...
    10. }
    11. }
    12. }
  2. synchronized加在静态方法上,锁对象就是当前列的Class实例

    1. public class Demo{
    2. // synchronized加在实例方法上
    3. public synchronized static void test(){
    4. ...
    5. }
    6. // 等价于
    7. public static void test(){
    8. synchronized(Demo.class){
    9. ...
    10. }
    11. }
    12. }

    3 线程安全分析

    3.1 成员变量与静态变量的线程安全分析

  • 如果变量没有在线程间共享,变量则是线程安全的
  • 如果变量在线程间共享

    • 如果线程内只有读操作,则是线程安全的
    • 如果线程内有读写操作,则需要考虑线程安全

      3.2 局部变量线程安全分析

      局部变量被存储在栈帧内
  • 如果局部变量被初始化为基本数据类型,则是线程安全的

  • 如果局部变量被初始化为引用对象,则可能引发线程安全问题

    • 如果引用对象没有逃离方法的作用范围,则引用对象是线程安全的
    • 如果引用对象逃离方法的作用范围,则需要考虑线程安全问题

      3.3 线程安全的情况

  • 局部变量存储于栈帧中,虚拟机栈是线程私有的

  • 如果局部变量被初始化为基本数据类型,则是线程安全的,示例如下

    1. public static void test01(){
    2. int i=10;
    3. i++;
    4. }

    线程调用test01方法时,会在每个线程中分别创建栈帧,所以变量i存在于多个不同的栈内存,因此不存在变量的共享
    image.png

    3.4 线程不安全的情况

    • 如果引用对象逃离方法的作用范围,则需要考虑线程安全问题 ```java @Slf4j(topic = “h.UnsafeTest”) class UnsafeTest{

      ArrayList arrayList = new ArrayList<>();

      public void method01(){ for(int i = 0; i < 100; i++){

      1. method02();
      2. method03();

      } }

      public void method02(){ arrayList.add(“1”); }

      public void method03(){ arrayList.add(“0”); }

}

@Slf4j(topic = “h.ThreadUnSafty”) class ThreadUnSafty{

  1. public static void main(String[] args){
  2. method();
  3. }
  4. private static void method(){
  5. UnsafeTest unsafeTest = new UnsafeTest();
  6. // 创建100个线程
  7. for(int i = 0; i < 100; i++){
  8. new Thread(() -> {
  9. unsafeTest.method01();
  10. }, "线程" + i).start();
  11. }
  12. Sleep.sleep(100);
  13. System.out.println(unsafeTest.arrayList.size());
  14. }

}

  1. ```shell
  2. unsafeTest.arrayList.size == 19773
  3. 线程安全时应为 20000

线程不安全:
原因: 多个线程对arrayList进行读写操作,出线了指令交错

  • 一个 ArrayList ,在添加一个元素的时候,它可能会有两步来完成:
    • 第一步: 在 arrayList[size]的位置存放此元素
    • 第二步: size++
  • 在单线程运行的情况下,如果 size = 0,添加一个元素后,此元素在位置 0,而且 size=1;(没问题)
  • 在多线程情况下,比如有两个线程,线程 A 先将元素存放在位置 0。但是此时 CPU 进行上下文切换 (线程A还没来得及size++),线程 B 得到运行的机会。线程B也向此 ArrayList 添加元素,因为此时 Size 仍等于0 (注意哦,我们假设的是添加一个元素是要两个步骤哦,而线程A仅仅完成了步骤1),所以线程B也将元素存放在位置0。然后线程A和线程B都继续运行,都增加 size 的值。
  • 那好,现在我们来看看 ArrayList 的情况,元素实际上只有一个,存放在位置 0,而 size 却等于 2。这就是“线程不安全”了。

4 Monitor

4.1 Java对象头

对象头包括两部分:运行时元数据(Mark Word)和类型指针(Klass Word)

  1. 运行时元数据

  2. 类型指针

  • 对于32位的虚拟机来说,普通对象的对象头如下:

image.png

  • 数组对象的对象头另需要32位数据来存储数组的长度,对象头如下所示

image.png
Mark Word的结构

2022-02-26 15-15-32屏幕截图.png


4.2 moniter原理(重量级锁)

未命名文件 (1).png

5 synchronized原理

  1. class SychronizedTest {
  2. static final Object lock = new Object();
  3. static int count = 0;
  4. public static void main(String[] args) {
  5. synchronized (lock) {
  6. count++;
  7. }
  8. }
  9. }
  1. 0 getstatic #7 // 从常量池中获取锁对象lock
  2. 3 dup // 复制一份锁对象
  3. 4 astore_1 // 将lock对象引用存储到 局部变量表slot_1处
  4. 5 monitorenter // 将lock对象的Mark word设置为monitor指针
  5. 6 getstatic #13 // 执行临界区代码,从常量池中获取count
  6. 9 iconst_1 // 准备常数1
  7. 10 iadd // 将count + 1
  8. 11 putstatic #13 // +1后的结果赋值给count
  9. 14 aload_1 // 将局部变量表slot_1中的变量即lock对象引用进栈
  10. 15 monitorexit // 将lock对象Mark Word重置,唤醒EntryList
  11. 16 goto 24 (+8)
  12. 19 astore_2 // 下面为临界区代码出现异常时,锁对象lock的释放
  13. 20 aload_1
  14. 21 monitorexit
  15. 22 aload_2
  16. 23 athrow
  17. 24 return

5.1 轻量级锁

  • 轻量级锁的使用场景:如果多个线程对同一个对象进行加锁,但是加锁的时间是错开的。即虽然是多线程访问同一段代码块,但不会产生竞争,那么就可以使用轻量级锁来优化;避免使用重量级锁上下文切换的消耗
  • 轻量级锁对使用者是透明的,即语法仍然是synchronized (jdk6对synchronized的优化),假设有两个方法同步块,利用同一个对象加锁
  • 线程A来操作临界区的资源, 给资源加锁,到执行完临界区代码,释放锁的过程, 没有线程来竞争, 此时就可以使用轻量级锁; 如果这期间有线程来竞争的话, 就会升级为重量级锁(synchronized)

轻量级锁.png

5.2 锁膨胀

锁膨胀.png

5.3 自旋锁优化

优化重量级锁竞争
当发生重量级锁竞争的时候,还可以使用自旋来进行优化 (不加入Monitor的阻塞队列EntryList中),如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换(持锁线程执行完synchronized同步块后,释放锁,Owner为空,唤醒阻塞队列来竞争,胜出的线程得到cpu执行权的过程) 就获得了锁
优化的点: 不用将线程加入到阻塞队列, 减少cpu切换.
[

](https://blog.csdn.net/m0_37989980/article/details/111408759)

  1. 自旋重试成功

image.png

  1. 自旋重试失败

自旋了一定次数还是没有等到 持锁的线程释放锁, 线程2就会加入Monitor的阻塞队列(EntryList)
image.png
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能
[

](https://blog.csdn.net/m0_37989980/article/details/111408759)

5.4 偏向锁

场景: 没有竞争的时候, 一个线程中多次使用synchronized需要重入加锁的情况; (只有一个线程进入临界区)

在经常需要竞争的情况下就不使用偏向锁, 因为偏向锁是默认开启的, 我们可以通过JVM的配置, 将偏向锁给关闭
将进入临界区的线程的ID, 直接设置给锁对象的Mark word, 下次该线程又获取锁, 发现线程ID是自己, 就不需要CAS了

在轻量级的锁中,我们可以发现,如果同一个线程对同一个对象进行重入锁时,也需要执行CAS替换操作,这是有点耗时。
那么java6开始引入了偏向锁,将进入临界区的线程的ID, 直接设置给锁对象的Mark word, 下次该线程又获取锁, 发现线程ID是自己, 就不需要CAS了

  • 升级为轻量级锁的情况 (会进行偏向锁撤销) : 获取偏向锁的时候, 发现线程ID不是自己的, 此时通过CAS替换操作, 操作成功了, 此时该线程就获得了锁对象。( 此时是交替访问临界区, 撤销偏向锁, 升级为轻量级锁)
  • 升级为重量级锁的情况 (会进行偏向锁撤销) : 获取偏向锁的时候, 发现线程ID不是自己的, 此时通过CAS替换操作, 操作失败了, 此时说明发生了锁竞争。( 此时是多线程访问临界区, 撤销偏向锁, 升级为重量级锁)

轻量级锁 (1).png
偏向锁状态
Mark Word(运行时元数据)的结构:
image.png

  • Normal:一般状态,没有加任何锁,前面62位保存的是对象的信息,最后2位为状态(01),倒数第三位表示是否使用偏向锁(未使用:0)
  • Biased:偏向状态,使用偏向锁,前面54位保存的当前线程的ID,最后2位为状态(01),倒数第三位表示是否使用偏向锁(使用:1)
  • Lightweight:使用轻量级锁前62位保存的是锁记录的指针,最后2位为状态(00)
  • Heavyweight:使用重量级锁,前62位保存的是Monitor的地址指针,最后2位为状态(10)

一个对象的创建过程

  • 如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的ThreadId,epoch,age(年龄计数器)都是0,在加锁的时候进行设置这些的值.
  • 偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:-XX:BiasedLockingStartupDelay=0来禁用延迟
  • 注意 : 处于偏向锁的对象解锁后,线程id仍存储于对象头中

批量重偏向
如果对象被多个线程访问,但是没有竞争 (上面撤销偏向锁就是这种情况: 一个线程执行完, 另一个线程再来执行, 没有竞争), 这时偏向T1的对象仍有机会重新偏向T2
重偏向会重置Thread ID
当撤销偏向锁101 升级为 轻量级锁00超过20次后(超过阈值),JVM会觉得是不是偏向错了,这时会在给对象加锁时,重新偏向至加锁线程 (T2)。
批量撤销偏向锁

  • 当 撤销偏向锁的阈值超过40以后 ,就会将整个类的对象都改为不可偏向

锁消除

  • 线程同步的代价是相当高的,同步的后果是降低并发性和性能。
  • 在动态编译同步块的时候,JIT编译器可以借助逃逸分析来判断同步块所使用的锁对象是否只能够被一个线程访问而没有被发布到其他线程。
  • 如果没有,那么JIT编译器在编译这个同步块的时候就会取消对这部分代码的同步。这样就能大大提高并发性和性能。这个取消同步的过程就叫同步省略,也叫锁消除

6 wait-notify

设计模式-保护性暂停

7 park-unpark

8 活跃性

定义: 由于某种原因使得代码一直无法执行,这样的现象叫做活跃性

多把锁

将锁的粒度细分

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

死锁 (重点)

如果一个线程需要同时获得多把锁,就容易发生死锁

下面的代码中,t1线程获得了A锁,然后t2线程获得了B锁,这是t1线程有尝试获得B锁,就等着t2释放锁;同时t2尝试获得A锁,等着t1释放锁;造成了t1,t2之间相互等着释放锁,此时就称为死锁

  1. public static void main(String[] args){
  2. final Object A = new Object();
  3. final Object B = new Object();
  4. new Thread(() -> {
  5. synchronized(A){
  6. Thread.sleep(1000);
  7. synchronized(B){
  8. // 临界区
  9. }
  10. }
  11. }, "t1").start();
  12. new Thread(() -> {
  13. synchronized(B){
  14. Thread.sleep(1000);
  15. synchronized(A){
  16. // 临界区
  17. }
  18. }
  19. }, "t2").start();
  20. }

活锁

未完成

饥饿


9 ReentrantLock

9.1 ReentrantLock特点

  1. 支持锁重入
  • 可重入锁是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此 有权利再次获取这把锁
  1. 可中断
  • lock.lockInterruptibly() : 可以被其他线程打断的中断锁
  1. 可以设置超时时间
  • lock.tryLock(时间) : 尝试获取锁对象, 如果超过了设置的时间, 还没有获取到锁, 此时就退出阻塞队列, 并释放掉自己拥有的锁
  1. 可以设置为公平锁
  • (先到先得) 默认是非公平, true为公平 new ReentrantLock(true)
  1. 支持多个条件变量
  • (可避免虚假唤醒) - lock.newCondition()创建条件变量对象; 通过条件变量对象调用 await/signal方法, 等待/唤醒

设计模式-顺序控制

10. ThreadLocal

11. InheritableThreadLocal

3. 内存

1. Java内存模型(JMM)

JMM: Java Memory Model, 从Java层面上定义了两个抽象概念即主存,工作内存;主存是线程共享的,工作内存是线程私有的,他们对应着计算机底层的CPU寄存器,缓存,硬件内存,CPU指令优化,让程序员直接面对底层是不友好的,所以从Java层面上抽象出了Java内存模型这一概念。
JMM的存在必要性直接体现先三个方面:原子性,可见性,有序性。

  • 原子性: 保证指令不受线程上下文切换的影响(synchronized, lock)
  • 可见性: 保证指令不受CPU缓存的影响,Java中即时编译器JIT对热点代码缓存优化(volatile)
  • 有序性: 保证指令不受CPU指令优化(指令重排)的影响(volatile)

    2. 可见性

    2.1 退不出的循环

    下面的代码中,main线程将run修改为false,但t1线程无法停止,循环退不出去

    1. public class _01MemoryModel{
    2. static boolean run = true;
    3. public static void main(String[] args) throws InterruptedException{
    4. new Thread(() -> {
    5. while(run){
    6. // run为真,循环一直执行
    7. }
    8. }, "t1").start();
    9. Thread.sleep(1000);
    10. log.debug("runing...");
    11. run = false;
    12. }
    13. }

    为什么无法退出循环

  • 初始状态, t线程刚开始从主内存(成员变量), 因为主线程sleep(1)秒, 这时候t1线程循环了好多次run的值, 超过了一定的阈值, JIT就会将主存中的run值读取到工作内存 (相当于缓存了一份, 不会去主存中读run的值了)。

image.png

  • 因为t线程频繁地从主存中读取run的值,JIT即时编译器会将run的值缓存至自己工作内存中的高速缓存中,减少对主存中run的访问以提高效率

image.png

  • 1 秒之后,main线程修改了run的值, 并同步至主存。而 t线程是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值

image.png
重要:这里出现了认识错误!Volatile并不是禁止高速缓存,直接存主存中读取变量!
出现无法退出的循环是因为t1线程的循环执行了很多次,即时编译器(JIT)会对这份热点代码进行优化,直接将run修改为true,进而造成主线程修改run,而对t1线程的run值无法修改!使用volatile关键字修饰的变量,即时编译器不会对其优化,而是一直从主存中读取变量,解决了这个问题!

2.2 解决方法

  1. volatile关键字

volitale用来修饰成员变量和静态成员变量(放在主存中的变量),它可以避免线程从工作内存中查找变量的值,必须从主存中获取变量值,即线程操作volatile变量直接操作主存中的变量。
所以volatile可以保证代码块内部变量的可见性,但不可以保证代码块的原子性;

  1. public class _01MemoryModel{
  2. volatile static boolean run = true;
  3. public static void main(String[] args) throws InterruptedException{
  4. new Thread(() -> {
  5. while(run){
  6. // run为真,循环一直执行
  7. }
  8. }, "t1").start();
  9. Thread.sleep(1000);
  10. log.debug("runing...");
  11. run = false;
  12. }
  13. }

volatile 可以认为是一个轻量级的锁,被 volatile 修饰的变量,汇编指令会存在于一个”lock”的前缀。在CPU层面与主内存层面,通过缓存一致性协议,加锁后能够保证写的值同步到主内存,使其他线程能够获得最新的值。
volatile仅仅用在一个线程写,多个线程读的情况;如果涉及多个线程写的情况,就必须使用synchronized或者lock保证线程安全!

  1. synchronized关键字

synchronized关键字既可以保证代码块的原子性,又可以保证代码块内部变量的可见性;在Java内存模型中,synchronized规定,线程在加锁时,先清空工作内存→在主内存中拷贝最新变量的副本到工作内存 →执行完代码→将更改后的共享变量的值刷新到主内存中→释放互斥锁。

  1. public class _01MemoryModel{
  2. static boolean run = true;
  3. public static void main(String[] args) throws InterruptedException{
  4. new Thread(() -> {
  5. while(run){
  6. synchronized(_01MemoryModel.class){
  7. // run为真,循环一直执行
  8. }
  9. }
  10. }, "t1").start();
  11. Thread.sleep(1000);
  12. log.debug("runing...");
  13. run = false;
  14. }
  15. }

但是synchronized关键字属于重量级操作,性能相较volatile关键字较低;因此在只需要保证变量的可见性操作中,应优先考虑使用volatile关键字。

2.3 两阶段终止模式优化

2.4 同步模式之犹豫模式

3. 有序性

多线程的有序性问 题:Java虚拟机即时编译器(JIT)的优化,可能会导致指令重排
为什么要优化? 因为CPU 支持多级指令流水线,例如支持同时执行 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理器。效率快 ~

指令重排
Java虚拟机在不影响代码执行正确性的前提下,可以调整代码的执行顺序,称之为指令重排优化
image.png
PS: 多线程下的指令重排可能会影响代码执行的正确性!!
在懒加载的双检锁单例模式中多线程对于单例的创建就出现了指令重排问题!!!

4. volatile原理

volatile.drawio (1).png

5. 单例模式

5.1 饿汉式单例

  1. final class Singleton01 implements Serializable {
  2. private Singleton01() {
  3. // 防止反射第二次调用私有构造方法
  4. if(INSTANCE != null){
  5. throw new RuntimeException("单例对象不能重复创建");
  6. }
  7. System.out.println("private Singleton01() ...");
  8. }
  9. private static final Singleton01 INSTANCE = new Singleton01();
  10. public static Singleton01 getInstance() {
  11. return INSTANCE;
  12. }
  13. public static void other () {
  14. System.out.println("other method ...");
  15. }
  16. // 防止被反序列化,如果发现重写了该方法,反序列化就会使用这个方法返回对象
  17. public Object readResolve() {
  18. return INSTANCE;
  19. }

5.2 枚举类饿汉式单例

  1. enum Singleton02 implements Serializable {
  2. INSTANCE;
  3. private Singleton02 () {
  4. System.out.println("private Singleton02 () ...");
  5. }
  6. public static Singleton02 getInstance() {
  7. return INSTANCE;
  8. }
  9. public static void other(){
  10. System.out.println("other method() ...");
  11. }
  12. @Override
  13. public String toString() {
  14. return getClass().getName() + "&" + Integer.toHexString(hashCode());
  15. }
  16. // 枚举类不会反序列化不会破坏单例
  17. public Object readResolve() {
  18. return INSTANCE;
  19. }
  20. }

5.3 懒汉式单例

  1. class Singleton03 implements Serializable {
  2. private Singleton03(){
  3. System.out.println("private Singleton03() ...");
  4. }
  5. private static volatile Singleton03 INSTANCE = null;
  6. public static synchronized Singleton03 getInstance() {
  7. if(INSTANCE == null){
  8. INSTANCE = new Singleton03();
  9. }
  10. return INSTANCE;
  11. }
  12. public static void other(){
  13. System.out.println("other method() ...");
  14. }
  15. }

5.4 双检索懒汉式单例

  1. class Singleton04 implements Serializable {
  2. private Singleton04(){
  3. System.out.println("private Singleton04() ...");
  4. }
  5. private static volatile Singleton04 INSTANCE = null;
  6. public static Singleton04 getInstance() {
  7. if(INSTANCE == null){ // 第一把锁是为了在INSTANCE != null时直接return, 避免无意义的加锁
  8. synchronized (Singleton04.class){
  9. if(INSTANCE == null){ // 第二把锁是为了避免多线程下new出多个对象
  10. INSTANCE = new Singleton04();
  11. }
  12. }
  13. }
  14. return INSTANCE;
  15. }
  16. public static void other(){
  17. System.out.println("other method() ...");
  18. }
  19. }

5.5 内部类懒汉式单例

  1. class Singleton05 implements Serializable {
  2. private Singleton05() {
  3. if(Inner.INSTANCE != null) {
  4. throw new RuntimeException("单例对象不能重复创建");
  5. }
  6. System.out.println("private Singleton05()...");
  7. }
  8. // 内部类:内部类只有在调用的时候还会加载类
  9. public static class Inner {
  10. static Singleton05 INSTANCE = new Singleton05();
  11. }
  12. public static Singleton05 getInstance() {
  13. return Inner.INSTANCE;
  14. }
  15. // 防止反序列化
  16. private Object readResolve(){
  17. return Inner.INSTANCE;
  18. }
  19. }

4. 无锁

5. 线程不可变

6. 线程池

池化技术 是一种常用的技术,比如线程池、数据库连接池、HTTP连接池等等都是使用了池化技术的思想。池化技术的思想是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池的好处:

  1. 降低资源消耗。通过重复利用已经创建好的线程来减少线程创建和销毁时带来的消耗。
  2. 提高响应速度。任务到达时,无须等待线程创创建就能立即执行任务
  3. 提高线程安全的可管理性。 线程是稀缺资源,无限制的创建线程资源,不仅会造成资源消耗,还会降低系统的稳定性,使用线程池进行统一的分配,调优和监控既可以避免线程无限制的创建。

    1. 自己定义一个简单的线程池

    代码逻辑

    未命名文件 (2).png
  • 主线程:生产者,产生任务放入阻塞队列中
  • 线程池:消费者,消费任务;获取阻塞队列中的任务并执行
  • 阻塞队列:维护主线程中产生的任务;主线程生产任务放入阻塞队列里,线程池从阻塞队列中获取任务执行

    1.1 定义阻塞队列

    1. class BlockingQueue<T> {
    2. // 1. 定义任务队列
    3. private Deque<T> queue = new ArrayDeque<T>();
    4. // 2. 声明锁
    5. ReentrantLock lock = new ReentrantLock();
    6. // 3. 生产者条件变量(阻塞队列塞满任务时,没有空间进入,进入此条件变量中)
    7. private Condition fullWaitSet = lock.newCondition();
    8. // 4. 消费者条件变量(阻塞队列空时,没有任务可取,进入此条件变量中)
    9. private Condition emptyWaitSet = lock.newCondition();
    10. // 5. 定义阻塞队列的容量
    11. private int capacity;
    12. public BlockingQueue(int capacity){
    13. this.capacity = capacity;
    14. }
    15. // 线程池从阻塞队列中获取任务,如果没有任务会一直等待
    16. public T take() {
    17. lock.lock();
    18. try{
    19. // 判断阻塞队列是否为空
    20. while(queue.isEmpty()){
    21. log.debug("阻塞队列为空,等待新任务的产生...");
    22. try {
    23. emptyWaitSet.await();
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. }
    27. }
    28. // 阻塞队列不为空,获取队列头部任务
    29. T t = queue.removeFirst();
    30. // 队列中已有空间,唤醒生产者进行生产
    31. fullWaitSet.signal();
    32. return t;
    33. }
    34. finally {
    35. // 释放锁
    36. lock.unlock();
    37. }
    38. }
    39. // 其他线程往阻塞队列中添加任务
    40. public void put(T task){
    41. lock.lock();
    42. try{
    43. // 判断阻塞队列是否已满
    44. while(queue.size() == capacity){
    45. log.debug("阻塞队列已满,等待加入阻塞队列");
    46. try {
    47. fullWaitSet.await();
    48. } catch (InterruptedException e) {
    49. e.printStackTrace();
    50. }
    51. }
    52. // 阻塞队列不满,往阻塞队列中添加任务
    53. queue.addLast(task);
    54. log.debug("已往阻塞队列中加入任务{}", task);
    55. // 队列中已有任务,唤醒消费者消费任务
    56. emptyWaitSet.signal();
    57. }finally {
    58. lock.unlock();
    59. }
    60. }
    61. // 获取队列大小
    62. public int size(){
    63. lock.lock();
    64. try{
    65. return queue.size();
    66. }finally {
    67. lock.lock();
    68. }
    69. }
    70. }

    1.2 定义线程池

    ```java class ThreadPool{

    // 声明阻塞任务队列 private BlockingQueue taskQueue;

    // 声明线程集合 private HashSet workers = new HashSet<>();

    // 核心线程数 private int coreSize;

    // 获取任务的超时时间 private long timeOut; // 超时时间单位 private TimeUnit timeUnit;

    // 拒绝策略 private RejectPolicy rejectPolicy;

    // 构造函数 public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity){

    1. this.taskQueue = new BlockingQueue<Runnable>(queueCapacity);
    2. this.coreSize = coreSize;
    3. this.timeOut = timeOut;
    4. this.timeUnit = timeUnit;

    // this.rejectPolicy = rejectPolicy; }

    // 执行任务 public void execute(Runnable task){

    1. // 工作线程数 < 总线程数,worker可以执行这些任务,无须加入阻塞队列中
    2. if(workers.size() < coreSize){
    3. // 创建线程
    4. Worker worker = new Worker(task);
    5. log.debug("新增worker线程 {}, {}", worker, task);
    6. workers.add(worker);
    7. worker.start(); // 开启线程执行任务
    8. }else{
    9. // 线程数不够,加入阻塞队列
    10. taskQueue.put(task);
    11. }

    }

    /**

    • 线程池中的工作线程 */ class Worker extends Thread{

      // 声明线程要执行的任务 private Runnable task;

      public Worker(Runnable task){

      1. this.task = task;

      }

      @Override public void run(){

      1. /**
      2. * 1. task不为null时,执行任务
      3. * 2. task为null时,从阻塞队列中取出任务并执行
      4. */
      5. while(task != null || (task = taskQueue.take()) != null){
      6. try{
      7. log.debug("正在执行...{}", task);
      8. task.run();
      9. }finally {
      10. // 任务执行完毕,置为null
      11. task = null;
      12. }
      13. }
      14. // 任务执行完毕,将该线程移除
      15. synchronized (workers){
      16. log.debug("worker被移除 {}", this);
      17. workers.remove(this);
      18. }

      } }

}

  1. <a name="gj3eg"></a>
  2. ### 1.3 测试
  3. 创建五个任务,由线程池的两个线程轮流来执行,阻塞队列可以容纳10个任务!
  4. ```java
  5. class TestPool{
  6. // 任务生产者
  7. public static void main(String[] args){
  8. ThreadPool threadPool = new ThreadPool(2, 1, TimeUnit.SECONDS, 3);
  9. // 创建5个任务
  10. for(int i = 0; i < 5; i++){
  11. int j = i;
  12. threadPool.execute(new Runnable(){
  13. @Override
  14. public void run(){
  15. try {
  16. log.debug("正在执行任务{}...", j);
  17. Thread.sleep(1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. log.debug("{}", j);
  22. }
  23. });
  24. }
  25. }
  26. }
  1. 正在创建第1个任务!!!
  2. 15:39:04.532 h.ThreadPool [main] - 新增worker线程 0 _06线程池.TestPool$1@1ef7fe8e
  3. 正在创建第2个任务!!!
  4. 15:39:04.536 h.ThreadPool [main] - 新增worker线程 1 _06线程池.TestPool$1@50675690
  5. 15:39:04.536 h.ThreadPool [线程0] - 正在执行..._06线程池.TestPool$1@1ef7fe8e
  6. 15:39:04.536 h.TestPool [线程0] - 正在执行任务1...等待一秒...
  7. 正在创建第3个任务!!!
  8. 15:39:04.536 h.BlockingQueue [main] - 已往阻塞队列中加入任务_06线程池.TestPool$1@3ac42916
  9. 正在创建第4个任务!!!
  10. 15:39:04.536 h.ThreadPool [线程1] - 正在执行..._06线程池.TestPool$1@50675690
  11. 15:39:04.536 h.TestPool [线程1] - 正在执行任务2...等待一秒...
  12. 15:39:04.536 h.BlockingQueue [main] - 已往阻塞队列中加入任务_06线程池.TestPool$1@47d384ee
  13. 正在创建第5个任务!!!
  14. 15:39:04.536 h.BlockingQueue [main] - 已往阻塞队列中加入任务_06线程池.TestPool$1@2d6a9952
  15. 15:39:05.537 h.TestPool [线程1] - 任务2执行完毕...
  16. 15:39:05.537 h.TestPool [线程0] - 任务1执行完毕...
  17. 15:39:05.537 h.ThreadPool [线程0] - 正在执行..._06线程池.TestPool$1@47d384ee
  18. 15:39:05.537 h.TestPool [线程0] - 正在执行任务4...等待一秒...
  19. 15:39:05.537 h.ThreadPool [线程1] - 正在执行..._06线程池.TestPool$1@3ac42916
  20. 15:39:05.537 h.TestPool [线程1] - 正在执行任务3...等待一秒...
  21. 15:39:06.537 h.TestPool [线程1] - 任务3执行完毕...
  22. 15:39:06.537 h.TestPool [线程0] - 任务4执行完毕...
  23. 15:39:06.537 h.ThreadPool [线程1] - 正在执行..._06线程池.TestPool$1@2d6a9952
  24. 15:39:06.537 h.TestPool [线程1] - 正在执行任务5...等待一秒...
  25. 15:39:06.537 h.BlockingQueue [线程0] - 阻塞队列为空,等待新任务的产生...
  26. 15:39:07.537 h.TestPool [线程1] - 任务5执行完毕...
  27. 15:39:07.537 h.BlockingQueue [线程1] - 阻塞队列为空,等待新任务的产生...
  28. ...
  29. 后面会一直死等,因为线程一直等待从阻塞队列中取任务

1.4 拒绝策略

拒绝策略是指当阻塞队列已满时,核心线程的选择;在上面的逻辑中,当阻塞队列已满、核心线程都在工作时。新任务一直等待加入阻塞队列中;可以选择设置不同的状态,比如带超时时间等待、抛出异常等等;拒绝策略将状态的设置交给用户,使得线程池的功能更加丰富,扩展性更强!

  1. @FunctionalInterface // 拒绝策略
  2. interface RejectPolicy<T> {
  3. void reject(BlockingQueue<T> queue, T task);
  4. }
  5. class ThreadPool{
  6. // 成员变量与上面一样
  7. //................
  8. // 拒绝策略
  9. private RejectPolicy<Runnable> rejectPolicy;
  10. // 构造函数
  11. public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy){
  12. this.taskQueue = new BlockingQueue<Runnable>(queueCapacity);
  13. this.coreSize = coreSize;
  14. this.timeOut = timeOut;
  15. this.timeUnit = timeUnit;
  16. this.rejectPolicy = rejectPolicy;
  17. }
  18. // 执行任务
  19. public void execute(Runnable task){
  20. // 工作线程数 < 总线程数,worker可以执行这些任务,无须加入阻塞队列中
  21. if(workers.size() < coreSize){
  22. // 创建线程
  23. Worker worker = new Worker(task, "线程" + workers.size());
  24. log.debug("新增worker线程 {}, {}", workers.size(), task);
  25. workers.add(worker);
  26. worker.start(); // 开启线程执行任务
  27. }else{
  28. // 线程数不够,加入阻塞队列
  29. // taskQueue.put(task);
  30. taskQueue.tryPut(rejectPolicy, task);
  31. }
  32. }
  33. /**
  34. * 线程池中的工作线程
  35. */
  36. class Worker extends Thread{
  37. // 声明线程要执行的任务
  38. private Runnable task;
  39. public Worker(Runnable task, String name){
  40. super(name);
  41. this.task = task;
  42. }
  43. @Override
  44. public void run(){
  45. /**
  46. * 1. task不为null时,执行任务
  47. * 2. task为null时,从阻塞队列中取出任务并执行
  48. */
  49. while(task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null){
  50. try{
  51. log.debug("正在执行...{}", task);
  52. task.run();
  53. }finally {
  54. // 任务执行完毕,置为null
  55. task = null;
  56. }
  57. }
  58. // 任务执行完毕,将该线程移除
  59. synchronized (workers){
  60. log.debug("worker被移除 {}", this);
  61. workers.remove(this);
  62. }
  63. }
  64. }
  65. }
  66. class BlockingQueue<T> {
  67. // 1. 定义任务队列
  68. private Deque<T> queue = new ArrayDeque<T>();
  69. // 2. 声明锁
  70. ReentrantLock lock = new ReentrantLock();
  71. // 3. 生产者条件变量(阻塞队列塞满任务时,没有空间进入,进入此条件变量中)
  72. private Condition fullWaitSet = lock.newCondition();
  73. // 4. 消费者条件变量(阻塞队列空时,没有任务可取,进入此条件变量中)
  74. private Condition emptyWaitSet = lock.newCondition();
  75. // 5. 定义阻塞队列的容量
  76. private int capacity;
  77. public BlockingQueue(int capacity){
  78. this.capacity = capacity;
  79. }
  80. // 带超时等待,获取阻塞队列中的任务
  81. public T poll(long timeout, TimeUnit timeUnit){
  82. lock.lock();
  83. try{
  84. long nanos = timeUnit.toNanos(timeout);
  85. // 判断阻塞队列是否为空
  86. while(queue.isEmpty()){
  87. // 如果超时等待时间小于0,返回null
  88. if(nanos <= 0){
  89. return null;
  90. }
  91. try {
  92. // 返回时间 = 等待的时间 - 经过的时间,可以防止虚假唤醒
  93. nanos = emptyWaitSet.awaitNanos(nanos);
  94. } catch (InterruptedException e) {
  95. e.printStackTrace();
  96. }
  97. }
  98. // 阻塞队列不为空
  99. T t = queue.removeFirst();
  100. fullWaitSet.signal();
  101. return t;
  102. }finally {
  103. lock.unlock();
  104. }
  105. }
  106. // 带超时时间,往阻塞队列中添加任务:阻塞队列一直为满,等待超时则放弃等待
  107. public boolean offer(T task, long timeout, TimeUnit timeUnit){
  108. lock.lock();
  109. try{
  110. long nanos = timeUnit.toNanos(timeout);
  111. // 阻塞队列已满
  112. while(queue.size() == capacity){
  113. if(nanos <= 0){
  114. // 超时,放弃等待
  115. log.debug("已超时,放弃等待添加新任务...");
  116. return false;
  117. }
  118. try {
  119. nanos = fullWaitSet.awaitNanos(nanos);
  120. } catch (InterruptedException e) {
  121. e.printStackTrace();
  122. }
  123. }
  124. // 阻塞队列有空闲
  125. queue.addLast(task);
  126. emptyWaitSet.signal(); // 唤醒阻塞队列空条件
  127. return true;
  128. }finally {
  129. lock.unlock();
  130. }
  131. }
  132. public void tryPut(RejectPolicy<T> rejectPolicy, T task){
  133. lock.lock();
  134. try{
  135. // 阻塞队列已满
  136. if(queue.size() == capacity){
  137. // 根据拒绝策略选择线程等待阻塞队列中的任务方式
  138. rejectPolicy.reject(this, task);
  139. }else{
  140. // 阻塞队列未满,可以继续添加任务
  141. queue.addLast(task);
  142. }
  143. }finally {
  144. lock.unlock();
  145. }
  146. }
  147. }

带拒绝策略测试

  1. public class TestPool {
  2. public static void main(String[] args) {
  3. ThreadPool threadPool = new ThreadPool(1,
  4. 1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
  5. // 1. 死等
  6. // queue.put(task);
  7. // 2) 带超时等待
  8. queue.offer(task, 1500, TimeUnit.MILLISECONDS);
  9. // 3) 让调用者放弃任务执行
  10. // log.debug("放弃{}", task);
  11. // 4) 让调用者抛出异常
  12. // throw new RuntimeException("任务执行失败 " + task);
  13. // 5) 让调用者自己执行任务
  14. // task.run();
  15. });
  16. for (int i = 0; i < 4; i++) {
  17. int j = i;
  18. threadPool.execute(() -> {
  19. try {
  20. Thread.sleep(1000L);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. log.debug("{}", j);
  25. });
  26. }
  27. }
  28. }

1.2 JDK中的线程池 ThreadPoolExexutor

1. 线程池状态

ThreadPoolExexutor 使用int的高三位表示线程池状态,低29位表示线程数量

状态名称 高3位的值 描述
RUNNING 111 接收新任务,同时处理任务队列中的任务
SHUTDOWN 000 不接受新任务,但是处理任务队列中的任务
STOP 001 中断正在执行的任务,同时抛弃阻塞队列中的任务
TIDYING 010 任务执行完毕,活动线程为0时,即将进入终结阶段
TERMINATED 011 终结状态