4.1 等待/通知机制
4.1.1 概念
4.1.2 实现
Object类中的wait()方法可以使执行当前代码的线程等待,暂停执行,直到通知或者被中断为止。
public class Test01 {public static void main(String[] args) {try{String text = "something";System.out.println("同步前的代码");synchronized (text){System.out.println("同步代码块开始……");text.wait(); // 线程等待,后面的内容不输出System.out.println("wait后的代码……");}System.out.println("同步代码块后面的代码");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("main最后的代码");}}
注意:1. wait()方法只能在同步代码块中由锁对象调用;
- 调用wait()方法,当前线程会释放锁;
Object类的notify()可以唤醒线程,该方法也必须在同步代码块中,由锁对象调用,没有使用锁对象调用wait()/notify()会抛出IllgalMonitorStateException异常。如果有多个等待的线程,notify()方法只能唤醒其中一个。在同步代码块中,调用notify()方法会并不会立即释放锁对象,需要等当前同步代码块执行完后会释放锁对象,一般将nofity()方法放在同步代码块的最后。
在使用notify()的时候,可能出现通过过早的情况,需要再引入一个变量来判断是否已经唤醒过,唤醒过就不再需要等待了。
public class Test02 {public static void main(String[] args) throws InterruptedException {/*线程1开始等待:1614737148963线程2开始唤醒:1614737151964线程2结束唤醒:1614737152964线程1结束等待:1614737152964*/String lock = "something";Thread t1 = new Thread(() -> {synchronized (lock) {System.out.println("线程1开始等待:" + System.currentTimeMillis());try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程1结束等待:" + System.currentTimeMillis());}});Thread t2 = new Thread(() -> {synchronized (lock) {System.out.println("线程2开始唤醒:" + System.currentTimeMillis());lock.notify();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("线程2结束唤醒:" + System.currentTimeMillis());}});t1.start();Thread.sleep(3000);t2.start();}}
4.1.3 interrupt()方法会中断wait()
当线程处于wait()等待状态时,调用线程对象的interrupt()方法会中断线程的等待状态,会产生InterruptedException异常。
public class Test03 {public static void main(String[] args) throws InterruptedException {SubThread subThread = new SubThread();subThread.start();Thread.sleep(2000);subThread.interrupt();}private static final Object LOCK = new Object();static class SubThread extends Thread {@Overridepublic void run() {synchronized (LOCK) {System.out.println("begin wait...");try {LOCK.wait();System.out.println("end wait...");} catch (InterruptedException e) {System.out.println("wait等待被中断");}}}}}
4.1.4 notify()与notifyAll()
notify()一次只能唤醒一个线程,如果有多个等待线程,只能随机唤醒其中某一个;想要唤醒所有线程,需要调用notifyAll()。
public class Test04 {public static void main(String[] args) {Object lock = new Object();SubThread t1 = new SubThread(lock);SubThread t2 = new SubThread(lock);SubThread t3 = new SubThread(lock);t1.setName("t1");t2.setName("t2");t3.setName("t3");t1.start();t2.start();t3.start();try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (lock){/* 只能唤醒一个线程,这种情况称为信号丢失t1 -- begin wait...t3 -- begin wait...t2 -- begin wait...t1 -- end wait...*/// lock.notify();/*t1 -- begin wait...t3 -- begin wait...t2 -- begin wait...t2 -- end wait...t3 -- end wait...t1 -- end wait...*/lock.notifyAll();}}static class SubThread extends Thread {private Object lock;public SubThread(Object lock){this.lock = lock;}@Overridepublic void run() {synchronized (lock){try{System.out.println(Thread.currentThread().getName() + " -- begin wait...");lock.wait();System.out.println(Thread.currentThread().getName() + " -- end wait...");} catch (InterruptedException e) {e.printStackTrace();}}}}}
4.1.5 wait(long)
4.1.6 wait等待条件发生变化
/*** wait条件发生变化* 定义一个集合* 定义一个线程向集合中添加数据,添加完数据会通知另外的线程从集合中取数据* 定义一个线程从集合中取数据,如果集合中没有数据就等待*/public class Test05 {public static void main(String[] args) {ThreadAdd threadAdd = new ThreadAdd();ThreadSubtract threadSubtract = new ThreadSubtract();threadSubtract.setName("subtract 1");/*测试一:先开启添加数据的线程,再开启一个取数据的线程,大多数情况下正常threadAdd.start();threadSubtract.start();测试二:先开启取数据的线程,再开启添加数据线程threadSubtract.start();threadAdd.start();测试三:先开启两个取数据的线程,再开启添加数据的线程ThreadSubtract threadSubtract2 = new ThreadSubtract();threadSubtract2.setName("subtract 2");threadSubtract.start();threadSubtract2.start();threadAdd.start();结果:同时唤醒后,先后取数据,一个取到,另一个取数据时再取时出现异常subtract 1 begin wait...subtract 2 begin wait...subtract 2 end wait...subtract 2从集合中取了data 后,集合中数据的数量:0subtract 1 end wait...解决方案:被唤醒后依然要再判断,可将if改为while*/}/*** 1) 定义List集合*/static List<String> list = new ArrayList<>();/*** 2) 定义方法从集合中取数据*/public static void subtract() {synchronized (list) {try {if (list.size() == 0) {System.out.println(Thread.currentThread().getName() + " begin wait...");list.wait();System.out.println(Thread.currentThread().getName() + " end wait...");}} catch (InterruptedException e) {e.printStackTrace();}Object data = list.remove(0);System.out.println(Thread.currentThread().getName()+ "从集合中取了" + data + " 后,集合中数据的数量:" + list.size());}}/*** 3) 定义方法向集合中添加数据后,通知等待的线程取数据*/public static void add(){synchronized (list){list.add("data");list.notifyAll();}}/*** 4) 定义线程类调用subtract取数据*/static class ThreadSubtract extends Thread {@Overridepublic void run() {subtract();}}/*** 5) 定义线程类调用add添加数据*/static class ThreadAdd extends Thread {@Overridepublic void run() {add();}}}
4.1.7 生产者消费者模式
/*** 模拟产品*/public class ValueOP {private String value = "";/*** 生产过程*/public void setValue() {synchronized (this){while (!"".equalsIgnoreCase(value)){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}String value = System.currentTimeMillis() + " - " + System.nanoTime();System.out.println("set设置的值是:" + value);this.value = value;this.notifyAll();}}/*** 消费过程*/public void getValue(){synchronized (this){while ("".equalsIgnoreCase(value)){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("get的值是:" + this.value);this.value = "";this.notifyAll();}}}/*** 定义线程模拟生产者*/public class ProducerThread extends Thread{/*** 生产者生产数据,调用valueOP类的setValue方法给value赋值*/private ValueOP obj;public ProducerThread(ValueOP obj){this.obj = obj;}@Overridepublic void run() {while (true){obj.setValue();}}}/*** 定义线程模拟消费者*/public class ConsumerThread extends Thread{/*** 消费者消费数据,调用valueOP类的getValue方法消费产品*/private ValueOP obj;public ConsumerThread(ValueOP obj){this.obj = obj;}@Overridepublic void run() {while (true){obj.getValue();}}}public class Test {public static void main(String[] args) {ValueOP valueOP = new ValueOP();/*测试一生产,一消费的情况ProducerThread producerThread = new ProducerThread(valueOP);ConsumerThread consumerThread = new ConsumerThread(valueOP);producerThread.start();consumerThread.start();测试多个生产者,多个消费者:把等待条件的if改为while,避免重新唤醒消费者后直接消费空串;但是出现了假死现象:消费者消费空串时,重新进入等待状态,生产者又未被唤醒进行生产,所以需要把notify()改为notifyAll()。ProducerThread producerThread1 = new ProducerThread(valueOP);ProducerThread producerThread2 = new ProducerThread(valueOP);ProducerThread producerThread3 = new ProducerThread(valueOP);ConsumerThread consumerThread1 = new ConsumerThread(valueOP);ConsumerThread consumerThread2 = new ConsumerThread(valueOP);ConsumerThread consumerThread3 = new ConsumerThread(valueOP);producerThread1.start();producerThread2.start();producerThread3.start();consumerThread1.start();consumerThread2.start();consumerThread3.start();*/}}
4.2 通过管道实现线程间的通信
在java.io包中的PipeStream管道流用于在线程之间传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,相关的类包括:PipedInputStream和PipedOutputStream,PipedReader和PipedWriter。
/*** 使用PipedInputStream和PipedOutputStream管道字节流在线程之间传递数据** @author 王游* @date 2021/3/3 19:45*/public class Test {public static void main(String[] args) {// 定义管道字节流PipedInputStream inputStream = new PipedInputStream();PipedOutputStream outputStream = new PipedOutputStream();// 建立连接try {inputStream.connect(outputStream);} catch (IOException e) {e.printStackTrace();}new Thread(() -> readData(inputStream)).start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}new Thread(() -> writeData(outputStream)).start();}/*** 定义方法向管道流中写入数据*/public static void writeData(PipedOutputStream out) {int sum = 100;try {for (int i = 0; i < sum; i++) {String data = "" + i;out.write(data.getBytes());}out.close();} catch (IOException e) {e.printStackTrace();}}/*** 定义方法从管道流中读取数据*/public static void readData(PipedInputStream in) {byte[] bytes = new byte[1024];// 从管道中读取字节try {// This method blocks until input data is available, end of file is detected, or an exception is thrown.int len = in.read(bytes);while (len != -1) {System.out.println(new String(bytes, 0, len));len = in.read(bytes);}in.close();} catch (IOException e) {e.printStackTrace();}}}
4.3 thread.join
线程没有执行完之前,会一直阻塞在join方法处。
public class JoinDemo extends Thread{int i;Thread previousThread; //上一个线程public JoinDemo(Thread previousThread,int i){this.previousThread=previousThread;this.i=i;}@Overridepublic void run() {try {//调用上一个线程的join方法previousThread.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("num:"+i);}public static void main(String[] args) {Thread previousThread=Thread.currentThread();for(int i=0;i<10;i++){JoinDemo joinDemo=new JoinDemo(previousThread,i);joinDemo.start();previousThread=joinDemo;}}}
注意 previousThread.join部分,在没有加join的时候运行的结果是不确定的。加了join以后,运行结果按照递增的顺序展示出来。
4.4 ThreadLocal的使用
除了控制资源的访问外,还可以通过增加资源来保证线程安全。ThreadLocal主要解决的方法是为每个线程绑定自己的值。
public class Test01 {static ThreadLocal threadLocal = new ThreadLocal();static class SubThread extends Thread {@Overridepublic void run() {int num = 5;for (int i = 0; i < num; i++){// 设置线程关联的值threadLocal.set(Thread.currentThread().getName() + "-" + i);// 调用get()方法读取关联的值System.out.println(Thread.currentThread().getName() + "value = " + threadLocal.get());}}}public static void main(String[] args) {SubThread t1 = new SubThread();SubThread t2 = new SubThread();t1.start();t2.start();/*两个线程互不干扰Thread-0value = Thread-0-0Thread-1value = Thread-1-1Thread-1value = Thread-1-2Thread-1value = Thread-1-3Thread-1value = Thread-1-4Thread-0value = Thread-0-1Thread-0value = Thread-0-2Thread-0value = Thread-0-3Thread-0value = Thread-0-4*/}}
