image.png

这篇笔记主要简要的介绍Java里的多线程和并发机制。主要是会用就行……卧槽并发太多东西了

介绍

学习计算机的时候,我们都了解到操作系统中的多任务:即在同一时刻运行多个程序的能力。计算机的这个在同一时间执行多个任务的能力,实际上不是真正意义上的“同一时间”,而是多个任务或进程共享一个 CPU,并交由操作系统来完成多任务间对 CPU 的运行切换,以使得每个任务都有机会获得一定的时间片运行,由于这个时间片很短,就给用户一种同时进行的错觉。

再后来发展到多线程技术,使得在一个程序内部能拥有多个线程并行执行。一个线程的执行可以被认为是一个 CPU 在执行该程序。当一个程序运行在多线程下,就好像有多个 CPU 在同时执行该程序。

多线程多进程有啥区别呢?本质的区别在于每个进程用于自己的一整套变量,而线程则共享数据,这就会因为对相同的内存空间进行并发读写操作造成不可忽略的问题。因此多线程也是一种更具挑战性的技术。

而Java是最优先支持多线程开发的语言之一,Java 从一开始就支持了多线程能力,因此 Java 开发者能常遇到上面描述的问题场景。

并发三特性

并发的三个特性是:原子性,可见性,有序性。

原子性(Atomicity)

原子性:一个或多个操作,要么全部执行且在执行过程中不被任何因素打断,要么全部不执行。

在Java中,对基本数据类型的变量的读取赋值操作是原子性操作。

  1. a = 1;
  2. a = true;
  3. a = a + 1;
  4. b++;
  5. a = b + 1;

在以上五条语句中,只有1、2是原子性操作。以第三条为例,首先读取出a的值,计算a+1的值,再将计算结果赋值给a,即包含了两个原子性操作,这条语句便不符合原子性。

不符合原子性就可能出现以下的问题,两个线程进行对a+1计算,理论上a最后为3,但结果却是2。

image.png

为了原子性可以采取以下措施:

  • 通过synchronized关键字定义同步代码块或者同步方法保障原子性。
  • 通过Lock接口保障原子性。
  • 通过Atomic类型保障原子性。

可见性(Visibility)

可见性:当一个线程修改了共享变量的值,其他线程能够看到修改的值。

如果没有实现可见性,同样是上面的例子,当一个线程修改了一个共享变量的值,另一个线程因为原先自己保存了共享变量的拷贝,而且它并不知道该变量的值已经变化了,那么它继续操作+1,把计算后的值赋值给a,这样就覆盖了原线程的操作,造成错误。
如果实现了可见性,那么在共享变量修改后,其他线程能够看到其修改的值,并用新值继续自己的操作,保证操作结果的正确性。

可以通过以下方法保证可见性

  • 通过volatile关键字标记内存屏障保证可见性。
  • 通过synchronized关键字定义同步代码块或者同步方法保障可见性。
  • 通过Lock接口保障可见性。
  • 通过Atomic类型保障可见性。

有序性(orderly)

有序性:即程序执行的顺序按照代码的先后顺序执行。
**
我们知道在JVM中存在指令重排序,只要执行结果一样,就会重排序指令使运行速度最快,于是就有了有序性问题。

在Java中,由于happen-before原则,代码是有序的,即是串行执行的。但是在多线程环境下,代码是交替串行执行的,可能会出现意料之外的问题,这就有了有序性的问题。

可以通过以下方法保证有序性:

  • 通过synchronized关键字定义同步代码块或者同步方法保障有序性。
  • 通过Lock接口保障有序性。
  • volatile包含禁止指令重排序的语义,其具有有序性。

实现多线程的方法

在进行多线程开发的时候,肯定离不开Thread类和Runnable接口。而实现多线程的两个方法是:继承Thread类,实现Runnable接口。大多数情况下都建议使用实现Runnable接口的方法,其优势如下:

  • 可以避免由于 Java 的单继承特性而带来的局限;
  • 增强程序的健壮性,代码能够被多个线程共享,代码与数据是独立的;
  • 适合多个相同程序代码的线程区处理同一资源的情况。

方法一,继承Thread类,并重写run()方法

  1. public class ThreadTest extends Thread{
  2. private int num = 5;
  3. @Override
  4. public void run() {
  5. for (int i = 0; i < 10; i++){
  6. if(num <= 0){
  7. break;
  8. }
  9. System.out.println(num--);
  10. }
  11. }
  12. public static void main(String[] args) {
  13. ThreadTest threadTest1 = new ThreadTest();
  14. ThreadTest threadTest2 = new ThreadTest();
  15. threadTest1.start();
  16. threadTest2.start();
  17. }
  18. }
  1. 输出:
  2. 5
  3. 4
  4. 3
  5. 2
  6. 1
  7. 5
  8. 4
  9. 3
  10. 2
  11. 1

方法二,实现Runnable接口

  1. public class ThreadTest implements Runnable{
  2. private int num = 5;
  3. @Override
  4. public void run() {
  5. for (int i = 0; i < 10; i++){
  6. if(num <= 0){
  7. break;
  8. }
  9. System.out.println(num--);
  10. }
  11. }
  12. public static void main(String[] args) {
  13. ThreadTest threadTest = new ThreadTest();
  14. Thread thread = new Thread(threadTest);
  15. Thread thread1 = new Thread(threadTest);
  16. thread.start();
  17. thread1.start();
  18. }
  19. }
  1. 输出:
  2. 5
  3. 3
  4. 2
  5. 1
  6. 4

在实现多线程时,一个线程Thread对象创建出来之后并不会自动运行,需要调用start()方法才会执行线程的run()方法。而在上面两个实例中,同样是对num的输出并递减,并且都是开启并执行了两个线程,从输出的差异我们可以看出两种方法执行的不是同一段代码,前者没有资源共享,后者则存在资源的共享。

另外补充的是输出的结果其实是随机的,因为线程时间片的分配调度难以预测。 在Java8开始,由于引入了lambda表达式这一特性,我们在创建线程的时候也可以传入一段lambda表达式来取代必须实现Runnable接口的方法,不过需要注意的是lambda表达式是一次性的,如果需要多个线程资源共享则不能使用该方法。

  1. //启动这个线程,将引发调用run()方法。该方法会立即返回,并且新线程将并发运行
  2. public synchronized void start();
  3. //调用关联Runnable的run()方法
  4. public void run();

中断线程

一个线程的结束有以下两种情况:

  • 线程的run()方法执行完毕,线程终止
  • 线程的run()方法在执行中捕获到了异常,线程终止

如果想要强制终止一个线程,在Java的早期版本中,可以使用stop()方法做到,但是这个方法现在已经弃用了。现在的Java多线程开发中,没有可以强制线程终止的方法,并且一个线程也不应该被强制终止。不过可以使线程中断,进入阻塞状态。

  1. static void sleep(long millis);
  2. public void interrupt();
  3. public static boolean interrupted();
  4. public boolean isInterrupted();

使用sleep()方法可以使线程睡眠,即进入阻塞状态。

使用interrupt()方法中断线程
当一个线程运行时,另一个线程可以调用对应的 Thread 对象的 interrupt()方法来中断它,该方法只是在目标线程中设置一个标志,表示它已经被中断,并立即返回,并没有真正中断线程,线程还是正常执行。

但是,如果线程被阻塞,就无法检测中断状态:在一个被阻塞的线程(调用了sleep()或wait())上调用interrupt()方法时,阻塞调用将会抛出一个InterruptException异常,线程会被该异常中断。

  1. public class ThreadTest extends Thread{
  2. private int num = 5;
  3. @Override
  4. public void run() {
  5. try {
  6. for (int i = 0; i < 10; i++) {
  7. if (num <= 0) {
  8. break;
  9. }
  10. System.out.println(num--);
  11. Thread.sleep(10000);
  12. }
  13. }catch (InterruptedException e){
  14. System.out.println("Exception");
  15. //如果注释掉return语句,在处理完中断异常后不会实际中断,而会继续往下执行
  16. return;
  17. }
  18. System.out.println("in run()");
  19. }
  20. public static void main(String[] args) {
  21. ThreadTest threadTest1 = new ThreadTest();
  22. threadTest1.start();
  23. threadTest1.interrupt();
  24. }
  25. //输出:
  26. //5
  27. //Exception
  28. //in run()
  29. }

在这里需要注意的是,在处理完中断异常之后,线程不会实际被中断,而会继续往下执行(输出“in run()”),如果要中断线程,则需加上return语句。

如果在调用sleep()方法之前线程的状态已是中断状态(调用了interrupt()),那么在调用sleep()之前就会注意到这一状态,并且抛出InterruptedException异常。这种中断称为待决中断。

使用isInterrupted()方法判断中断状态
在Thread线程上调用isInterrupted()方法可以检测当前线程的中断状态。这个方法不会改变中断状态,如果线程是中断状态则返回true,但是如果sleep()方法抛出异常,它将清空中断标志,此时isInterrupted()方法将返回 false。

  1. public static void main(String[] args) {
  2. //中断自身线程
  3. Thread.currentThread().interrupt();
  4. System.out.println("isInterrupt = " + Thread.currentThread().isInterrupted());
  5. }

使用interrupted()方法判断中断状态
interrupted()方法是一个静态方法,它检测当前的线程是否被中断,而且会清除该线程的中断状态(隐式重置为 false)。

  1. public static void main(String[] args) {
  2. System.out.println("isInterrupt = " + Thread.interrupted());
  3. Thread.currentThread().interrupt();
  4. System.out.println("isInterrupt = " + Thread.interrupted());
  5. System.out.println("isInterrupt = " + Thread.interrupted());
  6. }
  7. /*
  8. isInterrupt = false
  9. isInterrupt = true
  10. isInterrupt = false
  11. */

线程的状态和优先级

线程有如下6种状态:

  • New(新创建)
  • Runnable(可运行)
  • Blocked(被阻塞)
  • Waiting(等待)
  • Timed Waiting(计时等待)
  • Terminated(被终止) (Dead)

并发 - 图3

一个线程一旦调用start()方法,就会进入可运行状态。为什么说它是可运行状态而不是运行中状态呢,因为一个可运行线程可能正在运行也可能没有运行,这取决于操作系统给线程提供的运行的时间。线程调度的细节依赖于操作系统提供的服务,抢占式调度系统给每一个可运行线程一个时间片来执行任务,当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程运行程序。选择下一个线程时,操作系统会考虑线程的优先级。

除了抢占式调度,还有一种叫做协作式调度,像手机这种小型设备可能会使用。这种方式是,一个线程只有调用yield方法、或者被阻塞或等待时,线程才失去控制权。

当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。
当一个线程视图获取一个内部的对象锁,而该锁被其他线程持有,则该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器允许本线程持有它时,它才变成非阻塞状态,恢复运行。
当线程等待另一个线程通知调度器一个条件时,它自己进入等待状态。有几个方法有一个超时参数,调用它们导致线程进入计时等待状态。这一状态将一直保持到超时期满或者接收到适当的通知。(Thread.sleep Object.wait Lock.tryLock Condition.await)

优先级
前面提到操作系统会考虑线程的优先级来分配时间片。在Java程序设计语言中,每个线程有一个优先级。默认情况下,一个线程继承它的父线程的优先级,而线程的默认优先级是5。
线程的优先级从MIN_PRIORITY(1)到MAX_PRIORITY(10)增长,共有十个,默认是NORM_PRIORITY(5)。可以通过setPriority()方法对线程的优先级进行操作:

  1. //设置线程的优先级,必须在1~10之间
  2. public final void setPriority(int newPriority);
  3. //导致当前执行的线程处于让步状态。如果有其他的可运行线程具有至少与此线程同样高的优先级,
  4. //那么这些线程接下来会被调度
  5. public static native void yield()

守护线程

Java 中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程)

用户线程即运行在前台的线程,而守护线程是运行在后台的线程。 守护线程作用是为其他前台线程的运行提供便利服务,而且仅在普通、非守护线程仍然运行时才需要,比如垃圾回收线程就是一个守护线程。当 VM 检测仅剩一个守护线程,而用户线程都已经退出运行时,VM就会退出,因为没有其他线程被守护着,也就没有继续运行程序的必要了。如果有非守护线程仍然存活,VM 就不会退出。

用户也可以通过setDeamon(true)方法来设置守护线程,但是需要注意的是,必须小心确保其他所有非守护线程消亡时,不会由于它的终止而产生任何危害。守护线程应该永远不去访问固有资源,不要在守护线程中执行业务逻辑操作,如对文件、数据库的读写,因为它会在任何时候甚至在一个操作的中间发生中断。

注意: setDaemon(true)必须在调用线程的 start()方法之前设置,否则会抛出 IllegalThreadStateException 异常。 在守护线程中产生的新线程也是守护线程。

  1. //标识该线程为守护线程(true)或用户线程(false),必须在线程启动之前调用
  2. public final void setDaemon(boolean on);

同步

之前在介绍线程的实现的时候提到了资源共享问题。在大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取。而一旦存在共享数据,那肯定免不了一些问题的发生,其中就有一个多次提起的银行存取款的例子。

该例子简单描述是,当一个线程读取了该个账户中的余额为5000,接下来想要取出着5000块,可是刚好在读取完余额后,方法判断有足够余额可以取钱,但是该线程时间片用完了。此时另一个线程访问同一个账户,同样读取到账户余额为5000,并取出了4000元,线程结束。当线程一恢复运行时,因为此前已经执行了判断操作,所以直接进行下一步取出5000块,这时就会导致账户余额出错,变成了-4000块!

在并发编程中,多线程同时并发访问的资源叫做临界资源,当多个线程同时访问对象并要求操作相同资源时,分割了原子操作就有可能出现数据的不一致或数据不完整的情况,为避免这种情况的发生,我们会采取同步机制,以确保在某一时刻,方法内只允许有一个线程。

采用锁来实现同步机制叫做互斥锁机制。通过对变量或者代码块加锁,实现同时只能有一个线程访问。当一个线程要访问该加锁的代码块时,需要申请获得该锁,在持有该锁之后才能进入执行,之后的线程再申请该锁时会进入阻塞状态,直到锁被释放,下一个线程才能获得锁并进入执行,这个锁是为了分配给线程的,防止打断原子操作。每个对象的锁只能分配给一个线程,因此叫做互斥锁。

锁对象

在Java中,有两种机制防止代码块受到并发访问的干扰。Java 5 中引入了新的锁机制——java.util.concurrent.locks 中的显式的互斥锁:Lock 接口,它提供了比synchronized 更加广泛的锁定操作。Lock 接口有 3 个实现它的类:ReentrantLock、ReetrantReadWriteLock.ReadLock 和 ReetrantReadWriteLock.WriteLock,即重入锁、读锁和写锁。

lock 必须被显式地创建、锁定和释放,为了可以使用更多的功能,一般用 ReentrantLock 为其实例化。
用ReentrantLock保护代码块的基本结构如下:

  1. ReentrantLock lock = new ReentrantLock();
  2. lock.lock(); //加锁
  3. try{
  4. }finally{
  5. lock.unlock(); //确保一个锁在最后被释放,否则会导致死锁
  6. }

这一结构确保任何时刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们被阻塞,直到第一个线程释放锁对象。
此外,线程在每一次调用lock都要调用unlock来释放锁,否则会导致死锁。还要考虑到抛出异常的情况,unlock必须放在finally块里确保锁最终都会被释放。

synchronized 关键字

Lock和Condition接口为程序设计人员提供了高度的锁定控制。然而,大多数情况下,并不需要那样控制。从Java 1.0开始,Java中的每一个对象都有一个内部锁。可以使用synchronized关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法,线程必须获得内部的对象锁。

对于互斥锁机制以及synchronized的使用,有以下几点注意:

  1. 如果同一个方法内同时有两个或更多线程,则每个线程有自己的局部变量拷贝
  2. 类的每个实例都有自己的对象级别锁。当一个线程访问实例对象中的 synchronized 同步代码块或同步方法时,该线程便获取了该实例的对象级别锁,其他线程这时如果要访问 synchronized 同步代码块或同步方法,便需要阻塞等待,直到前面的线程从同步代码块或方法中退出,释放掉了该对象级别锁。
  3. 访问同一个类的不同实例对象中的同步代码块,不存在阻塞等待获取对象锁的问题,因为它们获取的是各自实例的对象级别锁,相互之间没有影响。
  4. 持有一个对象级别锁不会阻止该线程被交换出来,也不会阻塞其他线程访问同一实例对象中的非 synchronized 代码。(即一个线程持有一个对象锁时,当该线程的时间片用完了,该线程还是会进入等待,其他线程同样可以获得执行的机会,并且当其他线程访问同一实例对象中其他非synchronized代码时仍然可以执行,但当遇到synchronized代码时由于没有锁,还是会进入阻塞状态。)
  5. 使用 synchronized(obj)同步语句块,可以获取指定对象上的对象级别锁。
  1. //给方法加锁
  2. public synchronized void transfer(){
  3. //action...
  4. }
  5. //同步代码块
  6. synchronized(obj){
  7. //action...
  8. }

synchronized的内存可见性
加锁(synchronized 同步)的功能不仅仅局限于互斥行为,同时还存在另外一个重要的方面:内存可见性。我们不仅希望防止某个线程正在使用对象状态而另一个线程在同时修改该状态,而且还希望确保当一个线程修改了对象状态后,其他线程能够看到该变化。而线程的同步恰恰也能够实现这一点。

内置锁可以用于确保某个线程以一种可预测的方式来查看另一个线程的执行结果。为了确保所有的线程都能看到共享变量的最新值,可以在所有执行读操作或写操作的线程上加上同一把锁。

并发 - 图4

volatile 关键字

在多任务多线程环境下,如果实现内存可见性,让共享变量的安全性得到保障,还可以使用volatile关键字。

volatile这个关键字和Java的内存模型相关(这里先简单介绍),在当前的 Java 内存模型下,线程可以把变量保存在本地内存(比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成数据的不一致。
此时把变量用volatile关键字修饰,该成员变量每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。

volatile 关键字就是提示 JVM:对于这个成员变量,它是不稳定的,不能保存它的私有拷贝,而应直接与共享成员变量交互。

volatile 是一种稍弱的同步机制,在访问 volatile 变量时不会执行加锁操作,也就不会执行线程阻塞,因此 volatilei 变量是一种比 synchronized 关键字更轻量级的同步机制。

Volatile 变量具有 synchronized 的可见性特性,但是不具备原子特性。这就是说线程能够自动发现 volatile 变量的最新值。Volatile 变量可用于提供线程安全,但是只能应用于非常有限的一组用例:多个变量之间或者某个变量的当前值与修改后值之间没有约束。因此,单独使用 volatile 还不足以实现计数器、互斥锁或任何具有与多个变量相关的不变式(Invariants)的类(例如 “start <=end”)。

正确使用volatile变量的条件:

  • 对变量的写操作不依赖于当前值。
  • 该变量没有包含在具有其他变量的不变式中。

对于volatile的更深层的探讨:
正确使用 Volatile 变量—Brian Goetz
Java并发编程:volatile关键字解析

可重入内置锁

每个 Java 对象都可以用做一个实现同步的锁,这些锁被称为内置锁或监视器锁。线程在进入同步代码块之前会自动获取锁,并且在退出同步代码块时会自动释放锁。获得内置锁的唯一途径就是进入由这个锁保护的同步代码块或方法。

当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞。然而,由于内置锁是可重入的,因此如果某个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功。“重入”意味着获取锁的操作的粒度是“线程”,而不是调用。重入的一种实现方法是,为每个锁关联一个获取计数值和一个所有者线程。当计数值为 0 时,这个锁就被认为是没有被任何线程所持有,当线程请求一个未被持有的锁时,JVM 将记下锁的持有者,并且将获取计数值置为 1,如果同一个线程再次获取这个锁,计数值将递增,而当线程退出同步代码块时,计数器会相应地递减。当计数值为 0 时,这个锁将被释放。

  1. public class Father
  2. {
  3. public synchronized void doSomething(){
  4. ......
  5. }
  6. }
  7. public class Child extends Father
  8. {
  9. @Override
  10. public synchronized void doSomething(){
  11. ......
  12. super.doSomething();
  13. }
  14. }

在上面的例子中,子类重写了父类的同步方法。两个doSomething都是同步方法,因此在进入每个doSomething方法时都需要请求该Child实例的对象锁。如果此时锁不是可重入的,再进入子类的doSomething方法时,由于该实例的对象锁已经被持有,因此再次申请该对象锁时请求失败,线程进入阻塞状态,而该方法又不能执行完毕,该锁得不到释放,就会造成死锁。因此,可重入锁可以有效解决这种情况。

消息通知机制

在 Java 中,可以通过配合调用 Object 对象的 wait() 方法和 notify()方法或 notifyAll() 方法来实现线程间的通信。

  1. //随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。……
  2. public final native void notify();
  3. //解除那些在该对象上调用wait方法的线程的阻塞状态。只能在同步块或同步代码里调用。
  4. //如果当前线程不是对象锁持有者,抛出IllegalMonitorStateException
  5. public final native void notifyAll();
  6. //导致线程进入等待状态直到它被通知。该方法只能在一个同步方法中调用
  7. //如果当前线程不是对象锁持有者,抛出IllegalMonitorStateException
  8. //@param timeout 超时时间 毫秒
  9. //@param timeout 纳秒
  10. public final native void wait(long timeout) throws InterruptedException;
  11. public final void wait(long timeout, int nanos) throws InterruptedException;
  12. public final void wait() throws InterruptedException;

Object 是所有类的超类,它有 5 个方法组成了等待/通知机制的核心:notify()、notifyAll()、wait()、wait(long)和 wait(long,int)。在 Java 中,所有的类都从 Object 继承而来,因此,所有的类都拥有这些共有方法可供使用。而且,由于他们都被声明为 final,因此在子类中不能覆写任何一个方法。

wait()方法
该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait()之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法。进入 wait()方法后,当前线程释放锁。在从 wait()返回前,线程与其他线程竞争重新获得锁。

notify()
该方法要在同步方法或同步块中调用,即在调用前,线程必须要获得该对象的对象级别锁,否则会抛出 IllegalMonitorStateException。
该方法用来通知那些可能等待该对象的对象锁的其他线程。如果有多个线程等待,则线程规划器任意挑选出其中一个 wait()状态的线程来发出通知,并使它等待获取该对象的对象锁(notify 后,当前线程不会马上释放该对象锁,wait 所在的线程并不能马上获取该对象锁,要等到程序退出 synchronized 代码块后,当前线程才会释放锁,wait所在的线程也才可以获取该对象锁),但不惊动其他同样在等待被该对象notify的线程们。当第一个获得了该对象锁的 wait 线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用 notify 语句,则即便该对象已经空闲,其他 wait 状态等待的线程由于没有得到该对象的通知,会继续阻塞在 wait 状态,直到这个对象发出一个 notify 或 notifyAll。注意:它们等待的是被 notify 或 notifyAll,而不是锁。这与下面的 notifyAll()方法执行后的情况不同。

notifyAll()方法
该方法与 notify ()方法的工作方式相同,重要的一点差异是:
notifyAll 使所有原来在该对象上 wait 的线程统统退出 wait 的状态(即全部被唤醒,不再等待 notify 或 notifyAll,但由于此时还没有获取到该对象锁,因此还不能继续往下执行),变成等待获取该对象上的锁,一旦该对象锁被释放(notifyAll 线程退出调用了 notifyAll 的 synchronized 代码块的时候),他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出 synchronized 代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。

notify()的早期问题:notify 通知的遗漏很容易理解,即 threadA 还没开始 wait 的时候,threadB 已经 notify 了,这样,threadB 通知是没有任何响应的,当 threadB 退出 synchronized 代码块后,threadA 再开始 wait,便会一直阻塞等待,直到被别的线程打断。

为了避免这个问题,通常可以设定一个boolean变量值,在notify之前修改这个值。把wait的代码块放进一个while循环里,这样当notify修改后就可以退出这个循环,或在通知遗漏后不会阻塞在wait方法处。(一般都要这样做以防止早期通知问题)

多线程中while和if:用if判断的话,唤醒后线程会从wait之后的代码开始运行,但是不会重新判断if条件,直接继续运行if代码块之后的代码,而如果使用while的话,也会从wait之后的代码运行,但是唤醒后会重新判断循环条件,如果不成立再执行while代码块之后的代码块,成立的话继续wait。

生产者——消费者模式

生产者——消费者模式是并发编程中一个非常经典的问题。这个模式是有两种线程,生产者线程和消费者线程;一个仓库(共享数据区),为了解耦生产者和消费者的关系。生产者不停地向仓库里生产商品,消费者不停地从仓库里消费商品,两者不必关系对方的行为。

此外这个仓库(共享数据区)需要具备以下功能:

  • 当仓库满了,阻塞生产者继续生产商品
  • 当仓库空了,阻塞消费者继续消费商品

并发 - 图5

使用wait/notify消息机制实现

public class Test2 {

    //仓库容量为10
    private static final int FULL = 10;
    //目前仓库产品数目
    private static int count = 0;
    private static Object object = new Object();

    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }

    //生产者类
    static class Producer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                synchronized (object){
                    //循环判断仓库是否已经满了(循环的原因是避免早期通知问题)
                    while(count == FULL){
                        try{
                            //如果满了,生产者阻塞
                            object.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    //生产产品
                    count++;
                    System.out.println(Thread.currentThread().getName() + ": 生产者:目前产品数 = " + count);
                    object.notify();
                }
            }
        }
    }

    //消费者类
    static class Consumer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                synchronized (object){
                    //循环判断仓库是否空了
                    while(count == 0){
                        try{
                            //如果空了,消费者阻塞
                            object.wait();
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    //消费产品
                    count--;
                    System.out.println(Thread.currentThread().getName() + ": 消费者:目前产品数 = " + count);
                    object.notify();
                }
            }
        }
    }
}
//输出
Thread-0: 生产者:目前产品数 = 1
Thread-2: 生产者:目前产品数 = 2
Thread-3: 消费者:目前产品数 = 1
Thread-1: 消费者:目前产品数 = 0
Thread-0: 生产者:目前产品数 = 1
Thread-2: 生产者:目前产品数 = 2
Thread-1: 消费者:目前产品数 = 1
//......
Thread-2: 生产者:目前产品数 = 2
Thread-3: 消费者:目前产品数 = 1
Thread-1: 消费者:目前产品数 = 0

这是该模型最简单的一个实现。使用wait()和notify()方法,当仓库满了的时候,调用wait()方法使得生产者进入阻塞,等待消费者消费了产品之后,仓库则有空余,然后消费者notify()通知其他线程,生产者解除阻塞。当仓库空了的时候同理。

但是在多消费者多生产者的情况下会有一种“假死”问题。假设当前多个生产者线程调用wait方法进入阻塞等待,当其中的生产者线程获取到对象锁之后使用notify通知处于等待状态的线程,如果唤醒的仍然是生产者线程,就会造成所有的生产者线程都处于等待状态。

解决办法:将notify方法替换成notifyAll方法,也称为wait/notifyAll通知机制的实现方法

使用Lock中Condition的await/signalAll实现

在java.util.concurrent.locks包里,可以使用Java 1.5引入的Condition接口,它参照Object的wait()、notify()和notifyAll(),也提供了await()、signal()和signalAll()的条件机制。

使用该方法实现和wait/notify通知机制差不多,使用Lock进行同步加锁操作。

public class Test2 {

    private static final int FULL = 10;
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition full = lock.newCondition();
    private static Condition empty = lock.newCondition();

    public static void main(String[] args) {
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }

    static class Producer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                //使用lock对代码块加锁
                lock.lock();
                try {
                    while (count == FULL) {
                        try {
                            //如果满了,生产者阻塞
                            full.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println(Thread.currentThread().getName() + ": 生产者:目前产品数 = " + count);
                    empty.signal();
                }finally {
                    //释放锁
                    lock.unlock();
                }
            }
        }
    }

    static class Consumer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            //如果空了,消费者阻塞
                            empty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println(Thread.currentThread().getName() + ": 消费者:目前产品数 = " + count);
                    full.signal();
                }finally {
                    lock.unlock();
                }
            }
        }
    }
}

阻塞队列的实现
**
BlockingQueue阻塞队列,作为一种常用的并发容器,在生产者消费者模式中也有很大的应用,其原因是阻塞队列内部实现了两个阻塞操作。因此在此模式中,阻塞队列可以充当共享数据区的角色。

使用阻塞队列主要使用到put()和take()两个方法实现阻塞。

public class Test2 {

    private static ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(10);

    public static void main(String[] args) {
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
        new Thread(new Producer(blockingQueue)).start();
        new Thread(new Consumer(blockingQueue)).start();
    }

    static class Producer implements Runnable{

        BlockingQueue blockingQueue;

        Producer(BlockingQueue blockingQueue){
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1000);
                    blockingQueue.put(1);
                    System.out.println(Thread.currentThread().getName() + ": 生产者:目前产品数 = " + blockingQueue.size());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }

    static class Consumer implements Runnable{

        BlockingQueue blockingQueue;

        Consumer(BlockingQueue blockingQueue){
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    Thread.sleep(1000);
                    blockingQueue.take();
                    System.out.println(Thread.currentThread().getName() + ": 消费者:目前产品数 = " + blockingQueue.size());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
}

信号量Semaphore的实现

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,在操作系统中是一个非常重要的问题,可以用来解决哲学家就餐问题。Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用acquire()方法获得一个许可,当许可不足时会被阻塞,release()添加一个许可。在下列代码中,还加入了另外一个mutex信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行。

public class Test2 {

    private static int count = 0;
    final static Semaphore notFull = new Semaphore(10);
    final static Semaphore notEmpty = new Semaphore(0);
    final static Semaphore mutex = new Semaphore(1);



    public static void main(String[] args) {
        Test2 test2 = new Test2();
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
    }

    static class Producer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    //等待空位
                    notFull.acquire();
                    //等待读写锁
                    mutex.acquire();
                    Thread.sleep(1000);
                    count++;
                    System.out.println(Thread.currentThread().getName() + ": 生产者:目前产品数 = " + count);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    mutex.release();
                    notEmpty.release();
                }
            }
        }
    }

    static class Consumer implements Runnable{

        @Override
        public void run() {
            for(int i = 0; i < 10; i++){
                try{
                    //等待空位
                    notEmpty.acquire();
                    //等待读写锁
                    mutex.acquire();
                    Thread.sleep(1000);
                    count--;
                    System.out.println(Thread.currentThread().getName() + ": 消费者:目前产品数 = " + count);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }finally {
                    //释放许可
                    mutex.release();
                    notFull.release();
                }
            }
        }
    }
}