阅读前带着这些问题区阅读

  • 多线程的出现是要解决什么问题的?
  • 线程不安全是指什么? 举例说明
  • 并发出现线程不安全的本质什么? 可见性,原子性和有序性。
  • Java是怎么解决并发问题的? 3个关键字,JMM和8个Happens-Before
  • 线程安全是不是非真即假? 不是
  • 线程安全有哪些实现思路?
  • 如何理解并发和并行的区别?

    为什么需要多线程

    众所周知,CPU、内存、I/O 设备的速度是有极大差异的,为了合理利用 CPU 的高性能,平衡这三者的速度差异,计算机体系结构、操作系统、编译程序都做出了贡献,主要体现为:

  • CPU 增加了缓存,以均衡与内存的速度差异;// 导致 可见性问题

  • 操作系统增加了进程、线程,以分时复用 CPU,进而均衡 CPU 与 I/O 设备的速度差异;// 导致 原子性问题
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。// 导致 有序性问题

    Java多线程

多线程是Java最基本的一种并发模型,本章我们将详细介绍Java多线程编程。

Java多线程 - 图1

多线程基础

现代操作系统(Windows,macOS,Linux)都可以执行多任务。多任务就是同时运行多个任务,例如:

Java多线程 - 图2

CPU执行代码都是一条一条顺序执行的,但是,即使是单核cpu,也可以同时运行多个任务。因为操作系统执行多任务实际上就是让CPU对多个任务轮流交替执行。

例如,假设我们有语文、数学、英语3门作业要做,每个作业需要30分钟。我们把这3门作业看成是3个任务,可以做1分钟语文作业,再做1分钟数学作业,再做1分钟英语作业:

Java多线程 - 图3

这样轮流做下去,在某些人眼里看来,做作业的速度就非常快,看上去就像同时在做3门作业一样

Java多线程 - 图4

类似的,操作系统轮流让多个任务交替执行,例如,让浏览器执行0.001秒,让QQ执行0.001秒,再让音乐播放器执行0.001秒,在人看来,CPU就是在同时执行多个任务。

即使是多核CPU,因为通常任务的数量远远多于CPU的核数,所以任务也是交替执行的。

进程
在计算机中,我们把一个任务称为一个进程,浏览器就是一个进程,视频播放器是另一个进程,类似的,音乐播放器和Word都是进程。

某些进程内部还需要同时执行多个子任务。例如,我们在使用Word时,Word可以让我们一边打字,一边进行拼写检查,同时还可以在后台进行打印,我们把子任务称为线程。

进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个线程。

  1. ┌──────────┐
  2. Process
  3. │┌────────┐│
  4. ┌──────────┐││ Thread ││┌──────────┐
  5. Process ││└────────┘││Process
  6. │┌────────┐││┌────────┐││┌────────┐│
  7. ┌──────────┐││ Thread ││││ Thread ││││ Thread ││
  8. Process ││└────────┘││└────────┘││└────────┘│
  9. │┌────────┐││┌────────┐││┌────────┐││┌────────┐│
  10. ││ Thread ││││ Thread ││││ Thread ││││ Thread ││
  11. │└────────┘││└────────┘││└────────┘││└────────┘│
  12. └──────────┘└──────────┘└──────────┘└──────────┘
  13. ┌──────────────────────────────────────────────┐
  14. Operating System
  15. └──────────────────────────────────────────────┘

操作系统调度的最小任务单位其实不是进程,而是线程。常用的Windows、Linux等操作系统都采用抢占式多任务,如何调度线程完全由操作系统决定,程序自己不能决定什么时候执行,以及执行多长时间。

因为同一个应用程序,既可以有多个进程,也可以有多个线程,因此,实现多任务的方法,有以下几种:

多进程模式(每个进程只有一个线程):

  1. ┌──────────┐ ┌──────────┐ ┌──────────┐
  2. Process Process Process
  3. │┌────────┐│ │┌────────┐│ │┌────────┐│
  4. ││ Thread ││ ││ Thread ││ ││ Thread ││
  5. │└────────┘│ │└────────┘│ │└────────┘│
  6. └──────────┘ └──────────┘ └──────────┘

多线程模式(一个进程有多个线程):

  1. ┌────────────────────┐
  2. Process
  3. │┌────────┐┌────────┐│
  4. ││ Thread ││ Thread ││
  5. │└────────┘└────────┘│
  6. │┌────────┐┌────────┐│
  7. ││ Thread ││ Thread ││
  8. │└────────┘└────────┘│
  9. └────────────────────┘

多进程+多线程模式(复杂度最高):

  1. ┌──────────┐┌──────────┐┌──────────┐
  2. Process ││Process ││Process
  3. │┌────────┐││┌────────┐││┌────────┐│
  4. ││ Thread ││││ Thread ││││ Thread ││
  5. │└────────┘││└────────┘││└────────┘│
  6. │┌────────┐││┌────────┐││┌────────┐│
  7. ││ Thread ││││ Thread ││││ Thread ││
  8. │└────────┘││└────────┘││└────────┘│
  9. └──────────┘└──────────┘└──────────┘

进程 vs 线程

进程和线程是包含关系,但是多任务既可以由多进程实现,也可以由单进程内的多线程实现,还可以混合多进程+多线程。

具体采用哪种方式,要考虑到进程和线程的特点。

和多线程相比,多进程的缺点在于:

  • 创建进程比创建线程开销大,尤其是在Windows系统上;
  • 进程间通信比线程间通信要慢,因为线程间通信就是读写同一个变量,速度很快。

而多进程的优点在于:

多进程稳定性比多线程高,因为在多进程的情况下,一个进程崩溃不会影响其他进程,而在多线程的情况下,任何一个线程崩溃会直接导致整个进程崩溃。

多线程

Java语言内置了多线程支持:一个Java程序实际上是一个JVM进程,JVM进程用一个主线程来执行main()方法,在main()方法内部,我们又可以启动多个线程。此外,JVM还有负责垃圾回收的其他工作线程等。

因此,对于大多数Java程序来说,我们说多任务,实际上是说如何使用多线程实现多任务。

和单线程相比,多线程编程的特点在于:多线程经常需要读写共享数据,并且需要同步。例如,播放电影时,就必须由一个线程播放视频,另一个线程播放音频,两个线程需要协调运行,否则画面和声音就不同步。因此,多线程编程的复杂度高,调试更困难。

Java多线程编程的特点又在于:

  • 多线程模型是Java程序最基本的并发模型;
  • 后续读写网络、数据库、Web开发等都依赖Java多线程模型。

因此,必须掌握Java多线程编程才能继续深入学习其他内容。

创建新线程

Java语言内置了多线程支持。当Java程序启动的时候,实际上是启动了一个JVM进程,然后,JVM启动主线程来执行main()方法。在main()方法中,我们又可以启动其他线程。

要创建一个新线程非常容易,我们需要实例化一个Thread实例,然后调用它的start()方法:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) {
  4. Thread t = new Thread();
  5. t.start(); // 启动新线程
  6. }
  7. }

但是这个线程启动后实际上什么也不做就立刻结束了。我们希望新线程能执行指定的代码,有以下几种方法:

方法一:从Thread派生一个自定义类,然后覆写run()方法:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) {
  4. Thread t = new MyThread();
  5. t.start(); // 启动新线程
  6. }
  7. }
  8. class MyThread extends Thread {
  9. @Override
  10. public void run() {
  11. System.out.println("start new thread!");
  12. }
  13. }

执行上述代码,注意到start()方法会在内部自动调用实例的run()方法。

方法二:创建Thread实例时,传入一个Runnable实例:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) {
  4. Thread t = new Thread(new MyRunnable());
  5. t.start(); // 启动新线程
  6. }
  7. }
  8. class MyRunnable implements Runnable {
  9. @Override
  10. public void run() {
  11. System.out.println("start new thread!");
  12. }
  13. }

或者用Java8引入的lambda语法进一步简写为:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) {
  4. Thread t = new Thread(() -> {
  5. System.out.println("start new thread!");
  6. });
  7. t.start(); // 启动新线程
  8. }
  9. }

有童鞋会问,使用线程执行的打印语句,和直接在main()方法执行有区别吗?

区别大了去了。我们看以下代码:

  1. public class Main {
  2. public static void main(String[] args) {
  3. System.out.println("main start...");
  4. Thread t = new Thread() {
  5. public void run() {
  6. System.out.println("thread run...");
  7. System.out.println("thread end.");
  8. }
  9. };
  10. t.start();
  11. System.out.println("main end...");
  12. }
  13. }

我们用蓝色表示主线程,也就是main线程,main线程执行的代码有4行,首先打印main start,然后创建Thread对象,紧接着调用start()启动新线程。当start()方法被调用时,JVM就创建了一个新线程,我们通过实例变量t来表示这个新线程对象,并开始执行。

接着,main线程继续执行打印main end语句,而t线程在main线程执行的同时会并发执行,打印thread runthread end语句。

run()方法结束时,新线程就结束了。而main()方法结束时,主线程也结束了。

我们再来看线程的执行顺序:

  1. main线程肯定是先打印main start,再打印main end
  2. t线程肯定是先打印thread run,再打印thread end

但是,除了可以肯定,main start会先打印外,main end打印在thread run之前、thread end之后或者之间,都无法确定。因为从t线程开始运行以后,两个线程就开始同时运行了,并且由操作系统调度,程序本身无法确定线程的调度顺序。

要模拟并发执行的效果,我们可以在线程中调用Thread.sleep(),强迫当前线程暂停一段时间:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) {
  4. System.out.println("main start...");
  5. Thread t = new Thread() {
  6. public void run() {
  7. System.out.println("thread run...");
  8. try {
  9. Thread.sleep(10);
  10. } catch (InterruptedException e) {}
  11. System.out.println("thread end.");
  12. }
  13. };
  14. t.start();
  15. try {
  16. Thread.sleep(20);
  17. } catch (InterruptedException e) {}
  18. System.out.println("main end...");
  19. }
  20. }

sleep()传入的参数是毫秒。调整暂停时间的大小,我们可以看到main线程和t线程执行的先后顺序。

要特别注意:直接调用Thread实例的run()方法是无效的:

  1. public class Main {
  2. public static void main(String[] args) {
  3. Thread t = new MyThread();
  4. t.run();
  5. }
  6. }
  7. class MyThread extends Thread {
  8. public void run() {
  9. System.out.println("hello");
  10. }
  11. }

直接调用run()方法,相当于调用了一个普通的Java方法,当前线程并没有任何改变,也不会启动新线程。上述代码实际上是在main()方法内部又调用了run()方法,打印hello语句是在main线程中执行的,没有任何新线程被创建。

必须调用Thread实例的start()方法才能启动新线程,如果我们查看Thread类的源代码,会看到start()方法内部调用了一个private native void start0()方法,native修饰符表示这个方法是由JVM虚拟机内部的C代码实现的,不是由Java代码实现的。

线程的优先级

可以对线程设定优先级,设定优先级的方法是:

  1. Thread.setPriority(int n) // 1~10, 默认值5

本章小结

Java用Thread对象表示一个线程,通过调用start()启动一个新线程;

一个线程对象只能调用一次start()方法;

线程的执行代码写在run()方法中;

线程调度由操作系统决定,程序本身无法决定调度顺序;

Thread.sleep()可以把当前线程暂停一段时间。

线程的状态

在Java程序中,一个线程对象只能调用一次start()方法启动新线程,并在新线程中执行run()方法。一旦run()方法执行完毕,线程就结束了。因此,Java线程的状态有以下几种:

  • New:新创建的线程,尚未执行;
  • Runnable:运行中的线程,正在执行run()方法的Java代码;
  • Blocked:运行中的线程,因为某些操作被阻塞而挂起;
  • Waiting:运行中的线程,因为某些操作在等待中;
  • Timed Waiting:运行中的线程,因为执行sleep()方法正在计时等待;
  • Terminated:线程已终止,因为run()方法执行完毕。

用一个状态转移图表示如下:

  1. ┌─────────────┐
  2. New
  3. └─────────────┘
  4. ┌─────────────┐ ┌─────────────┐
  5. ││ Runnable Blocked ││
  6. └─────────────┘ └─────────────┘
  7. │┌─────────────┐ ┌─────────────┐│
  8. Waiting Timed Waiting
  9. │└─────────────┘ └─────────────┘│
  10. ┌─────────────┐
  11. Terminated
  12. └─────────────┘

当线程启动后,它可以在RunnableBlockedWaitingTimed Waiting这几个状态之间切换,直到最后变成Terminated状态,线程终止。

线程终止的原因有:

  • 线程正常终止:run()方法执行到return语句返回;
  • 线程意外终止:run()方法因为未捕获的异常导致线程终止;
  • 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。

一个线程还可以等待另一个线程直到其运行结束。例如,main线程在启动t线程后,可以通过t.join()等待t线程结束后再继续运行:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) throws InterruptedException {
  4. Thread t = new Thread(() -> {
  5. System.out.println("hello");
  6. });
  7. System.out.println("start");
  8. t.start();
  9. t.join();
  10. System.out.println("end");
  11. }
  12. }

main线程对线程对象t调用join()方法时,主线程将等待变量t表示的线程运行结束,即join就是指等待该线程结束,然后才继续往下执行自身线程。所以,上述代码打印顺序可以肯定是main线程先打印startt线程再打印hellomain线程最后再打印end

如果t线程已经结束,对实例t调用join()会立刻返回。此外,join(long)的重载方法也可以指定一个等待时间,超过等待时间后就不再继续等待。

本章小结

Java线程对象Thread的状态包括:NewRunnableBlockedWaitingTimed WaitingTerminated

通过对另一个线程对象调用join()方法可以等待其执行结束;

可以指定等待时间,超过等待时间线程仍然没有结束就不再等待;

对已经运行结束的线程调用join()方法会立刻返回。

中断线程

如果线程需要执行一个长时间任务,就可能需要能中断线程。中断线程就是其他线程给该线程发一个信号,该线程收到信号后结束执行run()方法,使得自身线程能立刻结束运行。

我们举个栗子:假设从网络下载一个100M的文件,如果网速很慢,用户等得不耐烦,就可能在下载过程中点“取消”,这时,程序就需要中断下载线程的执行。

中断一个线程非常简单,只需要在其他线程中对目标线程调用interrupt()方法,目标线程需要反复检测自身状态是否是interrupted状态,如果是,就立刻结束运行。

我们还是看示例代码:

  1. //中断线程
  2. public class Main {
  3. public static void main(String[] args) throws InterruptedException {
  4. Thread t = new MyThread();
  5. t.start();
  6. Thread.sleep(1); // 暂停1毫秒
  7. t.interrupt(); // 中断t线程
  8. t.join(); // 等待t线程结束
  9. System.out.println("end");
  10. }
  11. }
  12. class MyThread extends Thread {
  13. public void run() {
  14. int n = 0;
  15. while (! isInterrupted()) {
  16. n ++;
  17. System.out.println(n + " hello!");
  18. }
  19. }
  20. }

仔细看上述代码,main线程通过调用t.interrupt()方法中断t线程,但是要注意,interrupt()方法仅仅向t线程发出了“中断请求”,至于t线程是否能立刻响应,要看具体代码。而t线程的while循环会检测isInterrupted(),所以上述代码能正确响应interrupt()请求,使得自身立刻结束运行run()方法。

如果线程处于等待状态,例如,t.join()会让main线程进入等待状态,此时,如果对main线程调用interrupt()join()方法会立刻抛出InterruptedException,因此,目标线程只要捕获到join()方法抛出的InterruptedException,就说明有其他线程对其调用了interrupt()方法,通常情况下该线程应该立刻结束运行。

我们来看下面的示例代码:

  1. //中断线程
  2. public class Main {
  3. public static void main(String[] args) throws InterruptedException {
  4. Thread t = new MyThread();
  5. t.start();
  6. Thread.sleep(1000);
  7. t.interrupt(); // 中断t线程
  8. t.join(); // 等待t线程结束
  9. System.out.println("end");
  10. }
  11. }
  12. class MyThread extends Thread {
  13. public void run() {
  14. Thread hello = new HelloThread();
  15. hello.start(); // 启动hello线程
  16. try {
  17. hello.join(); // 等待hello线程结束
  18. } catch (InterruptedException e) {
  19. System.out.println("interrupted!");
  20. }
  21. hello.interrupt();
  22. }
  23. }
  24. class HelloThread extends Thread {
  25. public void run() {
  26. int n = 0;
  27. while (!isInterrupted()) {
  28. n++;
  29. System.out.println(n + " hello!");
  30. try {
  31. Thread.sleep(100);
  32. } catch (InterruptedException e) {
  33. break;
  34. }
  35. }
  36. }
  37. }

main线程通过调用t.interrupt()从而通知t线程中断,而此时t线程正位于hello.join()的等待中,此方法会立刻结束等待并抛出InterruptedException。由于我们在t线程中捕获了InterruptedException,因此,就可以准备结束该线程。在t线程结束前,对hello线程也进行了interrupt()调用通知其中断。如果去掉这一行代码,可以发现hello线程仍然会继续运行,且JVM不会退出。

另一个常用的中断线程的方法是设置标志位。我们通常会用一个running标志位来标识线程是否应该继续运行,在外部线程中,通过把HelloThread.running置为false,就可以让线程结束:

  1. //中断线程
  2. public class Main {
  3. public static void main(String[] args) throws InterruptedException {
  4. HelloThread t = new HelloThread();
  5. t.start();
  6. Thread.sleep(1);
  7. t.running = false; // 标志位置为false
  8. }
  9. }
  10. class HelloThread extends Thread {
  11. public volatile boolean running = true;
  12. public void run() {
  13. int n = 0;
  14. while (running) {
  15. n ++;
  16. System.out.println(n + " hello!");
  17. }
  18. System.out.println("end!");
  19. }
  20. }

注意到HelloThread的标志位boolean running是一个线程间共享的变量。线程间共享变量需要使用volatile关键字标记,确保每个线程都能读取到更新后的变量值。

为什么要对线程间共享的变量用关键字volatile声明?这涉及到Java的内存模型。在Java虚拟机中,变量的值保存在主内存中,但是,当线程访问变量时,它会先获取一个副本,并保存在自己的工作内存中。如果线程修改了变量的值,虚拟机会在某个时刻把修改后的值回写到主内存,但是,这个时间是不确定的!

Java多线程 - 图5

这会导致如果一个线程更新了某个变量,另一个线程读取的值可能还是更新前的。例如,主内存的变量a = true,线程1执行a = false时,它在此刻仅仅是把变量a的副本变成了false,主内存的变量a还是true,在JVM把修改后的a回写到主内存之前,其他线程读取到的a的值仍然是true,这就造成了多线程之间共享的变量不一致。

因此,volatile关键字的目的是告诉虚拟机:

  • 每次访问变量时,总是获取主内存的最新值;
  • 每次修改变量后,立刻回写到主内存。

volatile关键字解决的是可见性问题:当一个线程修改了某个共享变量的值,其他线程能够立刻看到修改后的值。

如果我们去掉volatile关键字,运行上述程序,发现效果和带volatile差不多,这是因为在x86的架构下,JVM回写主内存的速度非常快,但是,换成ARM的架构,就会有显著的延迟。

本章小结

对目标线程调用interrupt()方法可以请求中断一个线程,目标线程通过检测isInterrupted()标志获取自身是否已中断。如果目标线程处于等待状态,该线程会捕获到InterruptedException

目标线程检测到isInterrupted()true或者捕获了InterruptedException都应该立刻结束自身线程;

通过标志位判断需要正确使用volatile关键字;

volatile关键字解决了共享变量在线程间的可见性问题。

守护线程

Java程序入口就是由JVM启动main线程,main线程又可以启动其他线程。当所有线程都运行结束时,JVM退出,进程结束。

如果有一个线程没有退出,JVM进程就不会退出。所以,必须保证所有线程都能及时结束。

但是有一种线程的目的就是无限循环,例如,一个定时触发任务的线程:

  1. class TimerThread extends Thread {
  2. @Override
  3. public void run() {
  4. while (true) {
  5. System.out.println(LocalTime.now());
  6. try {
  7. Thread.sleep(1000);
  8. } catch (InterruptedException e) {
  9. break;
  10. }
  11. }
  12. }
  13. }

如果这个线程不结束,JVM进程就无法结束。问题是,由谁负责结束这个线程?

然而这类线程经常没有负责人来负责结束它们。但是,当其他线程结束时,JVM进程又必须要结束,怎么办?

答案是使用守护线程(Daemon Thread)。

守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。

因此,JVM退出时,不必关心守护线程是否已结束。

如何创建守护线程呢?方法和普通线程一样,只是在调用start()方法前,调用setDaemon(true)把该线程标记为守护线程:

  1. Thread t = new MyThread();
  2. t.setDaemon(true);
  3. t.start();

在守护线程中,编写代码要注意:守护线程不能持有任何需要关闭的资源,例如打开文件等,因为虚拟机退出时,守护线程没有任何机会来关闭文件,这会导致数据丢失。

本章小结

守护线程是为其他线程服务的线程;

所有非守护线程都执行完毕后,虚拟机退出;

守护线程不能持有需要关闭的资源(如打开文件等)。

线程同步

当多个线程同时运行时,线程的调度由操作系统决定,程序本身无法决定。因此,任何一个线程都有可能在任何指令处被操作系统暂停,然后在某个时间段后继续执行。

这个时候,有个单线程模型下不存在的问题就来了:如果多个线程同时读写共享变量,会出现数据不一致的问题。

我们来看一个例子:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. var add = new AddThread();
  5. var dec = new DecThread();
  6. add.start();
  7. dec.start();
  8. add.join();
  9. dec.join();
  10. System.out.println(Counter.count);
  11. }
  12. }
  13. class Counter {
  14. public static int count = 0;
  15. }
  16. class AddThread extends Thread {
  17. public void run() {
  18. for (int i=0; i<10000; i++) { Counter.count += 1; }
  19. }
  20. }
  21. class DecThread extends Thread {
  22. public void run() {
  23. for (int i=0; i<10000; i++) { Counter.count -= 1; }
  24. }
  25. }

上面的代码很简单,两个线程同时对一个int变量进行操作,一个加10000次,一个减10000次,最后结果应该是0,但是,每次运行,结果实际上都是不一样的。

这是因为对变量进行读取和写入时,结果要正确,必须保证是原子操作。原子操作是指不能被中断的一个或一系列操作。

例如,对于语句:

  1. n = n + 1;

看上去是一行语句,实际上对应了3条指令:

  1. ILOAD
  2. IADD
  3. ISTORE

我们假设n的值是100,如果两个线程同时执行n = n + 1,得到的结果很可能不是102,而是101,原因在于:

Java多线程 - 图6

如果线程1在执行ILOAD后被操作系统中断,此刻如果线程2被调度执行,它执行ILOAD后获取的值仍然是100,最终结果被两个线程的ISTORE写入后变成了101,而不是期待的102

这说明多线程模型下,要保证逻辑正确,对共享变量进行读写时,必须保证一组指令以原子方式执行:即某一个线程执行时,其他线程必须等待:

Java多线程 - 图7

通过加锁和解锁的操作,就能保证3条指令总是在一个线程执行期间,不会有其他线程会进入此指令区间。即使在执行期线程被操作系统中断执行,其他线程也会因为无法获得锁导致无法进入此指令区间。只有执行线程将锁释放后,其他线程才有机会获得锁并执行。这种加锁和解锁之间的代码块我们称之为临界区(Critical Section),任何时候临界区最多只有一个线程能执行。

可见,保证一段代码的原子性就是通过加锁和解锁实现的。Java程序使用synchronized关键字对一个对象进行加锁:

  1. synchronized(lock) {
  2. n = n + 1;
  3. }

synchronized保证了代码块在任意时刻最多只有一个线程能执行。我们把上面的代码用synchronized改写如下:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. var add = new AddThread();
  5. var dec = new DecThread();
  6. add.start();
  7. dec.start();
  8. add.join();
  9. dec.join();
  10. System.out.println(Counter.count);
  11. }
  12. }
  13. class Counter {
  14. public static final Object lock = new Object();
  15. public static int count = 0;
  16. }
  17. class AddThread extends Thread {
  18. public void run() {
  19. for (int i=0; i<10000; i++) {
  20. synchronized(Counter.lock) {
  21. Counter.count += 1;
  22. }
  23. }
  24. }
  25. }
  26. class DecThread extends Thread {
  27. public void run() {
  28. for (int i=0; i<10000; i++) {
  29. synchronized(Counter.lock) {
  30. Counter.count -= 1;
  31. }
  32. }
  33. }
  34. }

注意到代码:

  1. synchronized(Counter.lock) { // 获取锁
  2. ...
  3. } // 释放锁

它表示用Counter.lock实例作为锁,两个线程在执行各自的synchronized(Counter.lock) { ... }代码块时,必须先获得锁,才能进入代码块进行。执行结束后,在synchronized语句块结束会自动释放锁。这样一来,对Counter.count变量进行读写就不可能同时进行。上述代码无论运行多少次,最终结果都是0。

使用synchronized解决了多线程同步访问共享变量的正确性问题。但是,它的缺点是带来了性能下降。因为synchronized代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized会降低程序的执行效率。

我们来概括一下如何使用synchronized

  1. 找出修改共享变量的线程代码块;
  2. 选择一个共享实例作为锁;
  3. 使用synchronized(lockObject) { ... }

在使用synchronized的时候,不必担心抛出异常。因为无论是否有异常,都会在synchronized结束处正确释放锁:

  1. public void add(int m) {
  2. synchronized (obj) {
  3. if (m < 0) {
  4. throw new RuntimeException();
  5. }
  6. this.value += m;
  7. } // 无论有无异常,都会在此释放锁
  8. }

我们再来看一个错误使用synchronized的例子:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. var add = new AddThread();
  5. var dec = new DecThread();
  6. add.start();
  7. dec.start();
  8. add.join();
  9. dec.join();
  10. System.out.println(Counter.count);
  11. }
  12. }
  13. class Counter {
  14. public static final Object lock1 = new Object();
  15. public static final Object lock2 = new Object();
  16. public static int count = 0;
  17. }
  18. class AddThread extends Thread {
  19. public void run() {
  20. for (int i=0; i<10000; i++) {
  21. synchronized(Counter.lock1) {
  22. Counter.count += 1;
  23. }
  24. }
  25. }
  26. }
  27. class DecThread extends Thread {
  28. public void run() {
  29. for (int i=0; i<10000; i++) {
  30. synchronized(Counter.lock2) {
  31. Counter.count -= 1;
  32. }
  33. }
  34. }
  35. }

结果并不是0,这是因为两个线程各自的synchronized锁住的不是同一个对象!这使得两个线程各自都可以同时获得锁:因为JVM只保证同一个锁在任意时刻只能被一个线程获取,但两个不同的锁在同一时刻可以被两个线程分别获取。

因此,使用synchronized的时候,获取到的是哪个锁非常重要。锁对象如果不对,代码逻辑就不对。

我们再看一个例子:

  1. //多线程
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. var ts = new Thread[] { new AddStudentThread(), new DecStudentThread(), new AddTeacherThread(), new DecTeacherThread() };
  5. for (var t : ts) {
  6. t.start();
  7. }
  8. for (var t : ts) {
  9. t.join();
  10. }
  11. System.out.println(Counter.studentCount);
  12. System.out.println(Counter.teacherCount);
  13. }
  14. }
  15. class Counter {
  16. public static final Object lock = new Object();
  17. public static int studentCount = 0;
  18. public static int teacherCount = 0;
  19. }
  20. class AddStudentThread extends Thread {
  21. public void run() {
  22. for (int i=0; i<10000; i++) {
  23. synchronized(Counter.lock) {
  24. Counter.studentCount += 1;
  25. }
  26. }
  27. }
  28. }
  29. class DecStudentThread extends Thread {
  30. public void run() {
  31. for (int i=0; i<10000; i++) {
  32. synchronized(Counter.lock) {
  33. Counter.studentCount -= 1;
  34. }
  35. }
  36. }
  37. }
  38. class AddTeacherThread extends Thread {
  39. public void run() {
  40. for (int i=0; i<10000; i++) {
  41. synchronized(Counter.lock) {
  42. Counter.teacherCount += 1;
  43. }
  44. }
  45. }
  46. }
  47. class DecTeacherThread extends Thread {
  48. public void run() {
  49. for (int i=0; i<10000; i++) {
  50. synchronized(Counter.lock) {
  51. Counter.teacherCount -= 1;
  52. }
  53. }
  54. }
  55. }

上述代码的4个线程对两个共享变量分别进行读写操作,但是使用的锁都是Counter.lock这一个对象,这就造成了原本可以并发执行的Counter.studentCount += 1Counter.teacherCount += 1,现在无法并发执行了,执行效率大大降低。实际上,需要同步的线程可以分成两组:AddStudentThreadDecStudentThreadAddTeacherThreadDecTeacherThread,组之间不存在竞争,因此,应该使用两个不同的锁,即:

AddStudentThreadDecStudentThread使用lockStudent锁:

  1. synchronized(Counter.lockStudent) {
  2. ...
  3. }

AddTeacherThreadDecTeacherThread使用lockTeacher锁:

  1. synchronized(Counter.lockTeacher) {
  2. ...
  3. }

这样才能最大化地提高执行效率。

不需要synchronized的操作

JVM规范定义了几种原子操作:

  • 基本类型(longdouble除外)赋值,例如:int n = m
  • 引用类型赋值,例如:List<String> list = anotherList

longdouble是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把longdouble的赋值作为原子操作实现的。

单条原子操作的语句不需要同步。例如:

  1. public void set(int m) {
  2. synchronized(lock) {
  3. this.value = m;
  4. }
  5. }

就不需要同步。

对引用也是类似。例如:

  1. public void set(String s) {
  2. this.value = s;
  3. }

上述赋值语句并不需要同步。

但是,如果是多行赋值语句,就必须保证是同步操作,例如:

  1. class Pair {
  2. int first;
  3. int last;
  4. public void set(int first, int last) {
  5. synchronized(this) {
  6. this.first = first;
  7. this.last = last;
  8. }
  9. }
  10. }

有些时候,通过一些巧妙的转换,可以把非原子操作变为原子操作。例如,上述代码如果改造成:

  1. class Pair {
  2. int[] pair;
  3. public void set(int first, int last) {
  4. int[] ps = new int[] { first, last };
  5. this.pair = ps;
  6. }
  7. }

就不再需要同步,因为this.pair = ps是引用赋值的原子操作。而语句:

  1. int[] ps = new int[] { first, last };

这里的ps是方法内部定义的局部变量,每个线程都会有各自的局部变量,互不影响,并且互不可见,并不需要同步。

本章小结

多线程同时读写共享变量时,会造成逻辑错误,因此需要通过synchronized同步;

同步的本质就是给指定对象加锁,加锁后才能继续执行后续代码;

注意加锁对象必须是同一个实例;

对JVM定义的单个原子操作不需要同步。

同步方法

我们知道Java程序依靠synchronized对线程进行同步,使用synchronized的时候,锁住的是哪个对象非常重要。

让线程自己选择锁对象往往会使得代码逻辑混乱,也不利于封装。更好的方法是把synchronized逻辑封装起来。例如,我们编写一个计数器如下:

  1. public class Counter {
  2. private int count = 0;
  3. public void add(int n) {
  4. synchronized(this) {
  5. count += n;
  6. }
  7. }
  8. public void dec(int n) {
  9. synchronized(this) {
  10. count -= n;
  11. }
  12. }
  13. public int get() {
  14. return count;
  15. }
  16. }

这样一来,线程调用add()dec()方法时,它不必关心同步逻辑,因为synchronized代码块在add()dec()方法内部。并且,我们注意到,synchronized锁住的对象是this,即当前实例,这又使得创建多个Counter实例的时候,它们之间互不影响,可以并发执行:

  1. var c1 = Counter();
  2. var c2 = Counter();
  3. // 对c1进行操作的线程:
  4. new Thread(() -> {
  5. c1.add();
  6. }).start();
  7. new Thread(() -> {
  8. c1.dec();
  9. }).start();
  10. // 对c2进行操作的线程:
  11. new Thread(() -> {
  12. c2.add();
  13. }).start();
  14. new Thread(() -> {
  15. c2.dec();
  16. }).start();

现在,对于Counter类,多线程可以正确调用。

如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe),上面的Counter类就是线程安全的。Java标准库的java.lang.StringBuffer也是线程安全的。

还有一些不变类,例如StringIntegerLocalDate,它们的所有成员变量都是final,多线程同时访问时只能读不能写,这些不变类也是线程安全的。

最后,类似Math这些只提供静态方法,没有成员变量的类,也是线程安全的。

除了上述几种少数情况,大部分类,例如ArrayList,都是非线程安全的类,我们不能在多线程中修改它们。但是,如果所有线程都只读取,不写入,那么ArrayList是可以安全地在线程间共享的。

没有特殊说明时,一个类默认是非线程安全的。

我们再观察Counter的代码:

  1. public class Counter {
  2. public void add(int n) {
  3. synchronized(this) {
  4. count += n;
  5. }
  6. }
  7. ...
  8. }

当我们锁住的是this实例时,实际上可以用synchronized修饰这个方法。下面两种写法是等价的:

  1. public void add(int n) {
  2. synchronized(this) { // 锁住this
  3. count += n;
  4. } // 解锁
  5. }
  1. public synchronized void add(int n) { // 锁住this
  2. count += n;
  3. } // 解锁

因此,用synchronized修饰的方法就是同步方法,它表示整个方法都必须用this实例加锁。

我们再思考一下,如果对一个静态方法添加synchronized修饰符,它锁住的是哪个对象?

  1. public synchronized static void test(int n) {
  2. ...
  3. }

对于static方法,是没有this实例的,因为static方法是针对类而不是实例。但是我们注意到任何一个类都有一个由JVM自动创建的Class实例,因此,对static方法添加synchronized,锁住的是该类的Class实例。上述synchronized static方法实际上相当于:

  1. public class Counter {
  2. public static void test(int n) {
  3. synchronized(Counter.class) {
  4. ...
  5. }
  6. }
  7. }

我们再考察Counterget()方法:

  1. public class Counter {
  2. private int count;
  3. public int get() {
  4. return count;
  5. }
  6. ...
  7. }

它没有同步,因为读一个int变量不需要同步。

然而,如果我们把代码稍微改一下,返回一个包含两个int的对象:

  1. public class Counter {
  2. private int first;
  3. private int last;
  4. public Pair get() {
  5. Pair p = new Pair();
  6. p.first = first;
  7. p.last = last;
  8. return p;
  9. }
  10. ...
  11. }

就必须要同步了。

本章小结

synchronized修饰方法可以把整个方法变为同步代码块,synchronized方法加锁对象是this

通过合理的设计和数据封装可以让一个类变为“线程安全”;

一个类没有特殊说明,默认不是thread-safe;

多线程能否安全访问某个非线程安全的实例,需要具体问题具体分析。

死锁

Java的线程锁是可重入的锁。

什么是可重入的锁?我们还是来看例子:

  1. public class Counter {
  2. private int count = 0;
  3. public synchronized void add(int n) {
  4. if (n < 0) {
  5. dec(-n);
  6. } else {
  7. count += n;
  8. }
  9. }
  10. public synchronized void dec(int n) {
  11. count += n;
  12. }
  13. }

观察synchronized修饰的add()方法,一旦线程执行到add()方法内部,说明它已经获取了当前实例的this锁。如果传入的n < 0,将在add()方法内部调用dec()方法。由于dec()方法也需要获取this锁,现在问题来了:

对同一个线程,能否在获取到锁以后继续获取同一个锁?

答案是肯定的。JVM允许同一个线程重复获取同一个锁,这种能被同一个线程反复获取的锁,就叫做可重入锁。

由于Java的线程锁是可重入锁,所以,获取锁的时候,不但要判断是否是第一次获取,还要记录这是第几次获取。每获取一次锁,记录+1,每退出synchronized块,记录-1,减到0的时候,才会真正释放锁。

死锁

一个线程可以获取一个锁后,再继续获取另一个锁。例如:

  1. public void add(int m) {
  2. synchronized(lockA) { // 获得lockA的锁
  3. this.value += m;
  4. synchronized(lockB) { // 获得lockB的锁
  5. this.another += m;
  6. } // 释放lockB的锁
  7. } // 释放lockA的锁
  8. }
  9. public void dec(int m) {
  10. synchronized(lockB) { // 获得lockB的锁
  11. this.another -= m;
  12. synchronized(lockA) { // 获得lockA的锁
  13. this.value -= m;
  14. } // 释放lockA的锁
  15. } // 释放lockB的锁
  16. }

在获取多个锁的时候,不同线程获取多个不同对象的锁可能导致死锁。对于上述代码,线程1和线程2如果分别执行add()dec()方法时:

  • 线程1:进入add(),获得lockA
  • 线程2:进入dec(),获得lockB

随后:

  • 线程1:准备获得lockB,失败,等待中;
  • 线程2:准备获得lockA,失败,等待中。

此时,两个线程各自持有不同的锁,然后各自试图获取对方手里的锁,造成了双方无限等待下去,这就是死锁。

死锁发生后,没有任何机制能解除死锁,只能强制结束JVM进程。

因此,在编写多线程应用时,要特别注意防止死锁。因为死锁一旦形成,就只能强制结束进程。

那么我们应该如何避免死锁呢?答案是:线程获取锁的顺序要一致。即严格按照先获取lockA,再获取lockB的顺序,改写dec()方法如下:

  1. public void dec(int m) {
  2. synchronized(lockA) { // 获得lockA的锁
  3. this.value -= m;
  4. synchronized(lockB) { // 获得lockB的锁
  5. this.another -= m;
  6. } // 释放lockB的锁
  7. } // 释放lockA的锁
  8. }

本章小结

Java的synchronized锁是可重入锁;

死锁产生的条件是多线程各自持有不同的锁,并互相试图获取对方已持有的锁,导致无限等待;

避免死锁的方法是多线程获取锁的顺序要一致。

使用wait和notify

在Java程序中,synchronized解决了多线程竞争的问题。例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized加锁:

  1. class TaskQueue {
  2. Queue<String> queue = new LinkedList<>();
  3. public synchronized void addTask(String s) {
  4. this.queue.add(s);
  5. }
  6. }

但是synchronized并没有解决多线程协调的问题。

仍然以上面的TaskQueue为例,我们再编写一个getTask()方法取出队列的第一个任务:

  1. class TaskQueue {
  2. Queue<String> queue = new LinkedList<>();
  3. public synchronized void addTask(String s) {
  4. this.queue.add(s);
  5. }
  6. public synchronized String getTask() {
  7. while (queue.isEmpty()) {
  8. }
  9. return queue.remove();
  10. }
  11. }

上述代码看上去没有问题:getTask()内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()循环退出,就可以返回队列的元素了。

但实际上while()循环永远不会退出。因为线程在执行while()循环时,已经在getTask()入口获取了this锁,其他线程根本无法调用addTask(),因为addTask()执行条件也是获取this锁。

因此,执行上述代码,线程会在getTask()中因为死循环而100%占用CPU资源。

如果深入思考一下,我们想要的执行效果是:

  • 线程1可以调用addTask()不断往队列中添加任务;
  • 线程2可以调用getTask()从队列中获取任务。如果队列为空,则getTask()应该等待,直到队列中至少有一个任务时再返回。

因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。

对于上述TaskQueue,我们先改造getTask()方法,在条件不满足时,线程进入等待状态:

  1. public synchronized String getTask() {
  2. while (queue.isEmpty()) {
  3. this.wait();
  4. }
  5. return queue.remove();
  6. }

当一个线程执行到getTask()方法内部的while循环时,它必定已经获取到了this锁,此时,线程执行while条件判断,如果条件成立(队列为空),线程将执行this.wait(),进入等待状态。

这里的关键是:wait()方法必须在当前获取的锁对象上调用,这里获取的是this锁,因此调用this.wait()

调用wait()方法后,线程进入等待状态,wait()方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait()方法才会返回,然后,继续执行下一条语句。

有些仔细的童鞋会指出:即使线程在getTask()内部等待,其他线程如果拿不到this锁,照样无法执行addTask(),肿么办?

这个问题的关键就在于wait()方法的执行机制非常复杂。首先,它不是一个普通的Java方法,而是定义在Object类的一个native方法,也就是由JVM的C代码实现的。其次,必须在synchronized块中才能调用wait()方法,因为wait()方法调用时,会释放线程获得的锁,wait()方法返回后,线程又会重新试图获得锁。

因此,只能在锁对象上调用wait()方法。因为在getTask()中,我们获得了this锁,因此,只能在this对象上调用wait()方法:

  1. public synchronized String getTask() {
  2. while (queue.isEmpty()) {
  3. // 释放this锁:
  4. this.wait();
  5. // 重新获取this锁
  6. }
  7. return queue.remove();
  8. }

当一个线程在this.wait()等待时,它就会释放this锁,从而使得其他线程能够在addTask()方法获得this锁。

现在我们面临第二个问题:如何让等待的线程被重新唤醒,然后从wait()方法返回?答案是在相同的锁对象上调用notify()方法。我们修改addTask()如下:

  1. public synchronized void addTask(String s) {
  2. this.queue.add(s);
  3. this.notify(); // 唤醒在this锁等待的线程
  4. }

注意到在往队列中添加了任务后,线程立刻对this锁对象调用notify()方法,这个方法会唤醒一个正在this锁等待的线程(就是在getTask()中位于this.wait()的线程),从而使得等待线程从this.wait()方法返回。

我们来看一个完整的例子:

  1. import java.util.*;
  2. public class Main {
  3. public static void main(String[] args) throws InterruptedException {
  4. var q = new TaskQueue();
  5. var ts = new ArrayList<Thread>();
  6. for (int i=0; i<5; i++) {
  7. var t = new Thread() {
  8. public void run() {
  9. // 执行task:
  10. while (true) {
  11. try {
  12. String s = q.getTask();
  13. System.out.println("execute task: " + s);
  14. } catch (InterruptedException e) {
  15. return;
  16. }
  17. }
  18. }
  19. };
  20. t.start();
  21. ts.add(t);
  22. }
  23. var add = new Thread(() -> {
  24. for (int i=0; i<10; i++) {
  25. // 放入task:
  26. String s = "t-" + Math.random();
  27. System.out.println("add task: " + s);
  28. q.addTask(s);
  29. try { Thread.sleep(100); } catch(InterruptedException e) {}
  30. }
  31. });
  32. add.start();
  33. add.join();
  34. Thread.sleep(100);
  35. for (var t : ts) {
  36. t.interrupt();
  37. }
  38. }
  39. }
  40. class TaskQueue {
  41. Queue<String> queue = new LinkedList<>();
  42. public synchronized void addTask(String s) {
  43. this.queue.add(s);
  44. this.notifyAll();
  45. }
  46. public synchronized String getTask() throws InterruptedException {
  47. while (queue.isEmpty()) {
  48. this.wait();
  49. }
  50. return queue.remove();
  51. }
  52. }

这个例子中,我们重点关注addTask()方法,内部调用了this.notifyAll()而不是this.notify(),使用notifyAll()将唤醒所有当前正在this锁等待的线程,而notify()只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。这是因为可能有多个线程正在getTask()方法内部的wait()中等待,使用notifyAll()将一次性全部唤醒。通常来说,notifyAll()更安全。有些时候,如果我们的代码逻辑考虑不周,用notify()会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。

但是,注意到wait()方法返回时需要重新获得this锁。假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()的线程结束此方法后,才能释放this锁,随后,这3个线程中只能有一个获取到this锁,剩下两个将继续等待。

再注意到我们在while()循环中调用wait(),而不是if语句:

  1. public synchronized String getTask() throws InterruptedException {
  2. if (queue.isEmpty()) {
  3. this.wait();
  4. }
  5. return queue.remove();
  6. }

这种写法实际上是错误的,因为线程被唤醒时,需要再次获取this锁。多个线程被唤醒后,只有一个线程能获取this锁,此刻,该线程执行queue.remove()可以获取到队列的元素,然而,剩下的线程如果获取this锁后执行queue.remove(),此刻队列可能已经没有任何元素了,所以,要始终在while循环中wait(),并且每次被唤醒后拿到this锁就必须再次判断:

  1. while (queue.isEmpty()) {
  2. this.wait();
  3. }

所以,正确编写多线程代码是非常困难的,需要仔细考虑的条件非常多,任何一个地方考虑不周,都会导致多线程运行时不正常。

Java多线程 - 图8

本章小结

waitnotify用于多线程协调运行:

  • synchronized内部可以调用wait()使线程进入等待状态;
  • 必须在已获得的锁对象上调用wait()方法;
  • synchronized内部可以调用notify()notifyAll()唤醒其他等待线程;
  • 必须在已获得的锁对象上调用notify()notifyAll()方法;
  • 已唤醒的线程还需要重新获得锁后才能继续执行。

使用ReentrantLock

从Java 5开始,引入了一个高级的处理并发的java.util.concurrent包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。

我们知道Java语言直接提供了synchronized关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。

java.util.concurrent.locks包提供的ReentrantLock用于替代synchronized加锁,我们来看一下传统的synchronized代码:

  1. public class Counter {
  2. private int count;
  3. public void add(int n) {
  4. synchronized(this) {
  5. count += n;
  6. }
  7. }
  8. }

如果用ReentrantLock替代,可以把代码改造为:

  1. public class Counter {
  2. private final Lock lock = new ReentrantLock();
  3. private int count;
  4. public void add(int n) {
  5. lock.lock();
  6. try {
  7. count += n;
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  12. }

因为synchronized是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock是Java代码实现的锁,我们就必须先获取锁,然后在finally中正确释放锁。

顾名思义,ReentrantLock是可重入锁,它和synchronized一样,一个线程可以多次获取同一个锁。

synchronized不同的是,ReentrantLock可以尝试获取锁:

  1. if (lock.tryLock(1, TimeUnit.SECONDS)) {
  2. try {
  3. ...
  4. } finally {
  5. lock.unlock();
  6. }
  7. }

上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()返回false,程序就可以做一些额外处理,而不是无限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,线程在tryLock()失败的时候不会导致死锁。

本章小结

ReentrantLock可以替代synchronized进行同步;

ReentrantLock获取锁更安全;

必须先获取到锁,再进入try {...}代码块,最后使用finally保证释放锁;

可以使用tryLock()尝试获取锁。

使用Condition

使用ReentrantLock比直接使用synchronized更安全,可以替代synchronized进行线程同步。

但是,synchronized可以配合waitnotify实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock我们怎么编写waitnotify的功能呢?

答案是使用Condition对象来实现waitnotify的功能。

我们仍然以TaskQueue为例,把前面用synchronized实现的功能通过ReentrantLockCondition来实现:

  1. class TaskQueue {
  2. private final Lock lock = new ReentrantLock();
  3. private final Condition condition = lock.newCondition();
  4. private Queue<String> queue = new LinkedList<>();
  5. public void addTask(String s) {
  6. lock.lock();
  7. try {
  8. queue.add(s);
  9. condition.signalAll();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. public String getTask() {
  15. lock.lock();
  16. try {
  17. while (queue.isEmpty()) {
  18. condition.await();
  19. }
  20. return queue.remove();
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. }

可见,使用Condition时,引用的Condition对象必须从Lock实例的newCondition()返回,这样才能获得一个绑定了Lock实例的Condition实例。

Condition提供的await()signal()signalAll()原理和synchronized锁对象的wait()notify()notifyAll()是一致的,并且其行为也是一样的:

  • await()会释放当前锁,进入等待状态;
  • signal()会唤醒某个等待线程;
  • signalAll()会唤醒所有等待线程;
  • 唤醒线程从await()返回后需要重新获得锁。

此外,和tryLock()类似,await()可以在等待指定时间后,如果还没有被其他线程通过signal()signalAll()唤醒,可以自己醒来:

  1. if (condition.await(1, TimeUnit.SECOND)) {
  2. // 被其他线程唤醒
  3. } else {
  4. // 指定时间内没有被其他线程唤醒
  5. }

可见,使用Condition配合Lock,我们可以实现更灵活的线程同步。

本章小结

Condition可以替代waitnotify

Condition对象必须从Lock对象获取。

使用ReadWriteLock

前面讲到的ReentrantLock保证了只有一个线程可以执行临界区代码:

  1. public class Counter {
  2. private final Lock lock = new ReentrantLock();
  3. private int[] counts = new int[10];
  4. public void inc(int index) {
  5. lock.lock();
  6. try {
  7. counts[index] += 1;
  8. } finally {
  9. lock.unlock();
  10. }
  11. }
  12. public int[] get() {
  13. lock.lock();
  14. try {
  15. return Arrays.copyOf(counts, counts.length);
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. }

但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用inc()方法是必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。

实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待:

允许 不允许
不允许 不允许

使用ReadWriteLock可以解决这个问题,它保证:

  • 只允许一个线程写入(其他线程既不能写入也不能读取);
  • 没有写入时,多个线程允许同时读(提高性能)。

ReadWriteLock实现这个功能十分容易。我们需要创建一个ReadWriteLock实例,然后分别获取读锁和写锁:

  1. public class Counter {
  2. private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
  3. private final Lock rlock = rwlock.readLock();
  4. private final Lock wlock = rwlock.writeLock();
  5. private int[] counts = new int[10];
  6. public void inc(int index) {
  7. wlock.lock(); // 加写锁
  8. try {
  9. counts[index] += 1;
  10. } finally {
  11. wlock.unlock(); // 释放写锁
  12. }
  13. }
  14. public int[] get() {
  15. rlock.lock(); // 加读锁
  16. try {
  17. return Arrays.copyOf(counts, counts.length);
  18. } finally {
  19. rlock.unlock(); // 释放读锁
  20. }
  21. }
  22. }

把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。

使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。

例如,一个论坛的帖子,回复可以看做写入操作,它是不频繁的,但是,浏览可以看做读取操作,是非常频繁的,这种情况就可以使用ReadWriteLock

本章小结

使用ReadWriteLock可以提高读取效率:

  • ReadWriteLock只允许一个线程写入;
  • ReadWriteLock允许多个线程在没有写入时同时读取;
  • ReadWriteLock适合读多写少的场景。

使用StampedLock

前面介绍的ReadWriteLock可以解决多线程同时读,但只有一个线程能写的问题。

如果我们深入分析ReadWriteLock,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。

要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock

StampedLockReadWriteLock相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。

乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。

我们来看例子:

  1. public class Point {
  2. private final StampedLock stampedLock = new StampedLock();
  3. private double x;
  4. private double y;
  5. public void move(double deltaX, double deltaY) {
  6. long stamp = stampedLock.writeLock(); // 获取写锁
  7. try {
  8. x += deltaX;
  9. y += deltaY;
  10. } finally {
  11. stampedLock.unlockWrite(stamp); // 释放写锁
  12. }
  13. }
  14. public double distanceFromOrigin() {
  15. long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
  16. // 注意下面两行代码不是原子操作
  17. // 假设x,y = (100,200)
  18. double currentX = x;
  19. // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
  20. double currentY = y;
  21. // 此处已读取到y,如果没有写入,读取是正确的(100,200)
  22. // 如果有写入,读取是错误的(100,400)
  23. if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
  24. stamp = stampedLock.readLock(); // 获取一个悲观读锁
  25. try {
  26. currentX = x;
  27. currentY = y;
  28. } finally {
  29. stampedLock.unlockRead(stamp); // 释放悲观读锁
  30. }
  31. }
  32. return Math.sqrt(currentX * currentX + currentY * currentY);
  33. }
  34. }

ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。

可见,StampedLock把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock是不可重入锁,不能在一个线程中反复获取同一个锁。

StampedLock还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。

本章小结

StampedLock提供了乐观读锁,可取代ReadWriteLock以进一步提升并发性能;

StampedLock是不可重入锁。

使用Concurrent集合

我们在前面已经通过ReentrantLockCondition实现了一个BlockingQueue

  1. public class TaskQueue {
  2. private final Lock lock = new ReentrantLock();
  3. private final Condition condition = lock.newCondition();
  4. private Queue<String> queue = new LinkedList<>();
  5. public void addTask(String s) {
  6. lock.lock();
  7. try {
  8. queue.add(s);
  9. condition.signalAll();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }
  14. public String getTask() {
  15. lock.lock();
  16. try {
  17. while (queue.isEmpty()) {
  18. condition.await();
  19. }
  20. return queue.remove();
  21. } finally {
  22. lock.unlock();
  23. }
  24. }
  25. }

BlockingQueue的意思就是说,当一个线程调用这个TaskQueuegetTask()方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()方法才会返回。

因为BlockingQueue非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent包提供的线程安全的集合:ArrayBlockingQueue

除了BlockingQueue外,针对ListMapSetDeque等,java.util.concurrent包也提供了对应的并发集合类。我们归纳一下:

interface non-thread-safe thread-safe
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingDeque

使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap为例:

  1. Map<String, String> map = new ConcurrentHashMap<>();
  2. // 在不同的线程读写:
  3. map.put("A", "1");
  4. map.put("B", "2");
  5. map.get("A", "1");

因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:

  1. Map<String, String> map = new HashMap<>();

改为:

  1. Map<String, String> map = new ConcurrentHashMap<>();

就可以了。

java.util.Collections工具类还提供了一个旧的线程安全集合转换器,可以这么用:

  1. Map unsafeMap = new HashMap();
  2. Map threadSafeMap = Collections.synchronizedMap(unsafeMap);

但是它实际上是用一个包装类包装了非线程安全的Map,然后对所有读写方法都用synchronized加锁,这样获得的线程安全集合的性能比java.util.concurrent集合要低很多,所以不推荐使用。

本章小结

使用java.util.concurrent包提供的线程安全的并发集合可以大大简化多线程编程:

多线程同时读写并发集合是安全的;

尽量使用Java标准库提供的并发集合,避免自己编写同步代码。

使用Atomic

Java的java.util.concurrent包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic包。

我们以AtomicInteger为例,它提供的主要操作有:

  • 增加值并返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 获取当前值:int get()
  • 用CAS方式设置:int compareAndSet(int expect, int update)

Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。

如果我们自己通过CAS编写incrementAndGet(),它大概长这样:

  1. public int incrementAndGet(AtomicInteger var) {
  2. int prev, next;
  3. do {
  4. prev = var.get();
  5. next = prev + 1;
  6. } while ( ! var.compareAndSet(prev, next));
  7. return next;
  8. }

CAS是指,在这个操作中,如果AtomicInteger的当前值是prev,那么就更新为next,返回true。如果AtomicInteger的当前值不是prev,就什么也不干,返回false。通过CAS操作并配合do ... while循环,即使其他线程修改了AtomicInteger的值,最终的结果也是正确的。

我们利用AtomicLong可以编写一个多线程安全的全局唯一ID生成器:

  1. class IdGenerator {
  2. AtomicLong var = new AtomicLong(0);
  3. public long getNextId() {
  4. return var.incrementAndGet();
  5. }
  6. }

通常情况下,我们并不需要直接用do ... while循环调用compareAndSet实现复杂的并发操作,而是用incrementAndGet()这样的封装好的方法,因此,使用起来非常简单。

在高度竞争的情况下,还可以使用Java 8提供的LongAdderLongAccumulator

本章小结

使用java.util.concurrent.atomic提供的原子操作可以简化多线程编程:

  • 原子操作实现了无锁的线程安全;
  • 适用于计数器,累加器等。

使用线性池

Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。

如果可以复用一组线程:

Java多线程 - 图9

那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。

简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。

Java标准库提供了ExecutorService接口表示线程池,它的典型用法如下:

  1. // 创建固定大小的线程池:
  2. ExecutorService executor = Executors.newFixedThreadPool(3);
  3. // 提交任务:
  4. executor.submit(task1);
  5. executor.submit(task2);
  6. executor.submit(task3);
  7. executor.submit(task4);
  8. executor.submit(task5);

因为ExecutorService只是接口,Java标准库提供的几个常用实现类有:

  • FixedThreadPool:线程数固定的线程池;
  • CachedThreadPool:线程数根据任务动态调整的线程池;
  • SingleThreadExecutor:仅单线程执行的线程池。

创建这些线程池的方法都被封装到Executors这个类中。我们以FixedThreadPool为例,看看线程池的执行逻辑:

  1. //thread-pool
  2. import java.util.concurrent.*;
  3. public class Main {
  4. public static void main(String[] args) {
  5. // 创建一个固定大小的线程池:
  6. ExecutorService es = Executors.newFixedThreadPool(4);
  7. for (int i = 0; i < 6; i++) {
  8. es.submit(new Task("" + i));
  9. }
  10. // 关闭线程池:
  11. es.shutdown();
  12. }
  13. }
  14. class Task implements Runnable {
  15. private final String name;
  16. public Task(String name) {
  17. this.name = name;
  18. }
  19. @Override
  20. public void run() {
  21. System.out.println("start task " + name);
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. }
  26. System.out.println("end task " + name);
  27. }
  28. }

我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。

线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务,awaitTermination()则会等待指定的时间让线程池关闭。

如果我们把线程池改为CachedThreadPool,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。

如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()方法的源码:

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

因此,想创建指定动态范围的线程池,可以这么写:

  1. int min = 4;
  2. int max = 10;
  3. ExecutorService es = new ThreadPoolExecutor(min, max,
  4. 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

ScheduledThreadpool

还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任务可以定期反复执行。

创建一个ScheduledThreadPool仍然是通过Executors类:

  1. ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我们可以提交一次性任务,它会在指定延迟后只执行一次:

  1. // 1秒后执行一次性任务:
  2. ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任务以固定的每3秒执行,我们可以这样写:

  1. // 2秒后开始执行定时任务,每3秒执行:
  2. ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任务以固定的3秒为间隔执行,我们可以这样写:

  1. // 2秒后开始执行定时任务,以3秒为间隔执行:
  2. ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

Java多线程 - 图10

而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

Java多线程 - 图11

因此,使用ScheduledThreadPool时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。

细心的童鞋还可以思考下面的问题:

  • 在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?
  • 如果任务抛出了异常,后续任务是否继续执行?

Java标准库还提供了一个java.util.Timer类,这个类也可以定期执行任务,但是,一个Timer会对应一个Thread,所以,一个Timer只能定期执行一个任务,多个定时任务必须启动多个Timer,而一个ScheduledThreadPool就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool取代旧的Timer

本章小结

JDK提供了ExecutorService实现了线程池功能:

  • 线程池内部维护一组线程,可以高效执行大量小任务;
  • Executors提供了静态方法创建不同类型的ExecutorService
  • 必须调用shutdown()关闭ExecutorService
  • ScheduledThreadPool可以定期调度多个任务。

使用Future

在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable接口,就可以让线程池去执行:

  1. class Task implements Runnable {
  2. public String result;
  3. public void run() {
  4. this.result = longTimeCalculation();
  5. }
  6. }

Runnable接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable接口,和Runnable接口比,它多了一个返回值:

  1. class Task implements Callable<String> {
  2. public String call() throws Exception {
  3. return longTimeCalculation();
  4. }
  5. }

并且Callable接口是一个泛型接口,可以返回指定类型的结果。

现在的问题是,如何获得异步执行的结果?

如果仔细看ExecutorService.submit()方法,可以看到,它返回了一个Future类型,一个Future类型的实例代表一个未来能获取结果的对象:

  1. ExecutorService executor = Executors.newFixedThreadPool(4);
  2. // 定义任务:
  3. Callable<String> task = new Task();
  4. // 提交任务并获得Future:
  5. Future<String> future = executor.submit(task);
  6. // 从Future获取异步执行返回的结果:
  7. String result = future.get(); // 可能阻塞

当我们提交一个Callable任务后,我们会同时获得一个Future对象,然后,我们在主线程某个时刻调用Future对象的get()方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。

一个Future<V>接口表示一个未来可能会返回的结果,它定义的方法有:

  • get():获取结果(可能会等待)
  • get(long timeout, TimeUnit unit):获取结果,但只等待指定的时间;
  • cancel(boolean mayInterruptIfRunning):取消当前任务;
  • isDone():判断任务是否已完成。

本章小结

对线程池提交一个Callable任务,可以获得一个Future对象;

可以用Future在将来某个时刻获取结果。

使用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

我们以获取股票价格为例,看看如何使用CompletableFuture

  1. // CompletableFuture
  2. import java.util.concurrent.CompletableFuture;
  3. public class Main {
  4. public static void main(String[] args) throws Exception {
  5. // 创建异步执行任务:
  6. CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
  7. // 如果执行成功:
  8. cf.thenAccept((result) -> {
  9. System.out.println("price: " + result);
  10. });
  11. // 如果执行异常:
  12. cf.exceptionally((e) -> {
  13. e.printStackTrace();
  14. return null;
  15. });
  16. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  17. Thread.sleep(200);
  18. }
  19. static Double fetchPrice() {
  20. try {
  21. Thread.sleep(100);
  22. } catch (InterruptedException e) {
  23. }
  24. if (Math.random() < 0.3) {
  25. throw new RuntimeException("fetch price failed!");
  26. }
  27. return 5 + Math.random() * 20;
  28. }
  29. }

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

  1. public interface Supplier<T> {
  2. T get();
  3. }

这里我们用lambda语法简化了一下,直接传入Main::fetchPrice,因为Main.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。

紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:

  1. public interface Consumer<T> {
  2. void accept(T t);
  3. }

异常时,CompletableFuture会调用Function对象:

  1. public interface Function<T, R> {
  2. R apply(T t);
  3. }

这里我们都用lambda语法简化了代码。

可见CompletableFuture的优点是:

  • 异步任务结束时,会自动回调某个对象的方法;
  • 异步任务出错时,会自动回调某个对象的方法;
  • 主线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

  1. // CompletableFuture
  2. import java.util.concurrent.CompletableFuture;
  3. public class Main {
  4. public static void main(String[] args) throws Exception {
  5. // 第一个任务:
  6. CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
  7. return queryCode("中国石油");
  8. });
  9. // cfQuery成功后继续执行下一个任务:
  10. CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
  11. return fetchPrice(code);
  12. });
  13. // cfFetch成功后打印结果:
  14. cfFetch.thenAccept((result) -> {
  15. System.out.println("price: " + result);
  16. });
  17. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  18. Thread.sleep(2000);
  19. }
  20. static String queryCode(String name) {
  21. try {
  22. Thread.sleep(100);
  23. } catch (InterruptedException e) {
  24. }
  25. return "601857";
  26. }
  27. static Double fetchPrice(String code) {
  28. try {
  29. Thread.sleep(100);
  30. } catch (InterruptedException e) {
  31. }
  32. return 5 + Math.random() * 20;
  33. }
  34. }

除了串行执行外,多个CompletableFuture还可以并行执行。

例如,我们考虑这样的场景:

同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

  1. // CompletableFuture
  2. import java.util.concurrent.CompletableFuture;
  3. public class Main {
  4. public static void main(String[] args) throws Exception {
  5. // 两个CompletableFuture执行异步查询:
  6. CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
  7. return queryCode("中国石油", "https://finance.sina.com.cn/code/");
  8. });
  9. CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
  10. return queryCode("中国石油", "https://money.163.com/code/");
  11. });
  12. // 用anyOf合并为一个新的CompletableFuture:
  13. CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
  14. // 两个CompletableFuture执行异步查询:
  15. CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
  16. return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
  17. });
  18. CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
  19. return fetchPrice((String) code, "https://money.163.com/price/");
  20. });
  21. // 用anyOf合并为一个新的CompletableFuture:
  22. CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
  23. // 最终结果:
  24. cfFetch.thenAccept((result) -> {
  25. System.out.println("price: " + result);
  26. });
  27. // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
  28. Thread.sleep(200);
  29. }
  30. static String queryCode(String name, String url) {
  31. System.out.println("query code from " + url + "...");
  32. try {
  33. Thread.sleep((long) (Math.random() * 100));
  34. } catch (InterruptedException e) {
  35. }
  36. return "601857";
  37. }
  38. static Double fetchPrice(String code, String url) {
  39. System.out.println("query price from " + url + "...");
  40. try {
  41. Thread.sleep((long) (Math.random() * 100));
  42. } catch (InterruptedException e) {
  43. }
  44. return 5 + Math.random() * 20;
  45. }
  46. }

上述逻辑实现的异步查询规则实际上是:

Java多线程 - 图12

除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

最后我们注意CompletableFuture的命名规则:

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行。

本章小结

CompletableFuture可以指定异步处理流程:

  • thenAccept()处理正常结果;
  • exceptional()处理异常结果;
  • thenApplyAsync()用于串行化另一个CompletableFuture
  • anyOf()allOf()用于并行化多个CompletableFuture

使用Fork/Join

带着这些面试问题区理解Fork/Join框架

  • Fork/Join主要用来解决什么样的问题?
  • Fork/Join框架是在哪个JDK版本中引入的?
  • Fork/Join框架主要包含哪三个模块? 模块之间的关系是怎么样的?
  • ForkJoinPool类继承关系?
  • ForkJoinTask抽象类继承关系? 在实际运用中,我们一般都会继承 RecursiveTask 、RecursiveAction 或 CountedCompleter 来实现我们的业务需求,而不会直接继承 ForkJoinTask 类。
  • 整个Fork/Join 框架的执行流程/运行机制是怎么样的?
  • 具体阐述Fork/Join的分治思想和work-stealing 实现方式?
  • 有哪些JDK源码中使用了Fork/Join思想?
  • 如何使用Executors工具类创建ForkJoinPool?
  • 写一个例子: 用ForkJoin方式实现1+2+3+…+100000?
  • Fork/Join在使用时有哪些注意事项? 结合JDK中的斐波那契数列实例具体说明

    使用Fork/Join

    Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。

我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

Java多线程 - 图13

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

Java多线程 - 图14

如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:

Java多线程 - 图15

这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。

我们来看如何使用Fork/Join对大数据进行并行求和:

  1. import java.util.Random;
  2. import java.util.concurrent.*;
  3. public class Main {
  4. public static void main(String[] args) throws Exception {
  5. // 创建2000个随机数组成的数组:
  6. long[] array = new long[2000];
  7. long expectedSum = 0;
  8. for (int i = 0; i < array.length; i++) {
  9. array[i] = random();
  10. expectedSum += array[i];
  11. }
  12. System.out.println("Expected sum: " + expectedSum);
  13. // fork/join:
  14. ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
  15. long startTime = System.currentTimeMillis();
  16. Long result = ForkJoinPool.commonPool().invoke(task);
  17. long endTime = System.currentTimeMillis();
  18. System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
  19. }
  20. static Random random = new Random(0);
  21. static long random() {
  22. return random.nextInt(10000);
  23. }
  24. }
  25. class SumTask extends RecursiveTask<Long> {
  26. static final int THRESHOLD = 500;
  27. long[] array;
  28. int start;
  29. int end;
  30. SumTask(long[] array, int start, int end) {
  31. this.array = array;
  32. this.start = start;
  33. this.end = end;
  34. }
  35. @Override
  36. protected Long compute() {
  37. if (end - start <= THRESHOLD) {
  38. // 如果任务足够小,直接计算:
  39. long sum = 0;
  40. for (int i = start; i < end; i++) {
  41. sum += this.array[i];
  42. // 故意放慢计算速度:
  43. try {
  44. Thread.sleep(1);
  45. } catch (InterruptedException e) {
  46. }
  47. }
  48. return sum;
  49. }
  50. // 任务太大,一分为二:
  51. int middle = (end + start) / 2;
  52. System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
  53. SumTask subtask1 = new SumTask(this.array, start, middle);
  54. SumTask subtask2 = new SumTask(this.array, middle, end);
  55. invokeAll(subtask1, subtask2);
  56. Long subresult1 = subtask1.join();
  57. Long subresult2 = subtask2.join();
  58. Long result = subresult1 + subresult2;
  59. System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
  60. return result;
  61. }
  62. }

观察上述代码的执行过程,一个大的计算任务02000首先分裂为两个小任务01000和10002000,这两个小任务仍然太大,继续分裂为更小的0500,5001000,10001500,1500~2000,最后,计算结果被依次合并,得到最终结果。

因此,核心代码SumTask继承自RecursiveTask,在compute()方法中,关键是如何“分裂”出子任务并且提交子任务:

  1. class SumTask extends RecursiveTask<Long> {
  2. protected Long compute() {
  3. // “分裂”子任务:
  4. SumTask subtask1 = new SumTask(...);
  5. SumTask subtask2 = new SumTask(...);
  6. // invokeAll会并行运行两个子任务:
  7. invokeAll(subtask1, subtask2);
  8. // 获得子任务的结果:
  9. Long subresult1 = subtask1.join();
  10. Long subresult2 = subtask2.join();
  11. // 汇总结果:
  12. return subresult1 + subresult2;
  13. }
  14. }

Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。

本章小结

Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。

ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTaskRecursiveAction

使用Fork/Join模式可以进行并行计算以提高效率。

使用ThreadLocal

多线程是Java实现多任务的基础,Thread对象代表一个线程,我们可以在代码中调用Thread.currentThread()获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字:

  1. //Thread
  2. public class Main {
  3. public static void main(String[] args) throws Exception {
  4. log("start main...");
  5. new Thread(() -> {
  6. log("run task...");
  7. }).start();
  8. new Thread(() -> {
  9. log("print...");
  10. }).start();
  11. log("end main.");
  12. }
  13. static void log(String s) {
  14. System.out.println(Thread.currentThread().getName() + ": " + s);
  15. }
  16. }

对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:

  1. public void process(User user) {
  2. checkPermission();
  3. doWork();
  4. saveStatus();
  5. sendResponse();
  6. }

然后,通过线程池去执行这些任务。

观察process()方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?

process()方法需要传递的状态就是User实例。有的童鞋会想,简单地传入User就可以了:

  1. public void process(User user) {
  2. checkPermission(user);
  3. doWork(user);
  4. saveStatus(user);
  5. sendResponse(user);
  6. }

但是往往一个方法又会调用其他很多方法,这样会导致User传递到所有地方:

  1. void doWork(User user) {
  2. queryStatus(user);
  3. checkStatus();
  4. setNewStatus(user);
  5. log();
  6. }

这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。

给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User对象就传不进去了。

Java标准库提供了一个特殊的ThreadLocal,它可以在一个线程中传递同一个对象。

ThreadLocal实例通常总是以静态字段初始化如下:

  1. static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();

它的典型使用方式如下:

  1. void processUser(user) {
  2. try {
  3. threadLocalUser.set(user);
  4. step1();
  5. step2();
  6. } finally {
  7. threadLocalUser.remove();
  8. }
  9. }

通过设置一个User实例关联到ThreadLocal中,在移除之前,所有方法都可以随时获取到该User实例:

  1. void step1() {
  2. User u = threadLocalUser.get();
  3. log();
  4. printUser();
  5. }
  6. void log() {
  7. User u = threadLocalUser.get();
  8. println(u.name);
  9. }
  10. void step2() {
  11. User u = threadLocalUser.get();
  12. checkUser(u.id);
  13. }

注意到普通的方法调用一定是同一个线程执行的,所以,step1()step2()以及log()方法内,threadLocalUser.get()获取的User对象是同一个实例。

实际上,可以把ThreadLocal看成一个全局Map<Thread, Object>:每个线程获取ThreadLocal变量时,总是使用Thread自身作为key:

  1. Object threadLocalValue = threadLocalMap.get(Thread.currentThread());

因此,ThreadLocal相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal关联的实例互不干扰。

最后,特别注意ThreadLocal一定要在finally中清除:

  1. try {
  2. threadLocalUser.set(user);
  3. ...
  4. } finally {
  5. threadLocalUser.remove();
  6. }

这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal没有被清除,该线程执行其他代码时,会把上一次的状态带进去。

为了保证能释放ThreadLocal关联的实例,我们可以通过AutoCloseable接口配合try (resource) {...}结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal可以封装为一个UserContext对象:

  1. public class UserContext implements AutoCloseable {
  2. static final ThreadLocal<String> ctx = new ThreadLocal<>();
  3. public UserContext(String user) {
  4. ctx.set(user);
  5. }
  6. public static String currentUser() {
  7. return ctx.get();
  8. }
  9. @Override
  10. public void close() {
  11. ctx.remove();
  12. }
  13. }

使用的时候,我们借助try (resource) {...}结构,可以这么写:

  1. try (var ctx = new UserContext("Bob")) {
  2. // 可任意调用UserContext.currentUser():
  3. String currentUser = UserContext.currentUser();
  4. } // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象

这样就在UserContext中完全封装了ThreadLocal,外部代码在try (resource) {...}内部可以随时调用UserContext.currentUser()获取当前线程绑定的用户名。

本章小结

ThreadLocal表示线程的“局部变量”,它确保每个线程的ThreadLocal变量都是各自独立的;

ThreadLocal适合在一个线程的处理流程中保持上下文(避免了同一参数在所有方法中传递);

使用ThreadLocal要用try ... finally结构,并在finally中清除。