1. 基本概念

1.1 程序、进程、线程、多线程

  • 程序(program)
    • 是为完成特定任务、用某种语言编写的一组指令的集合。即指一段静态的代码,静态对象。
  • 进程(process)
    • 是程序的一次执行过程,或是正在运行的一个程序。是一个动态的过程,有它自身的产生、存在和消亡的过程,即生命周期。
    • 进程作为资源分配的单位,系统在运行时会为每个进程分配不同的内存区域,即堆、方法区。
  • 线程(thread)
    • 进程可进一步细化为线程,是一个程序内部的一条执行路径。
    • 若一个进程同一时间并行执行多个线程,就是支持多线程的。
    • 线程作为调度和执行的单位,每个线程拥有独立的运行栈和程序计数器 pc,线程切换的开销小。
    • 一个进程中的多个线程共享这个进程的堆和方法区资源,但每个线程拥有自己独立的程序计数器、虚拟机栈和本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小。
    • 一个进程中的多个线程共享相同的内存单元/内存地址空间,它们从同一堆中分配对象,可以访问相同的变量和对象,这就使得线程间通信更简便、高效。但多个线程操作共享的系统资源可能就会带来安全的隐患。
    • 线程就是能并行运行并且与他们的父进程(创建他们的进程)共享同一地址空间(一段内存区域)和其他资源的轻量级的进程。
  • 单核CPU和多核CPU
    • 单核CPU,假的多线程,因为在一个时间单元内,只能执行一个线程的任务。
    • 多核CPU,才能更好的发挥多线程的效率。现在的服务器都是多核的。
    • 一个Java应用程序java.exe,至少有三个线程:main()主线程,gc()垃圾回收线程,异常处理线程。如果发生异常,会影响主线程。
  • 并行与并发
    • 并行:多个CPU同时执行多个任务。比如:多个人同时做不同的事。
    • 并发:一个CPU(采用时间片)同时执行多个任务。比如:秒杀,多个人做同一件事。
  • 使用多线程的优点
    • 提高应用程序的响应。对图形化界面更有意义,可增强用户体验。
    • 提高计算机系统CPU的利用率。
    • 改善程序结构,将既长又复杂的进程分为多个线程,独立运行,利于理解和修改。
  • 何时需要多线程

    • 程序需要同时执行两个或多个任务。
    • 程序需要实现一些需要等待的任务时,如用户输入、文件读写操作、网络操作、搜索等。
    • 需要一些后台运行的程序时。

      1.2 线程上下文切换

      巧妙地利用了时间片轮转的方式, CPU 给每个任务都服务一定的时间,然后把当前任务的状态保存
      下来,在加载下一任务的状态后,继续服务下一任务,任务的状态保存及再加载, 这段过程就叫做
      上下文切换。时间片轮转的方式使多个任务在同一颗 CPU 上执行变成了可能。
      image.png
      上下文
      是指某一时间点 CPU 寄存器和程序计数器的内容。
      寄存器
      是 CPU 内部的数量较少但是速度很快的内存(与之对应的是 CPU 外部相对较慢的 RAM 主内存)。寄存器通过对常用值(通常是运算的中间值)的快速访问来提高计算机程序运行的速度。
      程序计数器
      是一个专用的寄存器,用于表明指令序列中 CPU 正在执行的位置,存的值为正在执行的指令的位置或者下一个将要被执行的指令的位置,具体依赖于特定的系统。
      PCB-“切换桢”
      上下文切换可以认为是内核(操作系统的核心)在 CPU 上对于进程(包括线程)进行切换,上下
      文切换过程中的信息是保存在进程控制块(PCB, process control block)中的。PCB 还经常被称
      作“切换桢”(switchframe)。信息会一直保存到 CPU 的内存中,直到他们被再次使用。
      上下文切换的活动
      1. 挂起一个进程,将这个进程在 CPU 中的状态(上下文)存储于内存中的某处。
      2. 在内存中检索下一个进程的上下文并将其在 CPU 的寄存器中恢复。
      3. 跳转到程序计数器所指向的位置(即跳转到进程被中断时的代码行),以恢复该进程在程序
      中。
      引起线程上下文切换的原因
      1. 当前执行任务的时间片用完之后,系统 CPU 正常调度下一个任务;
      2. 当前执行任务碰到 IO 阻塞,调度器将此任务挂起,继续下一任务;
      3. 多个任务抢占锁资源,当前任务没有抢到锁资源,被调度器挂起,继续下一任务;
      4. 用户代码挂起当前任务,让出 CPU 时间;
      5. 硬件中断;

      1.3 JUC:JAVA 并发知识库

      JUC包参考自EDU.oswego.cs.dl.util.concurrent,是JSR 166标准规范的一个实现。JSR 166是一个关于Java并发编程的规范提案,在JDK中该规范由java.util.concurrent包实现。
      在多线程并发编程中,java.util.concurrent非常重要,里面提供的方法类主要分这几类:
  • tools:CountDownLatch(闭锁)、CyclicBarrier(栅栏)、Semaphore(信号量)等。

  • locks:Lock、ReentrantLock(重入锁)、ReadWritLock(读写锁)等。
  • executor:Executor(线程池)、Future、Callable等。
  • collection:ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue(阻塞队列)等。
  • atomic:AtomicInteger、AtomicLong等。

5 多线程 - 图2
5 多线程 - 图3

2. 线程的创建和使用

2.1 Thread类

JVM允许程序运行多个线程,它通过java.lang.Thread类来体现。Thread 类本质上是实现了 Runnable 接口的一个实例,代表一个线程的实例。public class Thread implements Runnbale

Thread类的特性

  • 每个线程都是通过某个特定Thread对象的run()方法来完成操作的,把run()方法的主体称为线程执行体
  • 通过该Thread对象的start()方法来启动这个线程,而非直接调用run()。启动线程的唯一方法就是通过 Thread 类的 start()实例方法。start()方法是一个 native 方法,它将启动一个新线程,并执行run()方法。
  • native方法:让不同的编程语言为Java所用。使用了native的方法会进入本地方法栈,然后调用(底层C或者C++写的) 本地方法接口JNI(java Native Interface),通过本地方法接口调用(底层C或者C++代码写的)本地方法库Native Method Labrary。比如,多线程的start()方法内调用了start0()。

Thread类的构造器

  • Thread():创建新的Thread对象,即创建一个线程
  • Thread(String name):创建一个线程并指定线程实例名
  • Thread(Runnable target):创建一个线程并指定线程的目标对象,它实现了Runnable接口中的run方法
  • Thread(Runnable target, String name):创建一个线程并指定线程实例名和线程的目标对象。参数 name为线程名,参数 target为包含线程体的目标对象。

    2.2 创建线程的方式一:继承Thread类

  • 步骤:

1) 定义子类,继承Thread类;
2) 子类中重写Thread类中的run()方法,将线程需要执行的操作声明在run()中;
3) 创建Thread类的子类的对象,即创建线程对象;
4) 调用线程对象的start()方法,start()方法执行了两步:①启动当前线程 ②调用当前线程的run()方法。

  • 注意点:

    • 要启动多线程,必须调用start()方法。如果自己手动调用run()方法,那只是普通方法,并没有启动多线程模式。
    • run()方法由JVM调用,什么时候调用,执行的过程控制都由操作系统的CPU调度决定。
    • 一个线程对象只能调用一次start()方法启动,如果重复调用,将抛出IllegalThreadStateException异常。即不能让已经start()的线程再去执行start()。
      1. class MyThread extends Thread {//1. 创建一个继承于Thread类的子类
      2. @Override
      3. public void run() {//2. 子类中重写Thread类的run()
      4. for (int i = 0; i < 100; i++) {
      5. if(i % 2 == 0){
      6. System.out.println(Thread.currentThread().getName() + ":" + i);
      7. }
      8. }
      9. }
      10. }
      11. public class ThreadTest {
      12. public static void main(String[] args) {
      13. //子线程1
      14. MyThread t1 = new MyThread();//3. 创建Thread类的子类的对象
      15. t1.start();//4.通过此对象调用start():①启动当前线程 ② 调用当前线程的run()
      16. //t1.run();//不能通过直接调用run()的方式启动线程。
      17. //t1.start();/不能让已经执行start()的线程再去执行start()
      18. //子线程2
      19. MyThread t2 = new MyThread();//需要重新创建一个线程的对象
      20. t2.start();
      21. //main线程
      22. for (int i = 0; i < 100; i++) {//如下操作仍然是在main线程中执行的
      23. if(i % 2 == 0){
      24. System.out.println(Thread.currentThread().getName() + ":" + i + "***********main()************");
      25. }
      26. }
      27. }
      28. }

      2.3 创建线程的方式二:实现Runnable接口

  • 步骤:

1) 定义实现类,实现Runnable接口;
2) 实现类中实现Runnable接口中的run()方法,将线程需要执行的操作声明在run()中;
3) 创建Runnable接口的实现类的对象,将Runnable接口的实现类的对象作为实际参数传递给Thread类的含参构造器中,创建Thread类的对象,即创建线程对象;
4) 调用线程对象的start()方法,start()方法执行了两步:①启动当前线程 ②调用当前线程的run()方法。

  • 适用于:如果自己的类已经继承另一个类,就无法直接继承Thread,此时可以实现一个Runnable 接口。

    1. class MThread implements Runnable{//1. 创建一个实现了Runnable接口的类
    2. @Override
    3. public void run() {//2. 实现类去实现Runnable中的run()
    4. for (int i = 0; i < 100; i++) {
    5. if(i % 2 == 0){
    6. System.out.println(Thread.currentThread().getName() + ":" + i);
    7. }
    8. }
    9. }
    10. }
    11. public class ThreadTest1 {
    12. public static void main(String[] args) {
    13. MThread mThread = new MThread();//3.创建实现类的对象
    14. Thread t1 = new Thread(mThread);//4.将实现类的对象作为参数传递到Thread类的构造器中,创建Thread类的对象
    15. t1.setName("线程1");
    16. t1.start();//5.通过Thread类的对象调用start()
    17. Thread t2 = new Thread(mThread);//再创建一个线程对象
    18. t2.setName("线程2");
    19. t2.start();
    20. }
    21. }

    2.4 创建线程的方式三:实现Callable接口

  • 步骤

1) 定义实现类,实现Callable接口;
2) 实现类中实现Callable接口中的call()方法,将线程需要执行的操作写在call()中,且call()方法有返回值;
3) 创建Callable接口的实现类的对象,将Callable接口的实现类的对象作为实际参数传递给FutureTask类的含参构造器,创建FutureTask类的对象,该FutureTask对象封装了该Callable对象的call()方法的返回值;
4) 将FutureTask类的对象作为实际参数传递给Thread类的含参构造器中,创建Thread类的对象,即创建线程对象;
5) 调用线程对象的start()方法,start()方法执行了两步:①启动当前线程 ②调用当前线程的run()方法。
6) 获取Callable接口的实现类的对象所在实现类中call()方法的返回值:通过调用FutureTask对象的get()方法

  • 与实现Runnable接口相比,实现Callable接口创建线程的方式,功能更强大:
    • 相比Runnable接口的run()方法,Callable接口的call()方法可以有返回值
      • 借助FutureTask类,获取Callable任务的返回值:通过FutureTask的对象.get()方法,获取创建此FutureTask对象时,传入FutureTask含参构造器参数的,Callable接口的实现类的对象所在实现类中重写的call()方法的返回值。
      • 返回值可以给别的线程使用
      • call()方法支持泛型的返回值
    • call()方法可以抛出异常,被外面的操作捕获,获取异常的信息
  • Future接口、FutrueTask实现类:
    • 可以对具体Runnable、Callable任务的执行结果进行取消、查询是否完成、获取结果等。
    • FutrueTask是Futrue接口的唯一的实现类
    • FutureTask同时实现了Runnable, Future接口。它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值
  • 适用于:

有返回值的任务必须实现 Callable 接口。

  • 继承方式和实现方式的联系与区别:
    • 联系:public class Thread implements Runnbale
    • 共同点:
      • 两种方式都需要重写run()方法,将线程要执行的逻辑声明在run()方法中
      • 要想启动线程,都是调用start()方法
    • 区别:
      • 继承Thread类,线程代码存放Thread类的子类run方法中。
      • 实现Runnable/Runnable接口,线程代码存在接口的实现类的run方法。
    • 实现方式的优缺点:
      • 优:
        • 避免了类的单继承的局限性。线程类只是实现了Runnable或Callable接口,还可以继承其他类;
        • 在这种方式下,多个线程可以共享同一个接口实现类的对象(target),所以非常适合多个相同线程来处理同一份资源的情况,从而可以将CPU、代码和数据分开,形成清晰的模型,较好地体现了面向对象的思想。
      • 缺:编程稍稍复杂,如果需要访问当前线程,则必须使用Thread.currentThread()方法。
    • 继承方式的优缺点:
      • 缺:因为线程类已经继承了Thread类,所以不能再继承其他父类。
      • 优:编写简单,如果需要访问当前线程,则无须使用Thread.currentThread()方法,直接使用this即可获得当前线程。
    • 开发中优先选择:实现接口的方式来创建多线程。
      1. class NumThread implements Callable{//1.创建一个实现Callable的实现类
      2. @Override
      3. public Object call() throws Exception {//2.实现call方法,将此线程需要执行的操作声明在call()中
      4. int sum = 0;
      5. for (int i = 1; i <= 100; i++) {
      6. if(i % 2 == 0){
      7. System.out.println(i);
      8. sum += i;
      9. }
      10. }
      11. return sum;
      12. }
      13. }
      14. public class ThreadNew {
      15. public static void main(String[] args) {
      16. //3.创建Callable接口实现类的对象,将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象
      17. NumThread numThread = new NumThread();
      18. FutureTask futureTask = new FutureTask(numThread);
      19. //4.将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象
      20. new Thread(futureTask).start();//5.调用线程对象的start()方法
      21. try {
      22. //6.获取Callable接口实现类的对象所在实现类中call()方法的返回值
      23. //FutureTask的对象.get()方法的返回值,即为创建此FutureTask对象时,
      24. //传入FutureTask含参构造器参数的Callable接口实现类对象所在实现类重写的call()的返回值
      25. Object sum = futureTask.get();
      26. System.out.println("总和为:" + sum);
      27. } catch (InterruptedException e) {
      28. e.printStackTrace();
      29. } catch (ExecutionException e) {
      30. e.printStackTrace();
      31. }
      32. }
      33. }

      2.5 创建线程的方式四:使用线程池

      具体看6.线程池

      2.6 线程的常用方法

      线程等待(wait)
      调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回,需要注意的是调用 wait()方法后,会释放对象的锁。因此,wait 方法一般要用在同步方法或同步代码块中。

线程睡眠(sleep)
sleep 导致当前线程休眠,与 wait 方法不同的是 sleep 不会释放当前占有的锁,sleep(long)会导致线程进入 TIMED-WATING 状态,而 wait()方法会导致当前线程进入 WATING 状态。

sleep 与 wait 区别
sleep()方法导致了程序暂停执行指定的时间,让出 cpu 该其他线程,但是他的监控状态依然保持者,当指定的时间到了又会自动恢复可运行状态。在调用 sleep()方法的过程中,线程不会释放当前当有的对象锁。

而当调用 wait()方法的时候,线程会放弃对象锁,进入等待此对象的等待锁定池,只有针对此
对象调用 notify()方法后本线程才进入对象锁定池准备获取对象锁进入可运行状态。

sleep() wait()
定义或属于 Thread类中的静态方法 Object类中的成员方法
使用位置 在任何地方使用 在同步方法或同步代码块中使用
线程会不会释放锁 不会释放锁,不需要手动唤醒,时间到后进入Runnable状态 会释放锁,需要通过notify()/notifyAll()唤醒到Runnable状态,重新获取锁
调用后 会导致线程进入TIMED-WATING 状态 会导致当前线程进入 WATING 状态
抛出异常 都会抛出InterruptedException异常

线程让步(yield)
yield 会使当前线程让出 CPU 执行时间片,与其他线程一起重新竞争 CPU 时间片。一般情况下,优先级高的线程有更大的可能性成功竞争得到 CPU 时间片,但这又不是绝对的,有的操作系统对线程优先级并不敏感。

线程中断(interrupt)
中断一个线程,其本意是给这个线程一个通知信号,会影响这个线程内部的一个中断标识位。这个线程本身并不会因此而改变状态(如阻塞,终止等)。
1. 调用 interrupt()方法并不会中断一个正在运行的线程。也就是说处于 Running 状态的线程并不会因为被中断而被终止,仅仅改变了内部维护的中断标识位而已。
2. 若调用 sleep()而使线程处于 TIMED-WATING 状态,这时调用 interrupt()方法,会抛出InterruptedException,从而使线程提前结束 TIMED-WATING 状态。
3. 许多声明抛出 InterruptedException 的方法(如 Thread.sleep(long mills 方法)),抛出异常前,都会清除中断标识位,所以抛出异常后,调用 isInterrupted()方法将会返回 false。
4. 中断状态是线程固有的一个标识位,可以通过此标识位安全的终止线程。比如,你想终止一个线程 thread 的时候,可以调用 thread.interrupt()方法,在线程的 run 方法内部可以根据 thread.isInterrupted()的值来优雅的终止线程。

等待其他线程终止(join)
join() 方法,等待其他线程终止,在当前线程中调用一个线程的 join() 方法,则当前线程转为阻塞状态,回到另一个线程结束,当前线程再由阻塞状态变为就绪状态,等待 cpu 的宠幸。

为什么要用 join()方法
很多情况下,主线程生成并启动了子线程,需要用到子线程返回的结果,也就是需要主线程需要在子线程结束后再结束,这时候就要用到 join() 方法。

  1. System.out.println(Thread.currentThread().getName() + "线程运行开始!");
  2. Thread6 thread1 = new Thread6();
  3. thread1.setName("线程 B");
  4. thread1.join();
  5. System.out.println("这时 thread1 执行完毕之后才能执行主线程");

线程唤醒(notify)
Object 类中的 notify() 方法,唤醒在此对象监视器上等待的单个线程,如果所有线程都在此对象上等待,则会选择唤醒其中一个线程,选择是任意的,并在对实现做出决定时发生,线程通过调用其中一个 wait() 方法,在对象的监视器上等待,直到当前的线程放弃此对象上的锁定,才能继续执行被唤醒的线程,被唤醒的线程将以常规方式与在该对象上主动同步的其他所有线程进行竞争。类似的方法还有 notifyAll() ,唤醒再次监视器上等待的所有线程。

start 与 run 区别:为什么调用start()方法时会执行run()方法,为什么不能直接调用run()方法?
new一个Thread,线程进入了新建状态。调用start()方法,会启动一个线程并使线程进入了就绪状态,当分配到时间片后就可以开始运行。start()会执行线程的相应准备工作,然后自动执行run()方法中的内容,这是真正的多线程工作。 但是,直接执行run()方法,会把run()方法当成一个main线程下的普通方法去执行,并不会以多线程的方式去运行。

  1. start()方法来启动线程,真正实现了多线程运行。这时无需等待run方法体代码执行完毕,可以继续执行下面的代码。通过调用 Thread 类的 start()方法来启动一个线程, 这时线程是处于就绪状态,并没有立即运行。
  2. run()方法称为线程体,它包含了要执行的这个线程的内容,若直接调用线程的run()方法,该线程就进入了运行状态,开始运行 run 函数当中的代码,run 方法运行结束, 此线程终止,然后 CPU 再调度其它线程。

其他方法

  1. Thread类的常用静态方法:
  • static Thread currentThread(): 静态方法,返回当前正在执行的线程。在Thread子类中就是this,通常用于主线程和Runnable实现类
  • interrupted():返回当前执行的线程是否已经被中断
  • static void sleep(long millis): 静态方法,强迫当前执行的线程睡眠多少毫秒数。令当前活动线程在指定时间段内放弃对CPU控制,使其他线程有机会被执行,时间到后重排队(进入就绪状态)。即让当前线程睡眠指定的毫秒时间,在指定的时间内,当前线程是阻塞状态。抛出InterruptedException异常
    • object.wai():Object类提供的线程等待方法。强迫一个线程等待。
    • object.notify()、object.notifyAll():Object类提供的线程唤醒方法。通知一个线程继续运行。
  • static void yield(): 线程让步,自愿释放当前cpu的执行权。暂停当前正在执行的线程,把CPU执行机会让给优先级相同或更高的线程。若队列中没有同优先级的线程,忽略此方法


  1. Thread类的常用实例方法:
  • void start(): 启动当前线程,并调用当前线程的的run()方法
  • run(): 线程在被调度时执行的操作。通常需要重写Thread类中的run()方法,将创建的线程要执行的操作声明在run()方法中
  • join(): 在线程a中调用线程b的join()方法,此时线程a就进入阻塞状态,需要等待直到线程b完全执行完以后,线程a才结束阻塞状态。低优先级的线程也可以获得执行
  • join(long millis):等待线程b终止,最多等待多少毫秒数
  • stop(): 强制当前线程生命期结束。此方法已过时,不推荐使用
  • getId():返回当前线程的id
  • String getName(): 返回当前线程的名称
  • void setName(String name): 为当前线程设置一个名称
  • getPriority(): 返回当前线程优先级等级
  • setPriority(int newPriority): 设置当前线程的优先级等级
  • interrupt():使该线程中断;
  • isInterrupted():返回该线程是否被中断
  • isDaemon():返回该线程是否是守护线程
  • setDaemon(boolean on):将该线程标记为守护线程或用户线程,如果不标记默认是非守护线程
  • boolean isAlive(): 判断当前线程是否还处于活动状态
  • activeCount():程序中活跃的线程数。
  • enumerate():枚举程序中的线程。

    2.6 终止线程四种方式

  1. 正常运行结束

程序运行结束,线程自动结束。

  1. 使用退出标志退出线程

一般 run()方法执行完,线程就会正常结束,然而,常常有些线程是伺服线程。它们需要长时间的运行,只有在外部某些条件满足的情况下,才能关闭这些线程。使用一个变量来控制循环,例如:最直接的方法就是设一个 boolean 类型的标志,并通过设置这个标志为 true 或 false 来控制 while循环是否退出。

伺服线程是指只有获得某种信号,才会停止的线程。最常见的就是在while循环当中的线程,设置一个boolean类型的标志,来退出循环。将伺服线程放到while循环当中,通过控制boolean类型的标志,来控制线程是否结束。

  1. public class ThreadSafe extends Thread {
  2. public volatile boolean exit = false;//退出标志
  3. public void run() {
  4. while (!exit){
  5. //do something
  6. }
  7. }
  8. }

定义了一个退出标志 exit,当 exit 为 true 时,while 循环退出,exit 的默认值为 false.在定义 exit时,使用了一个 Java 关键字 volatile,这个关键字的目的是使 exit 同步,也就是说在同一时刻只能由一个线程来修改 exit 的值。

  1. Interrupt 方法结束线程

使用 interrupt()方法来中断线程有两种情况:
(1)线程处于阻塞状态:如使用了 sleep,同步锁的 wait,socket 中的 receiver,accept 等方法时,会使线程处于阻塞状态。当调用线程的 interrupt()方法时,会抛出 InterruptException 异常。阻塞中的那个方法抛出这个异常,通过try-catch结构,在catch块内捕获该异常,然后利用 break 跳出循环状态,从而让我们有机会结束这个线程的执行。通常很多人认为只要调用 interrupt 方法线程就会结束,实际上是错的, 一定要先捕获InterruptedException 异常之后通过 break 来跳出循环,才能正常结束 run 方法,才能结束线程。
(2)线程未处于阻塞状态:使用 isInterrupted()判断线程的中断标志来退出循环。当使用interrupt()方法时,中断标志会置 true,和使用自定义的标志(终止线程的方式二:使用退出标志退出线程)来控制循环是一样道理。

  1. public class ThreadSafe extends Thread {
  2. public void run() {
  3. while (!isInterrupted()){ //非阻塞过程中通过判断中断标志来退出
  4. try{
  5. Thread.sleep(5*1000);//阻塞过程捕获中断异常来退出
  6. }catch(InterruptedException e){
  7. e.printStackTrace();
  8. break;//捕获到异常之后,执行 break 跳出循环
  9. }
  10. }
  11. }
  12. }
  1. stop 方法终止线程(线程不安全)

一旦调用stop()方法,创建并启动子线程的线程会抛出错误,并释放子线程所持有的所有锁。这个时候数据是不受保护的,其他线程进来很可能会触发一些奇怪的错误。

程序中可以直接使用 thread.stop()来强行终止线程,但是 stop 方法是很危险的,就像突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果,不安全主要是:thread.stop()调用之后,创建子线程的线程就会抛出 ThreadDeatherror 的错误,并且会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用thread.stop()后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用 stop 方法来终止线程。

2.7 JAVA 线程的分类:后台、用户线程

  1. Java中的线程分为两类:一种是守护线程(JAVA服务线程/后台线程/Daemon),一种是用户线程。
  2. 守护线程和用户线程在几乎每个方面都是相同的,唯一的区别是判断JVM何时离开,即是否等待主线程依赖于主线程结束而结束。
  3. 守护线程是用来为用户线程提供公共服务,在没有用户线程可服务时会自动离开。即若JVM中都是守护线程时,当前JVM将退出。
  4. 优先级:守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。
  5. 通过在调用线程.start()方法前调用线程.setDaemon(true)方法,可以将一个用户线程设置为守护线程。
  6. 在守护线程中产生的新线程也是守护线程。
  7. 线程则是 JVM 级别的,以 Tomcat 为例,如果你在 Web 应用中启动一个线程,这个线程的生命周期并不会和 Web 应用程序保持同步。也就是说,即使你停止了 Web 应用,这个线程依旧是活跃的。
  8. 垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的Thread,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是 JVM 上仅剩的线程时,垃圾回收线程会自动离开。它始终在低级别的状态中运行,用于实时监控和管理系统中的可回收资源。
  9. 生命周期:守护线程是运行在后台的一种特殊线程。它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。也就是说守护线程不依赖于终端,但是依赖于系统,与系统同生共死。当 JVM 中所有的线程都是守护线程的时候,JVM 就可以退出了;如果还有一个或以上的非守护线程则 JVM 不会退出。

    2.8 线程的调度与优先级

    线程的优先级
  • 线程的优先级等级:MAX_PRIORITY=10、MIN _PRIORITY=1、NORM_PRIORITY=5(默认的优先级)
  • 线程创建时继承父线程的优先级
  • 低优先级的线程只是获得调度的概率低,并非一定是在高优先级的线程执行完之后才被调用

线程的调度

  • 调度策略:
    • 时间片;
    • 抢占式:高优先级的线程抢占CPU
  • Java的调度方法:
    • 同优先级线程组成先进先出队列(先到先服务),使用时间片策略;
    • 对高优先级,使用优先调度的抢占式策略

Java 中用到的线程调度

  1. 抢占式调度:抢占式调度指的是每条线程执行的时间、线程的切换都由系统控制,系统控制指的是在系统某种运行机制下,可能每条线程都分同样的执行时间片,也可能是某些线程执行的时间片较长,甚至某些线程得不到执行的时间片。在这种机制下,一个线程的堵塞不会导致整个进程堵塞。
  2. 协同式调度:协同式调度指某一线程执行完后主动通知系统切换到另一线程上执行,这种模式就像接力赛一样,一个人跑完自己的路程就把接力棒交接给下一个人,下个人继续往下跑。线程的执行时间由线程本身控制,线程切换可以预知,不存在多线程同步问题,但它有一个致命弱点:如果一个线程编写有问题,运行到一半就一直堵塞,那么可能导致整个系统崩溃。

JVM 的线程调度实现(抢占式调度)
java 使用的线程调使用抢占式调度,Java 中线程会按优先级分配 CPU 时间片运行,且优先级越高越优先执行,但优先级高并不代表能独自占用执行时间片,可能是优先级高得到越多的执行时间片,反之,优先级低的分到的执行时间少但不会分配不到执行时间。
线程让出 cpu 的情况

  1. 当前运行线程主动放弃 CPU,JVM 暂时放弃 CPU 操作(基于时间片轮转调度的 JVM 操作系统不会让线程永久放弃 CPU,或者说放弃本次时间片的执行权),例如调用 yield()方法。
  2. 当前运行线程因为某些原因进入阻塞状态,例如阻塞在 I/O 上。
  3. 当前运行线程结束,即运行完 run()方法里面的任务

    3. 线程的生命周期(状态)

  • 当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态。在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5 种状态。尤其当线程启动以后,它不可能一直”霸占”着 CPU 独自运行,所以 CPU 需要在多条线程之间切换,于是线程状态也会多次在运行、阻塞之间切换。
  • JDK中用Thread.State类定义了线程在一个完整的生命周期中通常要经历的几种状态:

    • 新建 NEW:使用 new 新创建的线程对象,还没有调用start()方法启动时,也称为初始或开始状态。此时仅由 JVM 在堆内存中为其分配内存,并初始化其成员变量的值
    • 就绪 RUNNABLE:处于新建状态的线程调用start()后,将进入线程队列,等待CPU的使用权,此时它已具备了运行的条件,只是没分配到CPU资源,也称为可运行状态。此时 JVM 会为其创建方法调用栈和程序计数器,等待调度运行。
      • 调用start()方法后线程进入就绪状态,并不是说只要调用start()方法线程就马上变为当前线程,在变为当前线程之前都是就绪状态。
      • 线程在睡眠或挂起恢复时,即sleep()超时,或join()等待线程终止或超时,或获取同步锁,或I/O操作完毕,或调用notify()/notifyAll()线程唤醒方法,线程也会进入就绪状态。
      • 调用yield()方法,线程让步,释放线程的CPU执行权,暂停当前正在执行的线程,把执行机会让给优先级相同或更高的线程,线程也会进入就绪状态。
    • 运行 RUNNING:处于就绪状态的线程被调度并获得CPU资源时,便进入运行状态,开始执行run()或call()方法的线程执行体,方法内定义了线程的操作和功能。
    • 阻塞 BLOCKED:由于某种原因,线程让出/放弃了CPU使用权并临时中止自己的执行,进入阻塞状态

      • 等待阻塞(o.wait->等待对列):运行的线程通过调用Object.wait()方法,JVM会将该线程放进入等待队列(waitting queue),让线程等待某工作的完成。
      • 同步阻塞塞(lock->锁池):运行的线程在获取同步锁时,该同步锁被别的线程占用,获取同步锁失败,JVM会将该线程放入锁池(lock pool)中中。线程因为等待监视锁而被阻塞,进入同步阻塞状态。

        一段代码被加上了锁,线程是怎么进去的,就是线程获得了这个锁,其他线程来了,一看这个锁被占了,只能等着。有点美女和一群恶霸的感觉,只有现在霸占美女的恶霸走了,才能有另一个恶霸接手。

      • 其他阻塞(sleep/join):运行的线程通过调用该线程的Thread.sleep(),或调用其他线程的join(),或发出了I/O操作请求时,JVM 会把该线程置为阻塞状态,线程会进入阻塞。当sleep()超时,或join()等待线程终止或超时,或I/O处理完毕,线程就重新进入就绪状态。

    • 等待 WAITING
      • 造成线程等待的原因有三种,分别是调用Object.wait()、Thread.join()以及LockSupport.park()方法。
      • 处于等待状态的线程,正在等待其他线程去执行一个特定的操作。例如:因为Object.wait()而等待的线程正在等待另一个线程去调用Object.notify()或Object.notifyAll();因为Thread.join()而等待的线程正在等待另一个线程结束。
    • 限时等待 TIMED_WAITING:一个在限定时间内等待的线程的状态,也称为限时等待状态。造成线程限时等待状态的原因有五种,分别是:Thread.sleep(long)、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos(obj,long)、LockSupport.parkUntil(obj,long)。
    • 终止 TERMINATED:一个完全运行完成的线程的状态,也称为终止状态、结束状态。
    • 死亡 DEAD:线程完成了它的全部工作,或线程被提前强制性地中止或出现异常导致结束。线程会以下面三种方式结束,结束后就是死亡状态。
      • 正常结束:run()或call()方法执行完成,线程正常结束。
      • 异常结束:线程抛出一个未捕获的 Exception 或 Error。
      • 调用 stop():直接调用该线程的stop()方法来结束该线程。该方法通常容易导致死锁,不推荐使用。

image.png
5 多线程 - 图5
image.pngimage.png

4. 线程的同步

4.1 多线程安全问题

  • 多线程出现安全问题的原因:当多条语句在操作同一个线程共享数据时,一个线程对多条语句只执行了一部分,还没有执行完,另一个线程参与进来执行,导致共享数据的错误。即,多个线程执行的不确定性引起执行结果的不稳定,会造成操作的不完整性,破坏数据。
  • 解决办法:对多条操作共享数据的语句,只让一个线程都执行完,在执行过程中其他线程不可以参与执行。

    4.2 同步机制 synchronized

  • 同步机制

    • 在Java中,通过同步机制synchronized,来解决多线程的安全问题。
    • 好处:同步的方式解决了线程的安全问题。
    • 局限性:只能有一个线程参与,其他线程等待。相当于是一个单线程的过程,效率低。

image.png

  • 同步机制中的锁
    • 同步锁机制:对于并发工作,你需要某种方式来防止两个任务访问相同的资源(其实就是共享资源竞争)。防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。第一个访问某项资源的任务必须锁定这项资源,使其他的任务在其被解锁之前,就无法访问资源,而在其被解锁之时,另一个任务就可以锁定并使用资源了。
    • 同步机制中的锁是什么:同步锁、监视器、同步监视器
      • 任何一个类的对象都可以作为同步锁。所有对象都自动含有单一的锁,即监视器。

      • 同步代码块的锁:自己指定,很多时候指定为this或类名.class
      • 同步方法的锁:静态方法:类名.class;非静态方法:this
    • 注意:
      • 要求:必须确保使用同一个资源的多个线程必须共用同一把锁,否则无法保证共享资源的安全
      • 一个线程类中的所有静态方法共用同一把锁:类名.class;所有非静态方法共用同一把锁:this;同步代码块:指定同步锁时需谨慎
  • 同步的范围
    • 如何找问题,即代码是否存在线程安全?
      • 明确哪些代码是多线程运行的代码
      • 明确多个线程是否有共享数据
      • 明确多线程运行代码中是否有多条语句操作共享数据
    • 如何解决呢?
      • 对多条操作共享数据的语句,只能让一个线程都执行完,在执行过程中,其他线程不可以参与执行。即所有操作共享数据的这些语句都要放在同步范围中。
    • 同步的范围:操作共享数据(多个线程共同操作的变量)的代码,即为需要被同步的代码。不能包含代码多了,也不能包含少了。
      • 范围太小:没锁住所有有安全问题的代码。
      • 范围太大:没发挥多线程的功能。
  • 释放锁的操作
    • 当前线程的同步代码块、同步方法执行结束。
    • 当前线程在同步代码块、同步方法中遇到break、return终止了该代码块、该方法的继续执行。
    • 当前线程在同步代码块、同步方法中出现了未处理的Error或Exception,导致异常结束。
    • 当前线程在同步代码块、同步方法中执行了线程对象的Object.wait()方法,当前线程暂停,并释放锁。
  • 不会释放锁的操作
    • 线程执行同步代码块或同步方法时,程序调用Thread.sleep()、Thread.yield()方法暂停当前线程的执行。
    • 线程执行同步代码块时,其他线程调用了该线程的suspend()方法将该线程挂起,该线程不会释放锁。
      • 应尽量避免使用suspend()和resume()来控制线程
  • 线程的死锁问题

    • 死锁:
      • 不同的线程分别占用对方需要的同步资源不放弃,都在等待对方放弃自己需要的同步资源,就形成了线程的死锁。
      • 出现死锁后,不会出现异常,不会出现提示,只是所有的线程都处于阻塞状态,无法继续。
    • 解决方法:
      • 专门的算法、原则;
      • 尽量减少同步资源的定义;
      • 尽量避免嵌套同步

        4.2 解决线程安全问题的方式一:同步代码块

  • 使用同步代码块解决实现Runnable接口的方式创建多线程的线程安全问题中,可以考虑使用this充当同步监视器。

  • 使用同步代码块解决继承Thread类的方式创建多线程的线程安全问题中,慎用this充当同步监视器,可以考虑使用当前类.class充当同步监视器。
    1. synchronized (对象){//此处传入的对象就是同步锁、同步监视器
    2. // 需要被同步的代码;
    3. }
    ```java class Window1 implements Runnable{ private int ticket = 100; // Object obj = new Object(); // Dog dog = new Dog(); @Override public void run() { // Object obj = new Object();
    1. while(true){
    2. synchronized (this){//此时的this:唯一的Window1的对象 //方式二:synchronized (dog) {
    3. if (ticket > 0) {
    4. try {
    5. Thread.sleep(100);
    6. } catch (InterruptedException e) {
    7. e.printStackTrace();
    8. }
    9. System.out.println(Thread.currentThread().getName() + ":卖票,票号为:" + ticket);
    10. ticket--;
    11. } else {
    12. break;
    13. }
    14. }
    15. }
    } }

public class WindowTest1 { public static void main(String[] args) { Window1 w = new Window1(); Thread t1 = new Thread(w); Thread t2 = new Thread(w); Thread t3 = new Thread(w); t1.setName(“窗口1”); t2.setName(“窗口2”); t3.setName(“窗口3”); t1.start(); t2.start(); t3.start(); } }

class Dog{ }

  1. ```java
  2. class Window2 extends Thread{
  3. private static int ticket = 100;
  4. private static Object obj = new Object();
  5. @Override
  6. public void run() {
  7. while(true){
  8. //正确的
  9. // synchronized (obj){
  10. synchronized (Window2.class){//Class clazz = Window2.class,Window2.class只会加载一次
  11. //错误的方式:this代表着t1,t2,t3三个对象
  12. // synchronized (this){
  13. if(ticket > 0){
  14. try {
  15. Thread.sleep(100);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. System.out.println(getName() + ":卖票,票号为:" + ticket);
  20. ticket--;
  21. }else{
  22. break;
  23. }
  24. }
  25. }
  26. }
  27. }
  28. public class WindowTest2 {
  29. public static void main(String[] args) {
  30. Window2 t1 = new Window2();
  31. Window2 t2 = new Window2();
  32. Window2 t3 = new Window2();
  33. t1.setName("窗口1");
  34. t2.setName("窗口2");
  35. t3.setName("窗口3");
  36. t1.start();
  37. t2.start();
  38. t3.start();
  39. }
  40. }

4.3 解决线程安全问题的方式二:同步方法

  • synchronized还可以放在方法声明中,表示整个方法为同步方法。
  • 同步方法仍然涉及到同步监视器,只是不需要我们显示的声明。
  • 非静态的同步方法,同步监视器是:this。
  • 静态的同步方法,同步监视器是:当前类本身。所以这时特别注意当处理继承Thread类的线程安全问题时,同步监视器不能是this,而应该是当前类,所以此时要把同步方法改为静态的。

    1. public synchronized void show (String name){
    2. }

    ```java class Window3 implements Runnable {

    private int ticket = 100;

    @Override public void run() {

    1. while (true) {
    2. show();
    3. }

    }

    private synchronized void show(){//同步监视器:this

    1. //synchronized (this){
    2. if (ticket > 0) {
    3. try {
    4. Thread.sleep(100);
    5. } catch (InterruptedException e) {
    6. e.printStackTrace();
    7. }
    8. System.out.println(Thread.currentThread().getName() + ":卖票,票号为:" + ticket);
    9. ticket--;
    10. }
    11. //}

    } }

public class WindowTest3 { public static void main(String[] args) { Window3 w = new Window3();

  1. Thread t1 = new Thread(w);
  2. Thread t2 = new Thread(w);
  3. Thread t3 = new Thread(w);
  4. t1.setName("窗口1");
  5. t2.setName("窗口2");
  6. t3.setName("窗口3");
  7. t1.start();
  8. t2.start();
  9. t3.start();
  10. }

}

  1. ```java
  2. class Bank{
  3. private Bank(){}
  4. private static Bank instance = null;
  5. public static Bank getInstance(){
  6. //方式一:效率稍差
  7. // synchronized (Bank.class) {
  8. // if(instance == null){
  9. // instance = new Bank();
  10. // }
  11. // return instance;
  12. // }
  13. //方式二:效率更高
  14. if(instance == null){
  15. synchronized (Bank.class) {
  16. if(instance == null){
  17. instance = new Bank();
  18. }
  19. }
  20. }
  21. return instance;
  22. }
  23. }
  24. public class BankTest {
  25. Bank b1=Bank.getInstance();
  26. Bank b2=Bank.getInstance();
  27. System.out.println(b1==b2);
  28. }

4.4 解决线程安全问题的方式三:Lock锁同步锁

  • 从JDK 5.0开始,Java提供了更强大的线程同步机制:通过显式定义同步锁对象来实现同步,同步锁使用Lock对象充当。
  • java.util.concurrent.locks.Lock接口是控制多个线程对共享资源进行访问的工具。锁提供了对共享资源的独占访问,每次只能有一个线程对Lock对象加锁,线程开始访问共享资源之前应先获得Lock对象。
  • ReentrantLock类实现了Lock接口,它拥有与synchronized相同的并发性和内存语义,在实现线程安全的控制中,比较常用的是ReentrantLock,可以显式加锁、释放锁。

    1. class A{
    2. private final ReentrantLock lock = new ReenTrantLock();
    3. public void m(){
    4. lock.lock();
    5. try{
    6. //保证线程安全的代码;
    7. }
    8. finally{
    9. lock.unlock(); //注意:如果同步代码有异常,要将unlock()写入finally语句块
    10. }
    11. }
    12. }
    1. class Window implements Runnable{
    2. private int ticket = 100;
    3. //1.实例化ReentrantLock
    4. private ReentrantLock lock = new ReentrantLock();
    5. @Override
    6. public void run() {
    7. while(true){
    8. try{
    9. //2.调用锁定方法lock()
    10. lock.lock();
    11. if(ticket > 0){
    12. try {
    13. Thread.sleep(100);
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }
    17. System.out.println(Thread.currentThread().getName() + ":售票,票号为:" + ticket);
    18. ticket--;
    19. }else{
    20. break;
    21. }
    22. }finally {
    23. //3.调用解锁方法:unlock()
    24. lock.unlock();
    25. }
    26. }
    27. }
    28. }
    29. public class LockTest {
    30. public static void main(String[] args) {
    31. Window w = new Window();
    32. Thread t1 = new Thread(w);
    33. Thread t2 = new Thread(w);
    34. Thread t3 = new Thread(w);
    35. t1.setName("窗口1");
    36. t2.setName("窗口2");
    37. t3.setName("窗口3");
    38. t1.start();
    39. t2.start();
    40. t3.start();
    41. }
    42. }
  • synchronized 与 Lock 的对比

    • Lock是显式锁,需要手动开启和关闭锁;synchronized是隐式锁,出了作用域自动释放。
    • Lock只有代码块锁,synchronized有代码块锁和方法锁。
    • 使用Lock锁,JVM将花费较少的时间来调度线程,性能更好。并且具有更好的扩展性(提供更多的子类)
  • 优先使用顺序:

    • Lock -> 同步代码块(已经进入了方法体,分配了相应资源)-> 同步方法(在方法体之外)

      5. 线程的通信

  • wait():令当前线程挂起并放弃CPU、同步资源并等待,使别的线程可访问并修改共享资源,而当前线程排队等候其他线程调用notify()或notifyAll()方法唤醒,唤醒后等待重新获得对监视器的所有权后才能继续执行。

    • 在当前线程中调用方法:对象名.wait()
    • 使当前线程进入等待(某对象)状态,直到另一线程对该对象发出notify或notifyAll为止。
    • 调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁)
    • 调用此方法后,当前线程将释放对象监控权,然后进入等待
    • 在当前线程被notify()或notifyAll()方法唤醒后,要重新获得监控权,然后从断点处继续代码的执行。
  • notify()/notifyAll():唤醒正在排队等待同步资源的线程中优先级最高者结束等待 / 唤醒正在排队等待资源的所有线程结束等待。
    • 在当前线程中调用方法:对象名.notify()/notifyAll()
    • 功能:唤醒等待该对象监控权的一个/所有线程。
    • 调用方法的必要条件:当前线程必须具有对该对象的监控权(加锁)
  • 这三个方法只有在synchronized方法或代码块中或加锁才能使用,否则会报IllegalMonitorStateException异常。
  • 因为这三个方法必须有锁对象调用,而任意对象都可以作为synchronized的同步锁,因此这三个方法只能在Object类中声明。 ```java /**

    • 线程通信的应用:经典例题:生产者/消费者问题 *
    • 生产者(Productor)将产品交给店员(Clerk),而消费者(Customer)从店员处取走产品,
    • 店员一次只能持有固定数量的产品(比如:20),如果生产者试图生产更多的产品,店员
    • 会叫生产者停一下,如果店中有空位放产品了再通知生产者继续生产;如果店中没有产品
    • 了,店员会告诉消费者等一下,如果店中有产品了再通知消费者来取走产品。
    • 可能出现两个问题:
    • 1.生产者比消费者快时,消费者会漏掉一些数据没有取到。
    • 2.消费者比生产者快时,消费者会取相同的数据。
    • 分析:
      1. 是否是多线程问题?是,生产者线程,消费者线程
      1. 是否有共享数据?是,店员(或产品)
      1. 如何解决线程的安全问题?同步机制,有三种方法
      1. 是否涉及线程的通信?是 */ class Clerk{

      private int productCount = 0; //生产产品 public synchronized void produceProduct() {

      1. if(productCount < 20){
      2. productCount++;
      3. System.out.println(Thread.currentThread().getName() + ":开始生产第" + productCount + "个产品");
      4. notify();
      5. }else{
      6. //等待
      7. try {
      8. wait();
      9. } catch (InterruptedException e) {
      10. e.printStackTrace();
      11. }
      12. }

      } //消费产品 public synchronized void consumeProduct() {

      1. if(productCount > 0){
      2. System.out.println(Thread.currentThread().getName() + ":开始消费第" + productCount + "个产品");
      3. productCount--;
      4. notify();
      5. }else{
      6. //等待
      7. try {
      8. wait();
      9. } catch (InterruptedException e) {
      10. e.printStackTrace();
      11. }
      12. }

      } }

class Producer extends Thread{//生产者

  1. private Clerk clerk;
  2. public Producer(Clerk clerk) {
  3. this.clerk = clerk;
  4. }
  5. @Override
  6. public void run() {
  7. System.out.println(getName() + ":开始生产产品.....");
  8. while(true){
  9. try {
  10. Thread.sleep(10);
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. clerk.produceProduct();
  15. }
  16. }

}

class Consumer extends Thread{//消费者 private Clerk clerk;

  1. public Consumer(Clerk clerk) {
  2. this.clerk = clerk;
  3. }
  4. @Override
  5. public void run() {
  6. System.out.println(getName() + ":开始消费产品.....");
  7. while(true){
  8. try {
  9. Thread.sleep(20);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. clerk.consumeProduct();
  14. }
  15. }

}

public class ProductTest {

  1. public static void main(String[] args) {
  2. Clerk clerk = new Clerk();
  3. Producer p1 = new Producer(clerk);
  4. p1.setName("生产者1");
  5. Consumer c1 = new Consumer(clerk);
  6. c1.setName("消费者1");
  7. Consumer c2 = new Consumer(clerk);
  8. c2.setName("消费者2");
  9. p1.start();
  10. c1.start();
  11. c2.start();
  12. }

}

  1. <a name="Fty2T"></a>
  2. # 6. 线程池
  3. <a name="ayAvY"></a>
  4. ## 6.1 线程池的好处和原理
  5. **背景**<br />线程和数据库连接这些资源都是非常宝贵的资源。每次需要的时候创建,不需要的时候销毁,是非常浪费资源的。经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
  6. **思路**<br />可以使用缓存的策略,也就是使用线程池。提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁创建销毁、实现重复利用。
  7. 线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕, 再从队列中取出任务来执行。
  8. **好处:**线程复用;控制最大并发数;管理线程
  9. - 提高响应速度:减少了创建新线程的时间。当任务到达时,任务可以不需要等到线程创建就能立即执行
  10. - 降低资源消耗:重复利用线程池中已创建的线程。不需要每次都创建,降低线程创建和销毁造成的消耗
  11. - 便于线程管理:设置线程池的属性。提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
  12. **线程复用**<br />每一个 Thread 的类都有一个 start 方法。 当调用 start 启动线程时 Java 虚拟机会调用该类的 run 方法。 那么该类的 run() 方法中就是调用了 Runnable 对象的 run() 方法。 我们可以继承重写Thread 类,在其 start 方法中添加不断循环调用传递过来的 Runnable 对象。 这就是线程池的实现原理。循环方法中不断获取 Runnable 是用 Queue 实现的,在获取下一个 Runnable 之前可以是阻塞的。
  13. [参考](https://blog.csdn.net/u013541140/article/details/95225769)
  14. <a name="ZAgzR"></a>
  15. ## 6.1 **线程池的组成**
  16. 一般的线程池主要分为以下 4 个组成部分:
  17. 1. 线程池管理器:用于创建并管理线程池
  18. 1. 工作线程:线程池中的线程
  19. 1. 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
  20. 1. 任务队列:用于存放待处理的任务,提供一种缓冲机制
  21. <a name="ro1Eb"></a>
  22. ## 6.2 线程池的API和属性
  23. Java 中的线程池是通过 Executor 框架实现的,其中用到了 Executor接口,ExecutorService接口,Executors工具类,ThreadPoolExecutor实现类 ,Callable 和 Future、FutureTask 这几个类。<br />![image.png](https://cdn.nlark.com/yuque/0/2022/png/26064559/1649762992056-0546897a-a6d8-42a9-b401-c4fb48186e23.png#clientId=u30a77377-edf9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=248&id=DxlZg&name=image.png&originHeight=453&originWidth=924&originalType=binary&ratio=1&rotation=0&showTitle=false&size=237804&status=done&style=none&taskId=u85e69156-2542-4c01-a681-308d4a2e1c6&title=&width=504.9985656738281)<br />**Executor接口**<br />Java 里面线程池的顶级接口是 Executor,其中public interface ExecutorService extends Executor
  24. **ExecutorService接口**
  25. - 真正的线程池接口,JDK普通线程池。常见实现类ThreadPoolExecutor
  26. - void execute(Runnable command):执行任务/命令,没有返回值,一般用来执行Runnable接口的实现类对象
  27. - <T> Future<T> submit(Callable<T> task):执行任务,有返回值,一般用来执行Callable接口的实现类对象
  28. - void shutdown():关闭连接池
  29. **ThreadPoolExecutor实现类**<br />ThreadPoolExecutor是ExecutorService的实现类,线程池的真正实现类,有4种构造方法。
  30. ```java
  31. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  32. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
  33. RejectedExecutionHandler handler) {

线程池的属性/参数:setXxx、getXxx

  • corePoolSize(必需):指定了线程池中核心线程数量(核心池的大小)。默认情况下,核心线程会一直存活,但是当将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
  • maximumPoolSize(必需):指定了线程池所能容纳的最大线程数量。当活跃线程数达到该数值后,后续的新任务将会阻塞。
  • keepAliveTime(必需):线程闲置超时时长,即线程没有任务时最多保持的时长,超过该时长,非核心线程就会被回收。如果将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
  • unit(必需):指定 keepAliveTime 参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
  • workQueue(必需):任务队列。通过线程池的 execute() 方法提交的 Runnable 对象将存储在该参数中。其采用阻塞队列实现。被提交但尚未被执行的任务存储在任务队列中。
  • threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。一般用默认的即可。
  • handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。当任务太多来不及处理时,如何拒绝任务。

    6.1 线程池的使用

    线程池的使用步骤
    1) 提供/创建一个指定线程数量的线程池
    2) 设置线程池的属性(可省略)
    3) 执行指定的线程的操作:需要提供实现Runnable接口或Callable接口的实现类的对象
    4) 关闭连接池

    1. // 创建线程池
    2. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
    3. MAXIMUM_POOL_SIZE,
    4. KEEP_ALIVE,
    5. TimeUnit.SECONDS,
    6. sPoolWorkQueue,
    7. sThreadFactory);
    8. // 向线程池提交任务
    9. threadPool.execute(new Runnable() {
    10. @Override
    11. public void run() {
    12. ... // 线程执行的任务
    13. }
    14. });
    15. // 关闭线程池
    16. threadPool.shutdown(); // 设置线程池的状态为SHUTDOWN,然后中断所有没有正在执行任务的线程
    17. threadPool.shutdownNow(); // 设置线程池的状态为 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
    1. public class ThreadPool {
    2. public static void main(String[] args) {
    3. //1. 提供一个指定线程数量的线程池
    4. // 通过Executors.newFixedThreadPool(n); 创建ExecutorService类型的对象
    5. // 再强转成ThreadPoolExecutor类型
    6. ExecutorService service = Executors.newFixedThreadPool(10);
    7. ThreadPoolExecutor service1 = (ThreadPoolExecutor) service;
    8. //2.设置线程池的属性
    9. // System.out.println(service.getClass());
    10. // service1.setCorePoolSize(15);
    11. // service1.setKeepAliveTime();
    12. //3.执行指定的线程的操作。需要提供实现Runnable接口或Callable接口实现类的对象
    13. service.execute(new NumberThread());//适合适用于Runnable
    14. service.execute(new NumberThread1());//适合适用于Runnable
    15. // service.submit(Callable callable);//适合使用于Callable
    16. //4.关闭连接池
    17. service.shutdown();
    18. }
    19. }
    20. class NumberThread implements Runnable{//实现Runnable接口的实现类
    21. @Override
    22. public void run() {
    23. for(int i = 0;i <= 100;i++){
    24. if(i % 2 == 0){
    25. System.out.println(Thread.currentThread().getName() + ": " + i);
    26. }
    27. }
    28. }
    29. }
    30. class NumberThread1 implements Runnable{//实现Runnable接口的实现类
    31. @Override
    32. public void run() {
    33. for(int i = 0;i <= 100;i++){
    34. if(i % 2 != 0){
    35. System.out.println(Thread.currentThread().getName() + ": " + i);
    36. }
    37. }
    38. }
    39. }
    1. // 创建线程池
    2. ExecutorService threadPool = Executors.newFixedThreadPool(10);
    3. while(true) {
    4. threadPool.execute(new Runnable() { // 提交多个线程任务,并执行
    5. @Override
    6. public void run() {
    7. System.out.println(Thread.currentThread().getName() + " is running ..");
    8. try {
    9. Thread.sleep(3000);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. });
    15. }

    ExecutorService结合Callable实现有返回结果的多线程:

  • Callable创建线程,结合线程池接口 ExecutorService 就可以实现有返回结果的多线程。

    1. // 创建一个线程池
    2. ExecutorService pool = Executors.newFixedThreadPool(taskSize);
    3. // 创建多个有返回值的任务
    4. List<Future> list = new ArrayList<Future>();
    5. for (int i = 0; i < taskSize; i++) {
    6. Callable c = new MyCallable(i + " ");
    7. // 执行任务并获取 Future 对象
    8. Future f = pool.submit(c);
    9. list.add(f);
    10. }
    11. // 关闭线程池
    12. pool.shutdown();
    13. // 获取所有并发任务的运行结果
    14. for (Future f : list) {
    15. // 从 Future 对象上获取任务的返回值,并输出到控制台
    16. System.out.println("res:" + f.get().toString());
    17. }

    6.2 四种常见的功能线程池

    Executors工具类

  • 线程池的工厂类,用于创建并返回不同类型的线程池。

  • ThreadPoolExecutor是Executors类的底层实现

定长线程池(FixedThreadPool)
固定大小线程池。

其缓冲/阻塞/任务队列是LinkedBlockingQueue,大小默认是Integer.MAX_VALUE,相当于无界的。
其corePoolSize和maximumPoolSize一样,都是在创建时传入的参数n,相当于只有核心线程。

过程:创建一个固定大小的线程池来完成任务。每次提交一个任务,就创建一个线程,直到线程达到线程池的最大大小n,线程池的大小一旦达到最大值就会保持不变;执行完任务后就立即回收线程;如果所有线程都处于活动状态,那么新来的任务,放入阻塞队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。线程池中的线程除非被显式的关闭,否则将一直存在。

  • Executors.newFixedThreadPool(n):创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
  • 特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的有界队列LinkedBlockingQueue。
  • 应用场景:控制线程最大并发数。
    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }
    6. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    7. return new ThreadPoolExecutor(nThreads, nThreads,
    8. 0L, TimeUnit.MILLISECONDS,
    9. new LinkedBlockingQueue<Runnable>(),
    10. threadFactory);
    11. }
    1. // 1. 创建定长线程池对象 & 设置线程池线程数量固定为3
    2. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
    3. // 2. 创建好Runnable类线程对象 & 需执行的任务
    4. Runnable task =new Runnable(){
    5. public void run() {
    6. System.out.println("执行任务啦");
    7. }
    8. };
    9. // 3. 向线程池提交任务
    10. fixedThreadPool.execute(task);
    定时线程池(ScheduledThreadPool )
    创建一个大小无限的线程池。

此线程池支持定时以及周期性执行任务的需求。

在创建时,传入时间参数,让线程在给定延迟后运行命令或者定期执行。

  • Executors.newScheduledThreadPool(n):创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
  • 特点:核心线程数量固定,非核心线程数量无限,执行完闲置 10ms 后回收,任务队列为延时阻塞队列DelayedWorkQueue。
  • 应用场景:执行定时或周期性的任务 ```java private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }

public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); }

  1. ```java
  2. // 1. 创建 定时线程池对象 & 设置线程池线程数量固定为5
  3. ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  4. // 2. 创建好Runnable类线程对象 & 需执行的任务
  5. Runnable task =new Runnable(){
  6. public void run() {
  7. System.out.println("执行任务啦");
  8. }
  9. };
  10. // 3. 向线程池提交任务
  11. scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延迟1s后执行任务
  12. scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延迟10ms后、每隔1000ms执行任务
  1. ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3);
  2. scheduledThreadPool.schedule(new Runnable(){
  3. @Override
  4. public void run() {
  5. System.out.println("延迟三秒");
  6. }
  7. }, 3, TimeUnit.SECONDS);
  8. scheduledThreadPool.scheduleAtFixedRate(new Runnable(){
  9. @Override
  10. public void run() {
  11. System.out.println("延迟 1 秒后每三秒执行一次");
  12. }
  13. },1,3,TimeUnit.SECONDS);

可缓存线程池(CachedThreadPool)
无界线程池,可以进行自动线程回收。

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程(60秒不执行任务),当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

根据需要创建线程的线程池,适合短期异步任务,一个线程执行完任务,可重复利用,并且会移除那些60秒钟未被使用的线程。这个线程池可以节省资源。发现没有可用线程,才会创建线程。

  • Executors.newCachedThreadPool():创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
  • 特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列SynchronousQueue。
  • 应用场景:执行大量、耗时少的任务。

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }
    6. public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    7. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    8. 60L, TimeUnit.SECONDS,
    9. new SynchronousQueue<Runnable>(),
    10. threadFactory);
    11. }
    1. // 1. 创建可缓存线程池对象
    2. ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    3. // 2. 创建好Runnable类线程对象 & 需执行的任务
    4. Runnable task =new Runnable(){
    5. public void run() {
    6. System.out.println("执行任务啦");
    7. }
    8. };
    9. // 3. 向线程池提交任务
    10. cachedThreadPool.execute(task);
  • 单线程化线程池(SingleThreadExecutor)

单个后台线程 (其缓冲队列是无界的) 。

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

  • Executors.newSingleThreadExecutor():创建一个只有一个线程的线程池。返回一个线程池(这个线程池只有一个线程),这个线程池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去。
  • 特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列LinkedBlockingQueue。
  • 应用场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。

    1. public static ExecutorService newSingleThreadExecutor() {
    2. return new FinalizableDelegatedExecutorService
    3. (new ThreadPoolExecutor(1, 1,
    4. 0L, TimeUnit.MILLISECONDS,
    5. new LinkedBlockingQueue<Runnable>()));
    6. }
    7. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    8. return new FinalizableDelegatedExecutorService
    9. (new ThreadPoolExecutor(1, 1,
    10. 0L, TimeUnit.MILLISECONDS,
    11. new LinkedBlockingQueue<Runnable>(),
    12. threadFactory));
    13. }
    1. // 1. 创建单线程化线程池
    2. ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    3. // 2. 创建好Runnable类线程对象 & 需执行的任务
    4. Runnable task =new Runnable(){
    5. public void run() {
    6. System.out.println("执行任务啦");
    7. }
    8. };
    9. // 3. 向线程池提交任务
    10. singleThreadExecutor.execute(task);
  • 对比

image.png

  • 总结

Executors 的 4 个功能线程池虽然方便,但现在已经不建议使用了,而是建议直接通过使用 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

其实 Executors 的 4 个功能线程有如下弊端:

  • FixedThreadPoolSingleThreadExecutor:主要问题是堆积的请求处理队列均采用 LinkedBlockingQueue,可能会耗费非常大的内存,甚至 OOM。
  • CachedThreadPoolScheduledThreadPool:主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM

image.png

6.2 线程池工作过程

image.png

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。通过Handler指定的拒绝策略处理任务,默认AbortPolicy。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。回收空闲超时的非核心线程。

image.png

6.3 任务队列/阻塞队列(workQueue)

阻塞队列原理
任务队列是基于阻塞队列实现的,即采用生产者消费者模式。
阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:

  1. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放
    入队列。
  2. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有
    空的位置,线程被自动唤醒。

image.png image.png
阻塞队列的主要方法
image.png
抛出异常:抛出一个异常;特殊值:返回一个特殊值(null 或 false,视情况而定);
阻塞:在成功操作之前,一直阻塞线程;超时:放弃前只在最大的时间内阻塞;
插入操作

  1. public abstract boolean add(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。如果该元素是 NULL,则会抛出 NullPointerException 异常。
  2. public abstract boolean offer(E paramE):将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false。
  3. public abstract void put(E paramE) throws InterruptedException: 将指定元素插入此队列中,将等待可用的空间(如果有必要)
  4. offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入 BlockingQueue,则返回失败。

    1. public void put(E paramE) throws InterruptedException {
    2. checkNotNull(paramE);
    3. ReentrantLock localReentrantLock = this.lock;
    4. localReentrantLock.lockInterruptibly();
    5. try {
    6. while (this.count == this.items.length)
    7. this.notFull.await();//如果队列满了,则线程阻塞等待
    8. enqueue(paramE);
    9. localReentrantLock.unlock();
    10. } finally {
    11. localReentrantLock.unlock();
    12. }
    13. }

    获取数据操作

  5. poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null;

  6. poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
  7. take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 BlockingQueue 有新的数据被加入。
  8. drainTo():一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

Java 中的七种阻塞队列
在 Java 中需要实现 BlockingQueue 接口。但 Java 已经为我们提供了七种阻塞队列的实现。

ArrayBlockingQueue(公平、非公平)
一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。
用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。默认创建的是非公平的阻塞队列,同时可以使用以下代码创建一个公平的阻塞队列:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

LinkedBlockingQueue(两个独立锁提高并发)
一个由链表结构组成的可有界可无界的阻塞队列,在未指明容量时,容量默认为 Integer.MAX_VALUE。参考
基于链表的可以有界可以无界的阻塞队列。同 ArrayListBlockingQueue 类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。默认一个类似无限大小的容量Integer.MAX_VALUE。

PriorityBlockingQueue(compareTo 排序实现优先)
一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。

是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

DelayQueue(缓存失效、定时任务 )
类似于PriorityBlockingQueue,是一个二叉堆/优先级队列实现的无界优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时延从队列中提取任务,时间没到任务取不出来。

是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将 DelayQueue 运用在以下应用场景:

  1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询
    DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
  2. 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。

SynchronousQueue
一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用, SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和ArrayBlockingQueue。

消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

LinkedTransferQueue
一个由链表结构组成的无界阻塞队列。它是ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的结合体,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行为一致,都是无界的阻塞队列。

相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

  1. transfer 方法:如果当前有消费者正在等待接收元素(消费者使用 take()方法或带时间限制的poll()方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。
  2. tryTransfer 方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回。而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。

LinkedBlockingDeque
一个由双向队列/链表结构实现的有界双端/向阻塞队列。双端意味着可以像普通队列一样 FIFO(先进先出),也可以像栈一样 FILO(先进后出)。

是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法却等同于 takeFirst,不知道是不是 Jdk 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。在初始化 LinkedBlockingDeque 时可以设置容量防止其过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。

有界队列和无界队列的区别
如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;
如果使用无界队列,因为任务队列永远都可以添加任务,直到系统资源耗尽。

ArrayBlockingQueue、LinkedBlockingQueue、ConcurrentLinkedQueue的区别


ArrayBlockingQueue LinkedBlockingQueue ConcurrentLinkedQueue
阻塞与否 阻塞 阻塞 非阻塞
是否有界 有界,适合已知最大存储容量的场景 可配置,可有界可以无界 无界
线程安全保障
(并发方面)
一把全局锁(采用一把锁,两个condition)(还支持公平锁) 存取采用2把锁(头尾各1把锁) CAS
适用场景 生产消费模型,平衡两边处理速度 生产消费模型,平衡两边处理速度 对全局的集合进行操作的场景
注意事项 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间) 无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。 size() 是要遍历一遍集合,慎用
内存方面 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间) 用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。
吞吐量 LinkedBlockingQueue在大多数并发的场景下吞吐量比ArrayBlockingQueue高,但是性能不稳定。LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步。

6.4 线程工厂(threadFactory)

线程工厂指定创建线程的方式,需要实现 ThreadFactory 接口,并实现 newThread(Runnable r) 方法。该参数可以不用指定,Executors 框架已经为我们实现了一个默认的线程工厂:DefaultThreadFactory (详细看源码)

6.5 拒绝策略(handler)

当线程池中的线程数达到了最大线程数,并且线程池中的线程已经用完了,无法继续为新任务服务,同时等待队列也已经排满了,再也塞不下新任务了。这时候就需要执行拒绝策略,合理的处理问题。拒绝策略需要实现 RejectedExecutionHandler 接口,并实现 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。不过 JDK 内置的 Executors 框架已经为我们实现了 4 种拒绝策略:

  1. AbortPolicy(默认):丢弃任务并直接抛出 RejectedExecutionException 异常,阻止系统正常运行。
  2. CallerRunsPolicy:由调用线程处理该任务。只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
  3. DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
  4. DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。

7. ThreadLocal(线程本地存储)

ThreadLocal,很多地方叫做线程本地变量,也有些地方叫做线程本地存储,ThreadLocal 的作用是提供线程内的局部变量,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度。

ThreadLocalMap(线程的一个属性)

  1. 每个线程中都有一个自己的 ThreadLocalMap 类对象,可以将属于线程自己的对象保持到其中,各管各的,线程可以正确的访问到自己的对象。
  2. 将一个共用的 ThreadLocal 静态实例作为 key,将不同对象的引用保存到不同线程的ThreadLocalMap 中,然后在线程执行的各处通过这个静态 ThreadLocal 实例的 get()方法取得自己线程保存的那个对象,避免了将这个对象作为参数传递的麻烦。
  3. ThreadLocal中的set、get方法用来获取ThreadLocalMap中存放的key对应的value,其中key是当前线程,value是当前线程自己的对象(源码)。
  4. ThreadLocalMap 其实就是线程里面的一个属性,它在 Thread 类中定义ThreadLocal.ThreadLocalMap threadLocals = null;

image.png
使用场景
最常见的 ThreadLocal 使用场景为 用来解决 数据库连接、Session 管理等。

  1. private static final ThreadLocal threadSession = new ThreadLocal();
  2. public static Session getSession() throws InfrastructureException {
  3. Session s = (Session) threadSession.get();
  4. try {
  5. if (s == null) {
  6. s = getSessionFactory().openSession();
  7. threadSession.set(s);
  8. }
  9. } catch (HibernateException ex) {
  10. throw new InfrastructureException(ex);
  11. }
  12. return s;
  13. }

8. 如何在两个线程之间共享数据

Java 里面进行多线程通信的主要方式就是共享内存的方式,共享内存主要的关注点有两个:可见性和有序性原子性。Java 内存模型(JMM)解决了可见性和有序性的问题,而锁解决了原子性的问题,理想情况下我们希望做到同步和互斥。有以下常规实现方法:

  1. 将数据抽象成一个类,并将对这个数据的操作作为这个类的方法,这么设计可以和容易做到
    同步,只要在方法上加 synchronized
    ```java public class MyData { private int j=0; public synchronized void add(){
    1. j++;
    2. System.out.println("线程"+Thread.currentThread().getName()+"j 为:"+j);
    } public synchronized void dec(){
    1. j--;
    2. System.out.println("线程"+Thread.currentThread().getName()+"j 为:"+j);
    } public int getData(){
    1. return j;
    } }

public class AddRunnable implements Runnable{ MyData data; public AddRunnable(MyData data){ this.data= data; } public void run() { data.add(); } }

public class DecRunnable implements Runnable { MyData data; public DecRunnable(MyData data){ this.data = data; } public void run() { data.dec(); } } public static void main(String[] args) { MyData data = new MyData(); Runnable add = new AddRunnable(data); Runnable dec = new DecRunnable(data); for(int i=0;i<2;i++){ new Thread(add).start(); new Thread(dec).start(); }

  1. 2. Runnable 对象作为一个类的内部类,共享数据作为这个类的成员变量,每个线程对共享数据的操作方法也封装在外部类,以便实现对数据的各个操作的同步和互斥,作为内部类的各个 Runnable 对象调用外部类的这些方法。
  2. ```java
  3. public class MyData {
  4. private int j=0;
  5. public synchronized void add(){
  6. j++;
  7. System.out.println("线程"+Thread.currentThread().getName()+"j 为:"+j);
  8. }
  9. public synchronized void dec(){
  10. j--;
  11. System.out.println("线程"+Thread.currentThread().getName()+"j 为:"+j);
  12. }
  13. public int getData(){
  14. return j;
  15. }
  16. }
  17. public class TestThread {
  18. public static void main(String[] args) {
  19. final MyData data = new MyData();
  20. for(int i=0;i<2;i++){
  21. new Thread(new Runnable(){
  22. public void run() {
  23. data.add();
  24. }
  25. }).start();
  26. new Thread(new Runnable(){
  27. public void run() {
  28. data.dec();
  29. }
  30. }).start();
  31. }
  32. }
  33. }

9. 锁的终极整理

分类

· 概念上的锁:悲观锁、乐观锁、互斥锁、公平锁、非公平锁、共享锁、独占锁、死锁
· 有具体实现类的锁:synchronized、reentrantLock、ReadWriteLock
· 底层原理锁:自旋锁、自适应自旋锁、可重入锁、重量级锁、轻量级锁、偏向锁
· 数据库锁:行锁、表锁、页锁

乐观锁

乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作。

JAVA 中的乐观锁基本都是通过 CAS 操作实现的,CAS 是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败。

总结

每次去拿数据时,都默认不会有其他线程修改数据,所以每次都不会加锁,不会阻止其他线程进入。但在更新数据的时候,会通过数据版本号/时间戳以及CAS机制,比对一下数据有没有被修改。如果被修改,就重新拿取数据(自旋重新获取数据);没有,就更新修改数据。

乐观锁一般使用CAS机制实现。如AtomicInteger等确保原子操作的类,valitile关键字是一种同步机制,不是锁,解决不了原子性问题。

乐观锁适用于读多的场景。

悲观锁

悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会 block 直到拿到锁。

JAVA中的悲观锁就是Synchronized。AQS框架下的锁则是先尝试cas乐观锁去获取锁,获取不到,才会转换为悲观锁,如 RetreenLock。

总结

每次拿取数据都认为会有其他线程修改我的数据,所以每次都会上锁,阻止其他线程进入(一次只让一个线程进入,其他线程等待)。

JAVA中的悲观锁:Synchronized、ReentrantLock。
AQS框架下的锁:先尝试CAS乐观锁去获取锁,获取不到转为悲观锁,如RetreenLock。

悲观锁适用于写多的场景。

自旋锁

目前用到的所有锁的底层都有自旋锁,自旋锁是对锁的一种优化。自旋锁给其他的锁提供了自旋的功能。可以设定自旋的时间。自旋锁底层由java编写,是个native方法。

自旋锁原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免用户线程和内核的切换的消耗。

线程自旋是需要消耗 cup 的,说白了就是让 cup 在做无用功,如果一直获取不到锁,那线程也不能一直占用 cup 自旋做无用功,所以需要设定一个自旋等待的最大时间。

如果持有锁的线程执行的时间超过自旋等待的最大时间扔没有释放锁,就会导致其它争用锁的线程在最大等待时间内还是获取不到锁,这时争用线程会停止自旋进入阻塞状态。

自旋锁的优缺点
自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能能大幅度的提升,因为自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换!

但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,这时候就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用 cpu 做无用功,占着 XX 不 XX,同时有大量线程在竞争一个锁,会导致获取锁的时间很长,线程自旋的消耗大于线程阻塞挂起操作的消耗,其它需要 cup 的线程又不能获取到 cpu,造成 cpu 的浪费。所以这种情况下我们要关闭自旋锁;

自旋锁时间阈值(1.6 引入了适应性自旋锁)
自旋锁的目的是为了占着 CPU 的资源不释放,等到获取到锁立即进行处理。但是如何去选择自旋的执行时间呢?如果自旋执行时间太长,会有大量的线程处于自旋状态占用 CPU 资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!

JVM 对于自旋周期的选择,jdk1.5 这个限度是一定的写死的,在 1.6 引入了适应性自旋锁,适应性自旋锁意味着自旋的时间不在是固定的了,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定,基本认为一个线程上下文切换的时间是最佳的一个时间,同时 JVM 还针对当前 CPU 的负荷情况做了较多的优化,如果平均负载小于 CPUs 则一直自旋,如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞,如果正在自旋的线程发现 Owner 发生了变化则延迟自旋时间(自旋计数)或进入阻塞,如果 CPU 处于节电模式则停止自旋,自旋时间的最坏情况是 CPU的存储延迟(CPU A 存储了一个数据,到 CPU B 得知这个数据直接的时间差),自旋时会适当放弃线程优先级之间的差异。

自适应自旋锁:对自旋锁的优化,具备自适应策略,也就是自旋的时间是根据上一次自旋的时间动态的改变的。比如上一次我自旋了5次获取到锁资源,下回我就自旋3次;上回自旋5次获取不到资源,下回我自旋15次。

自旋锁的开启
JDK1.6 中-XX:+UseSpinning 开启;
-XX:PreBlockSpin=10 为自旋次数;
JDK1.7 后,去掉此参数,由JVM 控制;

  1. /**
  2. * 自旋锁
  3. */
  4. public class SpinlockDemo {
  5. // int 0
  6. // Thread null
  7. AtomicReference<Thread> atomicReference = new AtomicReference<>();
  8. // 加锁
  9. public void myLock(){
  10. Thread thread = Thread.currentThread();
  11. System.out.println(Thread.currentThread().getName() + "==> mylock");
  12. // 自旋锁
  13. while (!atomicReference.compareAndSet(null,thread)){
  14. }
  15. }
  16. // 解锁
  17. // 加锁
  18. public void myUnLock(){
  19. Thread thread = Thread.currentThread();
  20. System.out.println(Thread.currentThread().getName() + "==> myUnlock");
  21. atomicReference.compareAndSet(thread,null);
  22. }
  23. }
  24. class TestSpinLock {
  25. public static void main(String[] args) throws InterruptedException {
  26. // ReentrantLock reentrantLock = new ReentrantLock();
  27. // reentrantLock.lock();
  28. // reentrantLock.unlock();
  29. // 底层使用的自旋锁CAS
  30. SpinlockDemo lock = new SpinlockDemo();
  31. new Thread(()-> { lock.myLock();
  32. try {
  33. TimeUnit.SECONDS.sleep(5);
  34. } catch (Exception e) { e.printStackTrace();
  35. } finally {
  36. lock.myUnLock();
  37. }
  38. },"T1").start();
  39. TimeUnit.SECONDS.sleep(1);
  40. new Thread(()-> { lock.myLock();
  41. try {
  42. TimeUnit.SECONDS.sleep(1);
  43. } catch (Exception e) { e.printStackTrace();
  44. } finally {
  45. lock.myUnLock();
  46. }
  47. },"T2").start();
  48. }
  49. }
  50. T1==> mylock
  51. T2==> mylock
  52. T1==> myUnlock
  53. T2==> myUnlock

Synchronized 同步锁

当多个线程同时访问同一个数据时,很容易出现问题。为了避免这种情况出现,我们要保证线程同步互斥,就是指并发执行的多个线程,在同一时间内只允许一个线程访问共享数据。 Java 中可以使用 synchronized 关键字来取得一个对象的同步锁。

synchronized 它可以把任意一个非 NULL 的对象当作锁。他属于独占式的悲观锁,同时属于可重入锁,属于非公平锁。

Synchronized 作用范围

  1. 作用于非静态方法时,锁住的是对象的实例(this);
  2. 作用于静态方法时,锁住的是Class实例,又因为Class的相关数据存储在永久带PermGen(jdk1.8 则是metaspace,元空间也就是方法区),永久带是全局共享的,因此静态方法锁相当于类的一个全局锁,会锁所有调用该方法的线程;
  3. synchronized 作用于一个对象实例时,锁住的是所有以该对象为锁的代码块。它有多个队列,当多个线程一起访问某个对象监视器的时候,对象监视器会将这些线程存储在不同的容器中。

使用同步代码块解决实现Runnable接口的方式创建多线程的线程安全问题中,可以考虑使用this充当同步监视器。使用同步代码块解决继承Thread类的方式创建多线程的线程安全问题中,慎用this充当同步监视器,可以考虑使用当前类.class充当同步监视器。

同步方法仍然涉及到同步监视器,只是不需要我们显示的声明。非静态的同步方法的同步监视器是:this。静态的同步方法的同步监视器是:当前类本身(类.class,即Class实例)。所以这时特别注意当处理继承Thread类的线程安全问题时,同步监视器不能是this,而应该是当前类.class,所以此时要把同步方法改为静态的。

Synchronized 核心组件
1) Wait Set:哪些调用 wait 方法被阻塞的线程被放置在这里;
2) Contention List:竞争队列,所有请求锁的线程首先被放在这个竞争队列中;
3) Entry List:Contention List 中那些有资格成为候选资源的线程被移动到 Entry List 中;
4) OnDeck:任意时刻,最多只有一个线程正在竞争锁资源,该线程被成为 OnDeck;
5) Owner:当前已经获取到所资源的线程被称为 Owner;
6) !Owner:当前释放锁的线程。

Synchronized 实现
image.png

  1. JVM 每次从队列的尾部取出一个数据用于锁竞争候选者(OnDeck),但是并发情况下,ContentionList 会被大量的并发线程进行 CAS 访问,为了降低对尾部元素的竞争,JVM 会将一部分线程移动到 EntryList 中作为候选竞争线程。
  2. Owner 线程会在 unlock 时,将 ContentionList 中的部分线程迁移到 EntryList 中,并指定EntryList 中的某个线程为 OnDeck 线程(一般是最先进去的那个线程)。
  3. Owner 线程并不直接把锁传递给 OnDeck 线程,而是把锁竞争的权利交给 OnDeck,OnDeck 需要重新竞争锁。这样虽然牺牲了一些公平性,但是能极大的提升系统的吞吐量,在JVM 中,也把这种选择行为称之为“竞争切换”。
  4. OnDeck 线程获取到锁资源后会变为 Owner 线程,而没有得到锁资源的仍然停留在 EntryList中。如果Owner 线程被 wait 方法阻塞,则转移到 WaitSet 队列中,直到某个时刻通过 notify或者 notifyAll 唤醒,会重新进去 EntryList 中。WaitSet阻塞的线程所在的地方,被唤醒进入EntryList队列,重新竞争锁资源。
  5. 处于 ContentionList、EntryList、WaitSet 中的线程都处于阻塞状态,该阻塞是由操作系统来完成的(Linux 内核下采用 pthread_mutex_lock 内核函数实现的)。
  6. Synchronized 是非公平锁(所有线程进入队列前都会自旋尝试直接获取锁 )。Synchronized 在线程进入 ContentionList 时,等待的线程会先尝试自旋获取锁,如果获取不到就进入 ContentionList,这明显对于已经进入队列的线程是不公平的,还有一个不公平的是自旋获取锁的线程还可能直接抢占 OnDeck 线程的锁资源。参考
  7. 每个对象都有个 monitor 对象,加锁就是在竞争 monitor 对象(在Java的设计中,每一个对象自打娘胎里出来,就带了一把看不见的锁,通常叫内部锁或Monitor锁),代码块加锁是在前后分别加上 monitorenter 和 monitorexit 指令来实现的,方法加锁是通过一个标记位来判断的。
  8. synchronized 是一个重量级操作,需要调用操作系统相关接口,性能是低效的,有可能给线程加锁消耗的时间比有用操作消耗的时间更多。
  9. Java1.6,synchronized 进行了很多的优化,有适应自旋、锁消除、锁粗化、轻量级锁及偏向锁等,效率有了本质上的提高。在之后推出的 Java1.7 与 1.8 中,均对该关键字的实现机理做了优化。引入了偏向锁和轻量级锁。都是在对象头中有标记位,不需要经过操作系统加锁。
  10. 锁可以从偏向锁升级到轻量级锁,再升级到重量级锁。这种升级过程叫做锁膨胀;
  11. JDK 1.6 中默认是开启偏向锁和轻量级锁,可以通过-XX:-UseBiasedLocking 来禁用偏向锁。
  1. public class ChongRu {
  2. // Synchronized public class Demo01 {
  3. public static void main(String[] args) { Phone phone = new Phone();
  4. new Thread(()->{
  5. phone.sms();
  6. },"A").start();
  7. new Thread(()->{
  8. phone.sms();
  9. },"B").start();
  10. }
  11. }
  12. class Phone{
  13. public synchronized void sms(){
  14. System.out.println(Thread.currentThread().getName() + "sms");
  15. call(); // 这里也有锁
  16. }
  17. public synchronized void call(){
  18. System.out.println(Thread.currentThread().getName() + "call");
  19. }
  20. }
  21. Asms
  22. Acall
  23. Bsms
  24. Bcall

ReentrantLock 可重入锁

ReentrantLock 类实现接口 Lock 并实现了接口中定义的方法,它是一种可重入锁,除了能完成 synchronized 所能完成的所有工作外,还提供了诸如可响应中断锁、可轮询锁请求、定时锁等避免多线程死锁的方法。

Lock 接口的主要方法

  1. void lock(): 执行此方法时, 如果锁处于空闲状态, 当前线程将获取到锁. 相反, 如果锁已经被其他线程持有, 将禁用当前线程, 直到当前线程获取到锁.
  2. boolean tryLock():如果锁可用, 则获取锁, 并立即返回 true, 否则返回 false. 该方法和lock()的区别在于, tryLock()只是”试图”获取锁, 如果锁不可用, 不会导致当前线程被禁用,当前线程仍然继续往下执行代码. 而 lock()方法则是一定要获取到锁, 如果锁不可用, 就一直等待, 在未获得锁之前,当前线程并不继续向下执行.
  3. void unlock():执行此方法时, 当前线程将释放持有的锁. 锁只能由持有者释放, 如果线程并不持有锁, 却执行该方法, 可能导致异常的发生.
  4. Condition newCondition():条件对象,获取等待通知组件。该组件和当前的锁绑定,当前线程只有获取了锁,才能调用该组件的 await()方法,而调用后,当前线程将缩放锁。
  5. getHoldCount() :查询当前线程保持此锁的次数,也就是执行此线程执行 lock 方法的次数。
  6. getQueueLength():返回正等待获取此锁的线程估计数,比如启动 10 个线程,1 个线程获得锁,此时返回的是 9
  7. getWaitQueueLength:(Condition condition)返回等待与此锁相关的给定条件的线程估计数。比如 10 个线程,用同一个 condition 对象,并且此时这 10 个线程都执行了condition 对象的 await 方法,那么此时执行此方法返回 10
  8. hasWaiters(Condition condition):查询是否有线程等待与此锁有关的给定条件(condition),对于指定 contidion 对象,有多少线程执行了 condition.await 方法
  9. hasQueuedThread(Thread thread):查询给定线程是否等待获取此锁
  10. hasQueuedThreads():是否有线程等待此锁
  11. isFair():该锁是否公平锁
  12. isHeldByCurrentThread(): 当前线程是否保持锁锁定,线程的执行 lock 方法的前后分别是 false 和 true
  13. isLock():此锁是否有任意线程占用
  14. lockInterruptibly():如果当前线程未被中断,获取锁
  15. tryLock():尝试获得锁,仅在调用时锁未被线程占用,获得锁
  16. tryLock(long timeout TimeUnit unit):如果锁在给定等待时间内没有被另一个线程保持,则获取该锁。

ReentrantLock 实现

  1. public class MyService {
  2. private Lock lock = new ReentrantLock();//默认是非公平锁
  3. //Lock lock=new ReentrantLock(true);//公平锁
  4. //Lock lock=new ReentrantLock(false);//非公平锁
  5. private Condition condition=lock.newCondition();//创建 Condition
  6. public void testMethod() {
  7. try {
  8. lock.lock();//lock 加锁
  9. //1. wait方法等待:Condition类的awiat方法和Object类的wait方法等效
  10. //System.out.println("开始 wait");
  11. condition.await();
  12. // 通过创建Condition对象来使线程wait,必须先执行lock.lock方法获得锁
  13. //2. signal方法唤醒:Condition类的signal方法和Object类的notify方法等效
  14. condition.signal();//condition对象的signal方法可以唤醒wait线程
  15. for (int i = 0; i < 5; i++) {
  16. System.out.println("ThreadName=" + Thread.currentThread().getName()+ (" " + (i + 1)));
  17. }
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. finally
  22. {
  23. lock.unlock();
  24. }
  25. }
  26. }

Condition 类和 Object 类锁方法区别

  1. Condition 类的 awiat 方法和 Object 类的 wait 方法等效。但是只有当前线程获取了锁,即调用了ReentrantLock 类对象的lock()方法以后,才能调用Condition类的 await()方法。
  2. Condition 类的 signal 方法和 Object 类的 notify 方法等效
  3. Condition 类的 signalAll 方法和 Object 类的 notifyAll 方法等效
  4. ReentrantLock 类可以唤醒指定条件的线程,而 object 的唤醒是随机的

tryLock 和 lock 和 lockInterruptibly 的区别

  1. tryLock 能获得锁就返回 true,不能就立即返回 false,tryLock(long timeout,TimeUnit unit),可以增加时间限制,如果超过该时间段还没获得锁,返回 false
  2. lock 能获得锁就返回 true,不能的话一直等待获得锁
  3. lock 和 lockInterruptibly,如果两个线程分别执行这两个方法,但此时中断这两个线程,lock 不会抛出异常,而 lockInterruptibly 会抛出异常。如果当前线程未被中断,则都获取锁。

    ReentrantLock 与 synchronized

    synchronized:重量级锁、独占锁、互斥锁、悲观锁、非公平锁。释放锁的过程依赖于JVM。
    renntrantlock:重量级锁、独占锁、互斥锁、悲观锁、默认非公平锁(可以指定为公平锁)。 释放锁的过程由自己手动在final中实现,可使用Thread.interrupt()方法中断

  4. ReentrantLock 通过方法 lock()与 unlock()来进行加锁与解锁操作,与 synchronized 会被JVM 自动解锁机制不同,ReentrantLock 加锁后需要手动进行解锁。为了避免程序出现异常而无法正常解锁的情况,使用 ReentrantLock 必须在 finally 控制块中进行解锁操作。

  5. ReentrantLock 相比 synchronized 的优势是可中断、公平锁、多个锁。这种情况下需要使用 ReentrantLock。
  6. synchronized功能单一,某些大并发场景下优化手段有限,ReentrantLock功能相对多一些,可以针对性进行一些优化。比如用tryLock加超时机制来避免大面积、长时间的线程阻塞等。
  7. 只是重量级锁的情况下ReentrantLock要比Synchronized好一点点,比较ReentrantLock重量级锁的源码里面给了一个类似虚拟头结点的这个设计确实是ReentrantLock的精髓代码了。

共同点

  1. 都是用来协调多线程对共享对象、变量的访问
  2. 都是可重入锁,同一线程可以多次获得同一个锁
  3. 都保证了可见性和互斥性

不同点

  1. ReentrantLock 显示的获得、释放锁,synchronized 隐式获得释放锁
  2. ReentrantLock 可响应中断、可轮回,synchronized 是不可以响应中断的,为处理锁的不可用性提供了更高的灵活性
  3. ReentrantLock 是 API 级别的,synchronized 是 JVM 级别的
  4. ReentrantLock 可以实现公平锁
  5. ReentrantLock 通过 Condition 可以绑定多个条件
  6. 底层实现不一样, synchronized 是同步阻塞,使用的是悲观并发策略,lock 是同步非阻塞,采用的是乐观并发策略
  7. Lock 是一个接口,而 synchronized 是 Java 中的关键字,synchronized 是内置的语言实现。
  8. synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁。
  9. Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用 synchronized 时,等待的线程会一直等待下去,不能够响应中断。
  10. 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
  11. Lock 可以提高多个线程进行读操作的效率,既就是实现读写锁等。

参考

非公平锁、公平锁

非公平锁 NonFair
JVM 按随机、就近原则分配锁的机制则称为不公平锁。每个线程过来,都会先自旋尝试直接抢占锁资源,抢不到,再进入竞争队列当中去排队。即,加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待。

ReentrantLock 在构造函数中提供了是否公平锁的初始化方式,默认为非公平锁。非公平锁实际执行的效率要远远超出公平锁,除非程序有特殊需要,否则最常用非公平锁的分配机制。

公平锁 Fair
公平锁指的是锁的分配机制是公平的,通常先对锁提出获取请求的线程会先被分配到锁。即,加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得。

ReentrantLock 默认是非公平锁,同时在构造函数中提供了是否为公平锁的初始化方式,可以通过传入true,来定义公平锁。

总结

  1. 非公平锁性能比公平锁高 5~10 倍,因为公平锁需要在多核的情况下维护一个队列
  2. Java 中的 synchronized 是非公平锁,ReentrantLock 默认的 lock()方法采用的是非公平锁,同时可以通过构造函数来设置是否公平锁。 ```java public ReentrantLock() {
    sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
  1. <a name="ZW4fO"></a>
  2. ## 可重入锁(递归锁)
  3. 本文里面讲的是广义上的可重入锁,而不是单指 JAVA 下的 ReentrantLock。可重入锁,也叫做递归锁,指的是同一线程外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。就是第一个方法已经获得了锁,里面的递归方法也是同一种锁,那么第二个方法的锁,这个线程不用竞争,直接进入。
  4. 在 JAVA 环境下ReentrantLock 和 synchronized 都是可重入锁。
  5. ```java
  6. public class Test implements Runnable{
  7. public synchronized void get(){//synchronized
  8. System.out.println(Thread.currentThread().getId());
  9. set();// 那么第二个方法的锁,这个线程不用竞争,直接进入
  10. }
  11. public synchronized void set(){//synchronized
  12. System.out.println(Thread.currentThread().getId());
  13. }
  14. @Override
  15. public void run() {
  16. get();// 可重入锁,就是第一个方法已经获得了锁,里面的递归方法也是同一种锁,
  17. }
  18. public static void main(String[] args) {
  19. Test ss=new Test();
  20. new Thread(ss).start();
  21. new Thread(ss).start();
  22. new Thread(ss).start();
  23. }
  24. }

ReadWriteLock 读写锁

为了提高性能,Java 提供了读写锁,在读的地方使用读锁,在写的地方使用写锁,灵活控制,如果没有写锁的情况下,读是无阻塞的,在一定程度上提高了程序的执行效率。读写锁分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由 jvm 自己控制的,你只要上好相应的锁即可。

读锁
如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁

写锁
如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁。

ReentrantReadWriteLock
Java 中读写锁有个接口 java.util.concurrent.locks.ReadWriteLock ,也有具体的实现ReentrantReadWriteLock。

共享锁和独占锁

JAVA并发包提供的加锁模式分为独占锁和共享锁。

独占锁:效率低下的,因为其他线程会等待。synchronized、RenntrantLock、写锁。
独占锁模式下,每次只能有一个线程能持有锁,ReentrantLock 就是以独占方式实现的互斥锁。独占锁是一种悲观保守的加锁策略,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。

共享锁:
共享锁则允许多个线程同时获取锁,并发访问 共享资源,如:ReadWriteLock。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。

  1. AQS 的内部类 Node 定义了两个常量 SHARED 和 EXCLUSIVE,他们分别标识 AQS 队列中等待线程的锁获取模式。
  2. JAVA 的并发包中提供了 ReadWriteLock,读-写锁。它允许一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。

    重量级锁、轻量级锁、偏向锁、锁升级

    锁的状态总共有四种:无锁状态、偏向锁、轻量级锁和重量级锁。

重量级锁(Mutex Lock)
Synchronized、RenntrantLock

Synchronized 是通过对象内部的一个叫做监视器锁(monitor)来实现的。但是监视器锁本质又是依赖于底层的操作系统的 Mutex Lock 来实现的。而操作系统实现线程之间的切换这就需要从用户态转换到核心态,这个成本非常高,状态之间的转换需要相对比较长的时间,这就是为什么Synchronized 效率低的原因。因此,这种依赖于操作系统 Mutex Lock 所实现的锁我们称之为重量级锁。JDK 中对 Synchronized 做的种种优化,其核心都是为了减少这种重量级锁的使用。JDK1.6 以后,为了减少获得锁和释放锁所带来的性能消耗,提高性能,引入了轻量级锁和偏向锁。

操作系统实现线程之间的切换需要从用户态转换到核心态,可以理解为线程的运行在用户态,权限较低;线程的切换,需要在核心态,获取更高的权限才可以。这种切换需要依赖操作系统,所以往往重量级锁线程之间的切换消耗大于操作代码的消耗,这也是synchronized为什么效率低下的原因。

锁升级
随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁(但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级)。

比如一段代码,没加锁就是无锁状态,加了锁但是只有一个线程执行代码,他是偏向锁,又进来一个线程,两个线程交替进行,偏向锁就成了轻量级锁,多个线程进入开始竞争锁资源,轻量级锁膨胀成重量级锁。可以说所有的锁都有这么一个过程。可以优化禁用偏向锁等。

轻量级锁
“轻量级”是相对于使用操作系统互斥量来实现的传统锁而言的。但是,首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。在解释轻量级锁的执行过程之前,先明白一点,轻量级锁所适应的场景是线程交替执行同步块的情况,如果存在同一时间访问同一锁的情况,就会导致轻量级锁膨胀为重量级锁。

偏向锁
Hotspot 的作者经过以往的研究发现大多数情况下锁不仅不存在多线程竞争,而且总是由同一线程多次获得。偏向锁的目的是在某个线程获得锁之后,消除这个线程锁重入(CAS)的开销,看起来让这个线程得到了偏护。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只需要在置换ThreadID 的时候依赖一次 CAS 原子指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗必须小于节省下来的 CAS 原子指令的性能消耗)。上面说过,轻量级锁是为了在线程交替执行同步块时提高性能,而偏向锁则是在只有一个线程执行同步块时进一步提高性能。

分段锁

分段锁也并非一种实际的锁,而是一种思想 ConcurrentHashMap 是学习分段锁的最好实践。

锁优化

减少锁持有时间
只用在有线程安全要求的程序上加锁。

减小锁粒度
将大对象(这个对象可能会被很多线程访问),拆成小对象,大大增加并行度,降低锁竞争。降低了锁的竞争,偏向锁,轻量级锁成功率才会提高。最最典型的减小锁粒度的案例就是ConcurrentHashMap。

锁分离
最常见的锁分离就是读写锁 ReadWriteLock,根据功能进行分离成读锁和写锁,这样读读不互斥,读写互斥,写写互斥,即保证了线程安全,又提高了性能,具体也请查看[高并发 Java 五] JDK 并发包 1。读写分离思想可以延伸,只要操作互不影响,锁就可以分离。比如LinkedBlockingQueue 从头部取出,从尾部放数据。

锁粗化
通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,即在使用完公共资源后,应该立即释放锁。但是,凡事都有一个度,如果对同一个锁不停的进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能的优化 。

锁消除
锁消除是在编译器级别的事情。在即时编译器时,如果发现不可能被共享的对象,则可以消除这些对象的锁操作,多数是因为程序员编码不规范引起。
参考:https://www.jianshu.com/p/39628e1180a9

数据库锁

1、行级锁:写sql语句时都会自动加上行级锁,慢、冲突少
行级锁是一种排他锁,防止其他事务修改此行;在使用以下语句时,Oracle 会自动应用行级锁:

  • INSERT、UPDATE、DELETE、SELECT … FOR UPDATE [OF columns] [WAIT n | NOWAIT];
  • SELECT … FOR UPDATE 语句允许用户一次锁定多条记录进行更新
  • 使用 COMMIT 或 ROLLBACK 语句释放锁。

2、表级锁:给表加上锁,有共享读锁、独占写锁(排它锁),快,冲突多
表示对当前操作的整张表加锁,它实现简单,资源消耗较少,被大部分 MySQL 引擎支持。最常使
用的 MYISAM 与 INNODB 都支持表级锁定。表级锁定分为表共享读锁(共享锁)与表独占写锁
(排他锁)。
3、页级锁:介于行级锁和表级锁中间的锁,速度中庸的锁
页级锁是 MySQL 中锁定粒度介于行级锁和表级锁中间的一种锁。表级锁速度快,但冲突多,行级
冲突少,但速度慢。所以取了折衷的页级,一次锁定相邻的一组记录。BDB 支持页级锁

死锁

何为死锁,就是多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。

10. ConcurrentHashMap 并发

减小锁粒度
减小锁粒度是指缩小锁定对象的范围,从而减小锁冲突的可能性,从而提高系统的并发能力。减小锁粒度是一种削弱多线程锁竞争的有效手段,这种技术典型的应用是 ConcurrentHashMap(高性能的 HashMap)类的实现。对于 HashMap 而言,最重要的两个方法是 get 与 set 方法,如果我们对整个 HashMap 加锁,可以得到线程安全的对象,但是加锁粒度太大。Segment 的大小也被称为 ConcurrentHashMap 的并发度。

ConcurrentHashMap 分段锁
ConcurrentHashMap,它内部细分了若干个小的 HashMap,称之为段(Segment)。默认情况下一个 ConcurrentHashMap 被进一步细分为 16 个段,既就是锁的并发度。如果需要在 ConcurrentHashMap 中添加一个新的表项,并不是将整个 HashMap 加锁,而是首先根据 hashcode 得到该表项应该存放在哪个段中,然后对该段加锁,并完成 put 操作。在多线程环境中,如果多个线程同时进行 put操作,只要被加入的表项不存放在同一个段中,则线程间可以做到真正的并行。

JDK8之前ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成
ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 是一种可重入锁 ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,Segment 的结构和 HashMap类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护一个 HashEntry 数组里的元素,当对 HashEntry 数组的数据进行修改时,必须首先获得它对应的 Segment 锁。
image.png

11. CAS:比较并交换-乐观锁机制-锁自旋

概念及特性

CAS(Compare And Swap/Set)比较并交换,CAS 算法的过程是这样:它包含 3 个参数 CAS(V,E,N)。V 表示要更新的变量(内存值),E 表示预期值(旧的),N 表示新值。当且仅当 V 值等于 E 值时,才会将 V 的值设为 N,如果V 值和 E 值不同,则说明已经有其他线程做了更新,则当 前线程什么都不做。最后,CAS 返回当前 V 的真实值。

CAS 操作是抱着乐观的态度进行的(乐观锁),它总是认为自己可以成功完成操作。当多个线程同时 使用 CAS 操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂 起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理, CAS 操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。

确保原子操作的类 AtomicInteger (锁自旋)

JDK1.5 的原子包 J.U.C.atomic 这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM 从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。 相对于对于 synchronized 这种阻塞算法,CAS 是非阻塞算法的一种常见实现。由于一般 CPU 切换时间比 CPU 指令集操作更加长, 所以 J.U.C 在性能上有了很大的提升。

getAndIncrement 采用了 CAS 操作,每次从内存中读取数据然后将此数据和+1 后的结果进行CAS 操作,如果成功就返回结果,否则重试直到成功为止。而 compareAndSet 利用 JNI 来完成CPU 指令的操作。

  1. public class AtomicInteger extends Number implements java.io.Serializable {
  2. private volatile int value;
  3. public final int get() {
  4. return value;
  5. }
  6. public final int getAndIncrement() {
  7. for (;;) { //CAS 自旋,一直尝试,直达成功
  8. int current = get();
  9. int next = current + 1;
  10. if (compareAndSet(current, next))
  11. return current;
  12. }
  13. }
  14. public final boolean compareAndSet(int expect, int update) {
  15. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  16. }
  17. }

ABA 问题

CAS 会导致“ABA 问题”。CAS 算法实现一个重要前提需要取出内存中某时刻的数据,而在下时 刻比较并替换,那么在这个时间差类会导致数据的变化。

比如说一个线程 one 从内存位置 V 中取出 A,这时候另一个线程 two 也从内存中取出 A,并且 two 进行了一些操作变成了 B,然后 two 又将 V 位置的数据变成 A,这时候线程 one 进行 CAS 操 作发现内存中仍然是 A,然后 one 操作成功。尽管线程 one 的 CAS 操作成功,但是不代表这个过 程就是没有问题的。

部分乐观锁的实现是通过版本号或时间戳的方式来解决 ABA 问题,乐观锁每次在执行数据的修 改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本 号执行+1 操作,否则就执行失败。因为每次操作的版本号都会随之增加,所以不会出现 ABA 问 题,因为版本号只会增加不会减少。

12. 常用的辅助类

CountDownLatch 减法计数器(线程计数器)

  • 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。
  • 原理:每次有线程调用 countDown(),计数器的计数-1,直到计数变为0,await() 就会被唤醒,继续执行之后的操作。
    • countDownLatch.countDown(); // 数量-1
    • countDownLatch.await(); // 等待计数器归零,然后再向下执行
    • await方法使当前线程等待阻塞,直到由于countDown()方法的调用将计数器的计数减少到到零为止,释放所有等待的线程。
  • 利用它可以实现类似计数器的功能。比如有一个任务 A,它要等待其他 4 个任务执行完毕之后才能执行,此时就可以利用 CountDownLatch来实现这种功能了。

    1. public class CountDownLatchDemo {
    2. public static void main(String[] args) throws InterruptedException {
    3. // 总数是6,必须要执行任务的时候,再使用
    4. CountDownLatch countDownLatch = new CountDownLatch(6);
    5. for (int i = 1; i <=6 ; i++) {
    6. new Thread(()->{
    7. System.out.println(Thread.currentThread().getName()+" Go out");
    8. countDownLatch.countDown(); // countDown() 计数-1
    9. },String.valueOf(i)).start();
    10. }
    11. countDownLatch.await(); // await() 等待计数器归零,就唤醒,再继续向下执行
    12. System.out.println("Close Door");
    13. }
    14. }
    1. final CountDownLatch latch = new CountDownLatch(2);
    2. new Thread(){public void run() {
    3. System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
    4. Thread.sleep(3000);
    5. System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
    6. latch.countDown();
    7. };}.start();
    8. new Thread(){ public void run() {
    9. System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
    10. Thread.sleep(3000);
    11. System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
    12. latch.countDown();
    13. };}.start();
    14. System.out.println("等待 2 个子线程执行完毕...");
    15. latch.await();
    16. System.out.println("2 个子线程已经执行完毕");
    17. System.out.println("继续执行主线程");

    CyclicBarrier 加法计数器(等待至某状态再全部同时执行)

  • 允许一组线程全部等待彼此达到共同屏障点的同步辅助。

  • 通过它可以实现让一组线程等待至某个状态(等待所有子线程已经调用了await)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier 可以被重用。我们暂且把这个状态就叫做barrier,当调用 await()方法之后,线程就处于 barrier 了。CyclicBarrier 中最重要的方法就是 await 方法,它有 2 个重载版本:
    • public int await():用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任务;
    • public int await(long timeout, TimeUnit unit):让这些线程等待至一定的时间,如果还有线程没有到达 barrier 状态就直接让到达 barrier 的线程执行后续任务。
  • 另外 CyclicBarrier 是可以重用的。

    1. public class CyclicBarrierDemo {
    2. public static void main(String[] args) {
    3. //主线程:召唤龙珠的线程
    4. CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
    5. System.out.println("召唤神龙成功!");
    6. });
    7. for (int i = 1; i <= 7; i++) {
    8. //子线程
    9. final int temp = i;
    10. // Lambda表达式(匿名类)不能访问非final的局部变量:
    11. // 因为实例变量存在堆中,而局部变量是在栈上分配,
    12. // Lambda表达((匿名类)会在另一个线程中执行。
    13. // 如果在线程中要直接访问一个局部变量,可能线程执行时该局部变量已经被销毁了,
    14. // 而 final 类型的局部变量在 Lambda 表达式(匿名类) 中其实是局部变量的一个拷贝
    15. new Thread(()->{
    16. System.out.println(Thread.currentThread().getName()+" 收集了第 {"+ finalI+"} 颗龙珠");
    17. try {
    18. cyclicBarrier.await(); //加法计数,等待
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. } catch (BrokenBarrierException e) {
    22. e.printStackTrace();
    23. }
    24. }).start();
    25. }
    26. }
    27. }
    1. public static void main(String[] args) {
    2. int N = 4;
    3. CyclicBarrier barrier = new CyclicBarrier(N);
    4. for(int i=0;i<N;i++)
    5. new Writer(barrier).start();
    6. }
    7. static class Writer extends Thread{
    8. private CyclicBarrier cyclicBarrier;
    9. public Writer(CyclicBarrier cyclicBarrier) {
    10. this.cyclicBarrier = cyclicBarrier;
    11. }
    12. @Override
    13. public void run() {
    14. try {
    15. Thread.sleep(5000); //以睡眠来模拟线程需要预定写入数据操作
    16. System.out.println("线程"+Thread.currentThread().getName()+"写入数据完
    17. 毕,等待其他线程写入完毕");
    18. cyclicBarrier.await();
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }catch(BrokenBarrierException e){
    22. e.printStackTrace();
    23. }
    24. System.out.println("所有线程写入完毕,继续处理其他任务,比如数据操作");
    25. }
    26. }

    CountDownLatch 和 CyclicBarrier
    CountDownLatch 和 CyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同;CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;而 CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外,CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。

    Semaphore 信号量

    Semaphore 是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。Semaphore 可以用来构建一些对象池,资源池之类的,比如数据库连接池。
    原理:一个计数信号量。 在概念上,信号量维持一组许可证。

  • 通过 acquire() 获取一个许可,如果没有就等待阻塞,直到有许可证可用

  • 每个 release() 添加/释放一个许可证,潜在地释放阻塞获取方

方法:

  • acquire():获得资源,如果资源已经使用完了,就等待资源释放后再进行使用
  • release():释放,会将当前的信号量释放+1,然后唤醒等待的线程

作用: 多个共享资源互斥的使用! 并发限流,控制最大的线程数!

Semaphore 可以控制同时访问的线程个数,通过acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。Semaphore 类中比较重要的几个方法:

  1. public void acquire(): 用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
  2. public void acquire(int permits):获取 permits 个许可
  3. public void release() { } :释放许可。注意,在释放许可之前,必须先获获得许可。
  4. public void release(int permits) { }:释放 permits 个许可

上面 4 个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

  1. public boolean tryAcquire():尝试获取一个许可,若获取成功,则立即返回 true,若获取失败,则立即返回 false
  2. public boolean tryAcquire(long timeout, TimeUnit unit):尝试获取一个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false
  3. public boolean tryAcquire(int permits):尝试获取 permits 个许可,若获取成功,则立即返回 true,若获取失败,则立即返回 false
  4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit): 尝试获取 permits个许可,若在指定的时间内获取成功,则立即返回 true,否则则立即返回 false
  5. 还可以通过 availablePermits()方法得到可用的许可数目。
    1. public class SemaphoreDemo {
    2. public static void main(String[] args) {
    3. // 线程数量:停车位! 限流!
    4. Semaphore semaphore = new Semaphore(3);
    5. for (int i = 1; i <= 6; i++) {
    6. final int temp = i; //如果Lambda表达式(匿名类)中没有使用这个局部变量i,就不需要先转成final
    7. new Thread(()->{
    8. try {
    9. semaphore.acquire(); //acquire()得到
    10. System.out.println(Thread.currentThread().getName() + ":第" + temp +"辆车占用到了一个车位");
    11. //System.out.println(Thread.currentThread().getName()+" 抢到了车位");
    12. TimeUnit.SECONDS.sleep(2); //停车2s
    13. System.out.println(Thread.currentThread().getName() + ":第" + temp +"辆车离开了一个车位");
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }finally {
    17. semaphore.release();//release()释放
    18. }
    19. },String.valueOf(i)).start();
    20. }
    21. }
    22. }
    实现互斥锁(计数器为 1)
    我们也可以创建计数为 1 的 Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。互斥锁:在任何时刻,只让一个线程操作数据,最明显的就是写锁,以及Semaphore通过设置信号量,来限制一个线程来操作数据。
    1. // 创建一个计数阈值为 5 的信号量对象
    2. // 只能 5 个线程同时访问
    3. Semaphore semp = new Semaphore(5);
    4. try { // 申请许可
    5. semp.acquire();
    6. try {
    7. // 业务逻辑
    8. } catch (Exception e) {
    9. } finally {
    10. // 释放许可
    11. semp.release();
    12. }
    13. } catch (InterruptedException e) {
    14. }
    抢机器:若一个工厂有 5 台机器,但是有 8 个工人,一台机器同时只能被一个工人使用,只有使用完
    了,其他工人才能继续使用。那么我们就可以通过 Semaphore 来实现: ```java int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for(int i=0;i<N;i++)
    1. new Worker(i,semaphore).start();
    }

static class Worker extends Thread{ private int num; private Semaphore semaphore; public Worker(int num,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println(“工人”+this.num+”占用一个机器在生产…”); Thread.sleep(2000); System.out.println(“工人”+this.num+”释放出机器”); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }

  1. **Semaphore ReentrantLock**<br />Semaphore 基本能完成 ReentrantLock 的所有工作,使用方法也与之类似,通过 acquire()与release()方法来获得和释放临界资源。经实测,Semaphone.acquire()方法默认为可响应中断锁,与 ReentrantLock.lockInterruptibly()作用效果一致,也就是说在等待临界资源的过程中可以被Thread.interrupt()方法中断。
  2. 此外,Semaphore 也实现了可轮询的锁请求与定时锁的功能,除了方法名 tryAcquire tryLock不同,其使用方法与 ReentrantLock 几乎一致。Semaphore 也提供了公平与非公平锁的机制,也可在构造函数中进行设定。
  3. Semaphore 的锁释放操作也由手动进行,因此与 ReentrantLock 一样,为避免线程因抛出异常而无法正常释放锁的情况发生,释放锁的操作也必须在 finally 代码块中完成。
  4. Semaphore 其实和锁有点类似,它一般用于控制对某组资源的访问权限。
  5. <a name="ygSuK"></a>
  6. # 13. JMM
  7. <a name="oEHGk"></a>
  8. ## volatile关键字(变量可见性、禁止重排序)
  9. Java 语言提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。volatile 变量具备两种特性,volatile 变量不会被缓存在寄存器或者对其他处理器不可见的地方,因此在读取 volatile 类型的变量时总会返回最新写入的值。
  10. volatile Java 虚拟机提供**轻量级的同步机制**。<br />作用:volatile 是可以保持可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生。<br />**1、保证变量可见性**<br />确保将变量的更新操作通知到其他线程:在读取 volatile 类型的变量时总会返回最新写入的值。保证该变量对所有线程可见,这里的可见性是指当一个线程修改了变量的值,那么新的值对于其他线程是可以立即获取的。
  11. ```java
  12. public class JMMDemo01 {
  13. // 如果不加 volatile 程序会死循环
  14. // 加了 volatile 可以保证可见性(子线程对主内存变化的可变性)
  15. private volatile static Integer number = 0;
  16. public static void main(String[] args) {
  17. // main线程
  18. // 子线程1
  19. new Thread(()->{
  20. while (number==0){// 子线程1对主内存的变化不知道的
  21. }
  22. }).start();
  23. try {
  24. TimeUnit.SECONDS.sleep(2);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. number = 1;
  29. System.out.println(number);
  30. }
  31. }

2、不保证原子性

  • 原子性 : 不可分割。线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败

    1. public class VDemo02 {
    2. // 加了 volatile 可以保证可见性,但不保证原子性
    3. private static volatile int number = 0;
    4. public static void add(){
    5. number++; //++ 自增,不是一个原子性操作,是2个~3个操作
    6. }
    7. public static void main(String[] args) {
    8. //理论上number==20000,但实际不是
    9. for (int i = 1; i <= 20; i++) {
    10. new Thread(()->{
    11. for (int j = 1; j <= 1000 ; j++) {
    12. add();
    13. }
    14. }).start();
    15. }
    16. while (Thread.activeCount()>2){// main gc
    17. // Thread.yield()是在主线程中执行的,意思只要还有除了gc和main线程之外的线程在跑
    18. // 主线程就让出cpu不往下执行,导致主线程中的子线程还没有执行完,主线程就停止了。
    19. Thread.yield();
    20. }
    21. System.out.println(Thread.currentThread().getName()+",num="+number);
    22. }
    23. }

    如果不加lock和synchronized ,怎么样保证原子性?

  • 使用JUC下原子包java.util.concurrent.atomic下的原子类,解决原子性问题。

  • 这些类的底层都直接和操作系统挂钩!在内存中修改值!
  • 底层使用了Unsafe类,Unsafe类是一个很特殊的存在!

image.png
确保原子操作的类 AtomicInteger (锁自旋)
首先说明,此处 AtomicInteger ,一个提供原子操作的 Integer 的类,常见的还有AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference 等,他们的实现原理相同,区别在与运算对象类型的不同。令人兴奋地,还可以通过 AtomicReference将一个对象的所有操作转化成原子操作。

我们知道,在多线程程序中,诸如++i 或 i++等运算不具有原子性,是不安全的线程操作之一。通常我们会使用 synchronized 将该操作变成一个原子操作,但 JVM 为此类操作特意提供了一些同步类,使得使用更方便,且使程序运行效率变得更高。通过相关资料显示,通常AtomicInteger的性能是 ReentantLock 的好几倍。

  1. public class VDemo02 {
  2. // 加了 volatile 可以保证可见性,但不保证原子性
  3. // 原子类,解决原子性问题
  4. private volatile static AtomicInteger num = new AtomicInteger();
  5. public static void add(){
  6. // num++; // 不是一个原子性操作
  7. num.getAndIncrement(); // AtomicInteger + 1 方法,底层是CAS保证的原子性
  8. }
  9. public static void main(String[] args) {
  10. //理论和实际上 num==2 万
  11. for (int i = 1; i <= 20; i++) {
  12. new Thread(()->{
  13. for (int j = 0; j < 1000 ; j++) {
  14. add();
  15. }
  16. }).start();
  17. }
  18. while (Thread.activeCount()>2){ // main gc
  19. Thread.yield();
  20. }
  21. System.out.println(Thread.currentThread().getName() + " " + num);
  22. }
  23. }

3、禁止指令重排

  • 我们写的程序,计算机并不是按照我们自己写的那样去执行的
  • 源代码–>编译器优化重排–>指令并行也可能会重排–>内存系统也会重排–>执行
  • 处理器在进行指令重排的时候,会考虑数据之间的依赖性!
  • volatile 可以禁止了指令重排。
  • 内存屏障。CPU指令。作用:
    • 保证特定的操作的执行顺序!
    • 保证某些变量的内存可见性 (利用这些特性volatile实现了可见性)

image.png
比 sychronized 更轻量级的同步锁
在访问 volatile 变量时不会执行加锁操作,因此也就不会使执行线程阻塞,故 volatile 是一种比 sychronized 关键字更轻量级的同步机制。volatile 适合这种场景:一个变量被多个线程共享,线程直接给这个变量赋值。
image.png
当对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到 CPU 缓存中。如果计算机有多个 CPU,每个线程可能在不同的 CPU 上被处理,这意味着每个线程可以拷贝到不同的 CPU cache 中。而声明变量是 volatile 的,JVM 保证了每次读变量都从内存中读,跳过 CPU cache 这一步。

适用场景
值得说明的是对 volatile 变量的单次读/写操作可以保证原子性的,如 long 和 double 类型变量,但是并不能保证 i++(非原子操作,至少2步)这种操作的原子性,因为本质上 i++是读、写两次操作。在某些场景下可以代替 Synchronized。但是,volatile 的不能完全取代 Synchronized 的位置,只有在一些特殊的场景下,才能适用 volatile。总的来说,必须同时满足下面两个条件才能保证在并发环境的线程安全:
(1)对变量的写操作不依赖于当前值(比如 i++),或者说是单纯的变量赋值(boolean flag = true)。
(2)该变量没有包含在具有其他变量的不变式中,也就是说,不同的 volatile 变量之间,不能互相依赖。只有在状态真正独立于程序内其他内容时才能使用 volatile。

JMM

  • JMM:Java内存模型。不存在的东西,概念!约定!!
  • 关于JMM的一些同步的约定:

1、线程解锁前,必须把共享变量立刻刷回主存。
2、线程加锁前,必须读取主存中的最新值到工作内存中!
3、加锁和解锁是同一把锁。

14. AQS(抽象的队列同步器)

AbstractQueuedSynchronizer 类如其名,抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。
image.png
它维护了一个 volatile int state(代表共享资源)和一个 FIFO 线程等待队列(多线程争用资源被阻塞时会进入此队列)。state 的访问方式有三种:getState()、setState()、compareAndSetState()。

AQS 定义两种资源共享方式
1.Exclusive 独占资源-ReentrantLock
Exclusive(独占,只有一个线程能执行,如 ReentrantLock)

2.Share 共享资源-Semaphore/CountDownLatch
Share(共享,多个线程可同时执行,如 Semaphore/CountDownLatch)。
AQS 只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现,AQS 这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过 state 的 get/set/CAS)之所以没有定义成abstract ,是因为独占模式下只用实现 tryAcquire-tryRelease ,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS 已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  1. isHeldExclusively():该线程是否正在独占资源。只有用到 condition 才需要去实现它。
  2. tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
  3. tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
  4. tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  5. tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回
  6. true,否则返回 false。

同步器的实现是 ABS 核心(state 资源状态计数)
同步器的实现是 ABS 核心,以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程lock()时,会调用 tryAcquire()独占该锁并将 state+1。此后,其他线程再 tryAcquire()时就会失败,直到 A 线程 unlock()到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。

以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown()一次,state会 CAS 减 1。等到所有子线程都执行完后(即 state=0),会 unpark()主调用线程,然后主调用线程就会从 await()函数返回,继续后余动作。

ReentrantReadWriteLock 实现独占和共享两种方式
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryReleasetry、AcquireShared-tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock。