4.1 等待/通知机制

4.1.1 概念

线程条件不满足时进入等待状态,条件满足时继续执行。

4.1.2 实现

Object类中的wait()方法可以使执行当前代码的线程等待,暂停执行,直到通知或者被中断为止。

  1. public class Test01 {
  2. public static void main(String[] args) {
  3. try{
  4. String text = "something";
  5. System.out.println("同步前的代码");
  6. synchronized (text){
  7. System.out.println("同步代码块开始……");
  8. text.wait(); // 线程等待,后面的内容不输出
  9. System.out.println("wait后的代码……");
  10. }
  11. System.out.println("同步代码块后面的代码");
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. System.out.println("main最后的代码");
  16. }
  17. }

注意:1. wait()方法只能在同步代码块中由锁对象调用;

  1. 调用wait()方法,当前线程会释放锁;

Object类的notify()可以唤醒线程,该方法也必须在同步代码块中,由锁对象调用,没有使用锁对象调用wait()/notify()会抛出IllgalMonitorStateException异常。如果有多个等待的线程,notify()方法只能唤醒其中一个。在同步代码块中,调用notify()方法会并不会立即释放锁对象,需要等当前同步代码块执行完后会释放锁对象,一般将nofity()方法放在同步代码块的最后。
在使用notify()的时候,可能出现通过过早的情况,需要再引入一个变量来判断是否已经唤醒过,唤醒过就不再需要等待了。

  1. public class Test02 {
  2. public static void main(String[] args) throws InterruptedException {
  3. /*
  4. 线程1开始等待:1614737148963
  5. 线程2开始唤醒:1614737151964
  6. 线程2结束唤醒:1614737152964
  7. 线程1结束等待:1614737152964
  8. */
  9. String lock = "something";
  10. Thread t1 = new Thread(() -> {
  11. synchronized (lock) {
  12. System.out.println("线程1开始等待:" + System.currentTimeMillis());
  13. try {
  14. lock.wait();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. System.out.println("线程1结束等待:" + System.currentTimeMillis());
  19. }
  20. });
  21. Thread t2 = new Thread(() -> {
  22. synchronized (lock) {
  23. System.out.println("线程2开始唤醒:" + System.currentTimeMillis());
  24. lock.notify();
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. System.out.println("线程2结束唤醒:" + System.currentTimeMillis());
  31. }
  32. });
  33. t1.start();
  34. Thread.sleep(3000);
  35. t2.start();
  36. }
  37. }

4.1.3 interrupt()方法会中断wait()

当线程处于wait()等待状态时,调用线程对象的interrupt()方法会中断线程的等待状态,会产生InterruptedException异常。

  1. public class Test03 {
  2. public static void main(String[] args) throws InterruptedException {
  3. SubThread subThread = new SubThread();
  4. subThread.start();
  5. Thread.sleep(2000);
  6. subThread.interrupt();
  7. }
  8. private static final Object LOCK = new Object();
  9. static class SubThread extends Thread {
  10. @Override
  11. public void run() {
  12. synchronized (LOCK) {
  13. System.out.println("begin wait...");
  14. try {
  15. LOCK.wait();
  16. System.out.println("end wait...");
  17. } catch (InterruptedException e) {
  18. System.out.println("wait等待被中断");
  19. }
  20. }
  21. }
  22. }
  23. }

4.1.4 notify()与notifyAll()

notify()一次只能唤醒一个线程,如果有多个等待线程,只能随机唤醒其中某一个;想要唤醒所有线程,需要调用notifyAll()。

  1. public class Test04 {
  2. public static void main(String[] args) {
  3. Object lock = new Object();
  4. SubThread t1 = new SubThread(lock);
  5. SubThread t2 = new SubThread(lock);
  6. SubThread t3 = new SubThread(lock);
  7. t1.setName("t1");
  8. t2.setName("t2");
  9. t3.setName("t3");
  10. t1.start();
  11. t2.start();
  12. t3.start();
  13. try {
  14. Thread.sleep(2000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. synchronized (lock){
  19. /* 只能唤醒一个线程,这种情况称为信号丢失
  20. t1 -- begin wait...
  21. t3 -- begin wait...
  22. t2 -- begin wait...
  23. t1 -- end wait...
  24. */
  25. // lock.notify();
  26. /*
  27. t1 -- begin wait...
  28. t3 -- begin wait...
  29. t2 -- begin wait...
  30. t2 -- end wait...
  31. t3 -- end wait...
  32. t1 -- end wait...
  33. */
  34. lock.notifyAll();
  35. }
  36. }
  37. static class SubThread extends Thread {
  38. private Object lock;
  39. public SubThread(Object lock){
  40. this.lock = lock;
  41. }
  42. @Override
  43. public void run() {
  44. synchronized (lock){
  45. try{
  46. System.out.println(Thread.currentThread().getName() + " -- begin wait...");
  47. lock.wait();
  48. System.out.println(Thread.currentThread().getName() + " -- end wait...");
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }
  54. }
  55. }

4.1.5 wait(long)

在参数指定的时间内没有被唤醒,超时后会自动唤醒。

4.1.6 wait等待条件发生变化

  1. /**
  2. * wait条件发生变化
  3. * 定义一个集合
  4. * 定义一个线程向集合中添加数据,添加完数据会通知另外的线程从集合中取数据
  5. * 定义一个线程从集合中取数据,如果集合中没有数据就等待
  6. */
  7. public class Test05 {
  8. public static void main(String[] args) {
  9. ThreadAdd threadAdd = new ThreadAdd();
  10. ThreadSubtract threadSubtract = new ThreadSubtract();
  11. threadSubtract.setName("subtract 1");
  12. /*
  13. 测试一:先开启添加数据的线程,再开启一个取数据的线程,大多数情况下正常
  14. threadAdd.start();
  15. threadSubtract.start();
  16. 测试二:先开启取数据的线程,再开启添加数据线程
  17. threadSubtract.start();
  18. threadAdd.start();
  19. 测试三:先开启两个取数据的线程,再开启添加数据的线程
  20. ThreadSubtract threadSubtract2 = new ThreadSubtract();
  21. threadSubtract2.setName("subtract 2");
  22. threadSubtract.start();
  23. threadSubtract2.start();
  24. threadAdd.start();
  25. 结果:同时唤醒后,先后取数据,一个取到,另一个取数据时再取时出现异常
  26. subtract 1 begin wait...
  27. subtract 2 begin wait...
  28. subtract 2 end wait...
  29. subtract 2从集合中取了data 后,集合中数据的数量:0
  30. subtract 1 end wait...
  31. 解决方案:被唤醒后依然要再判断,可将if改为while
  32. */
  33. }
  34. /**
  35. * 1) 定义List集合
  36. */
  37. static List<String> list = new ArrayList<>();
  38. /**
  39. * 2) 定义方法从集合中取数据
  40. */
  41. public static void subtract() {
  42. synchronized (list) {
  43. try {
  44. if (list.size() == 0) {
  45. System.out.println(Thread.currentThread().getName() + " begin wait...");
  46. list.wait();
  47. System.out.println(Thread.currentThread().getName() + " end wait...");
  48. }
  49. } catch (InterruptedException e) {
  50. e.printStackTrace();
  51. }
  52. Object data = list.remove(0);
  53. System.out.println(Thread.currentThread().getName()
  54. + "从集合中取了" + data + " 后,集合中数据的数量:" + list.size());
  55. }
  56. }
  57. /**
  58. * 3) 定义方法向集合中添加数据后,通知等待的线程取数据
  59. */
  60. public static void add(){
  61. synchronized (list){
  62. list.add("data");
  63. list.notifyAll();
  64. }
  65. }
  66. /**
  67. * 4) 定义线程类调用subtract取数据
  68. */
  69. static class ThreadSubtract extends Thread {
  70. @Override
  71. public void run() {
  72. subtract();
  73. }
  74. }
  75. /**
  76. * 5) 定义线程类调用add添加数据
  77. */
  78. static class ThreadAdd extends Thread {
  79. @Override
  80. public void run() {
  81. add();
  82. }
  83. }
  84. }

4.1.7 生产者消费者模式

  1. /**
  2. * 模拟产品
  3. */
  4. public class ValueOP {
  5. private String value = "";
  6. /**
  7. * 生产过程
  8. */
  9. public void setValue() {
  10. synchronized (this){
  11. while (!"".equalsIgnoreCase(value)){
  12. try {
  13. this.wait();
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. String value = System.currentTimeMillis() + " - " + System.nanoTime();
  19. System.out.println("set设置的值是:" + value);
  20. this.value = value;
  21. this.notifyAll();
  22. }
  23. }
  24. /**
  25. * 消费过程
  26. */
  27. public void getValue(){
  28. synchronized (this){
  29. while ("".equalsIgnoreCase(value)){
  30. try {
  31. this.wait();
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. System.out.println("get的值是:" + this.value);
  37. this.value = "";
  38. this.notifyAll();
  39. }
  40. }
  41. }
  42. /**
  43. * 定义线程模拟生产者
  44. */
  45. public class ProducerThread extends Thread{
  46. /**
  47. * 生产者生产数据,调用valueOP类的setValue方法给value赋值
  48. */
  49. private ValueOP obj;
  50. public ProducerThread(ValueOP obj){
  51. this.obj = obj;
  52. }
  53. @Override
  54. public void run() {
  55. while (true){
  56. obj.setValue();
  57. }
  58. }
  59. }
  60. /**
  61. * 定义线程模拟消费者
  62. */
  63. public class ConsumerThread extends Thread{
  64. /**
  65. * 消费者消费数据,调用valueOP类的getValue方法消费产品
  66. */
  67. private ValueOP obj;
  68. public ConsumerThread(ValueOP obj){
  69. this.obj = obj;
  70. }
  71. @Override
  72. public void run() {
  73. while (true){
  74. obj.getValue();
  75. }
  76. }
  77. }
  78. public class Test {
  79. public static void main(String[] args) {
  80. ValueOP valueOP = new ValueOP();
  81. /*
  82. 测试一生产,一消费的情况
  83. ProducerThread producerThread = new ProducerThread(valueOP);
  84. ConsumerThread consumerThread = new ConsumerThread(valueOP);
  85. producerThread.start();
  86. consumerThread.start();
  87. 测试多个生产者,多个消费者:
  88. 把等待条件的if改为while,避免重新唤醒消费者后直接消费空串;
  89. 但是出现了假死现象:消费者消费空串时,重新进入等待状态,生产者又未被唤醒进行生产,所以需要把notify()改为notifyAll()。
  90. ProducerThread producerThread1 = new ProducerThread(valueOP);
  91. ProducerThread producerThread2 = new ProducerThread(valueOP);
  92. ProducerThread producerThread3 = new ProducerThread(valueOP);
  93. ConsumerThread consumerThread1 = new ConsumerThread(valueOP);
  94. ConsumerThread consumerThread2 = new ConsumerThread(valueOP);
  95. ConsumerThread consumerThread3 = new ConsumerThread(valueOP);
  96. producerThread1.start();
  97. producerThread2.start();
  98. producerThread3.start();
  99. consumerThread1.start();
  100. consumerThread2.start();
  101. consumerThread3.start();
  102. */
  103. }
  104. }

4.2 通过管道实现线程间的通信

在java.io包中的PipeStream管道流用于在线程之间传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,相关的类包括:PipedInputStream和PipedOutputStream,PipedReader和PipedWriter。

  1. /**
  2. * 使用PipedInputStream和PipedOutputStream管道字节流在线程之间传递数据
  3. *
  4. * @author 王游
  5. * @date 2021/3/3 19:45
  6. */
  7. public class Test {
  8. public static void main(String[] args) {
  9. // 定义管道字节流
  10. PipedInputStream inputStream = new PipedInputStream();
  11. PipedOutputStream outputStream = new PipedOutputStream();
  12. // 建立连接
  13. try {
  14. inputStream.connect(outputStream);
  15. } catch (IOException e) {
  16. e.printStackTrace();
  17. }
  18. new Thread(() -> readData(inputStream)).start();
  19. try {
  20. Thread.sleep(1000);
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. new Thread(() -> writeData(outputStream)).start();
  25. }
  26. /**
  27. * 定义方法向管道流中写入数据
  28. */
  29. public static void writeData(PipedOutputStream out) {
  30. int sum = 100;
  31. try {
  32. for (int i = 0; i < sum; i++) {
  33. String data = "" + i;
  34. out.write(data.getBytes());
  35. }
  36. out.close();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. /**
  42. * 定义方法从管道流中读取数据
  43. */
  44. public static void readData(PipedInputStream in) {
  45. byte[] bytes = new byte[1024];
  46. // 从管道中读取字节
  47. try {
  48. // This method blocks until input data is available, end of file is detected, or an exception is thrown.
  49. int len = in.read(bytes);
  50. while (len != -1) {
  51. System.out.println(new String(bytes, 0, len));
  52. len = in.read(bytes);
  53. }
  54. in.close();
  55. } catch (IOException e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. }

4.3 thread.join

线程没有执行完之前,会一直阻塞在join方法处。

  1. public class JoinDemo extends Thread{
  2. int i;
  3. Thread previousThread; //上一个线程
  4. public JoinDemo(Thread previousThread,int i){
  5. this.previousThread=previousThread;
  6. this.i=i;
  7. }
  8. @Override
  9. public void run() {
  10. try {
  11. //调用上一个线程的join方法
  12. previousThread.join();
  13. } catch (InterruptedException e) {
  14. e.printStackTrace();
  15. }
  16. System.out.println("num:"+i);
  17. }
  18. public static void main(String[] args) {
  19. Thread previousThread=Thread.currentThread();
  20. for(int i=0;i<10;i++){
  21. JoinDemo joinDemo=new JoinDemo(previousThread,i);
  22. joinDemo.start();
  23. previousThread=joinDemo;
  24. }
  25. }
  26. }

注意 previousThread.join部分,在没有加join的时候运行的结果是不确定的。加了join以后,运行结果按照递增的顺序展示出来。

4.4 ThreadLocal的使用

除了控制资源的访问外,还可以通过增加资源来保证线程安全。ThreadLocal主要解决的方法是为每个线程绑定自己的值。

  1. public class Test01 {
  2. static ThreadLocal threadLocal = new ThreadLocal();
  3. static class SubThread extends Thread {
  4. @Override
  5. public void run() {
  6. int num = 5;
  7. for (int i = 0; i < num; i++){
  8. // 设置线程关联的值
  9. threadLocal.set(Thread.currentThread().getName() + "-" + i);
  10. // 调用get()方法读取关联的值
  11. System.out.println(Thread.currentThread().getName() + "value = " + threadLocal.get());
  12. }
  13. }
  14. }
  15. public static void main(String[] args) {
  16. SubThread t1 = new SubThread();
  17. SubThread t2 = new SubThread();
  18. t1.start();
  19. t2.start();
  20. /*
  21. 两个线程互不干扰
  22. Thread-0value = Thread-0-0
  23. Thread-1value = Thread-1-1
  24. Thread-1value = Thread-1-2
  25. Thread-1value = Thread-1-3
  26. Thread-1value = Thread-1-4
  27. Thread-0value = Thread-0-1
  28. Thread-0value = Thread-0-2
  29. Thread-0value = Thread-0-3
  30. Thread-0value = Thread-0-4
  31. */
  32. }
  33. }