4.1 等待/通知机制
4.1.1 概念
4.1.2 实现
Object类中的wait()方法可以使执行当前代码的线程等待,暂停执行,直到通知或者被中断为止。
public class Test01 {
public static void main(String[] args) {
try{
String text = "something";
System.out.println("同步前的代码");
synchronized (text){
System.out.println("同步代码块开始……");
text.wait(); // 线程等待,后面的内容不输出
System.out.println("wait后的代码……");
}
System.out.println("同步代码块后面的代码");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("main最后的代码");
}
}
注意:1. wait()方法只能在同步代码块中由锁对象调用;
- 调用wait()方法,当前线程会释放锁;
Object类的notify()可以唤醒线程,该方法也必须在同步代码块中,由锁对象调用,没有使用锁对象调用wait()/notify()会抛出IllgalMonitorStateException异常。如果有多个等待的线程,notify()方法只能唤醒其中一个。在同步代码块中,调用notify()方法会并不会立即释放锁对象,需要等当前同步代码块执行完后会释放锁对象,一般将nofity()方法放在同步代码块的最后。
在使用notify()的时候,可能出现通过过早的情况,需要再引入一个变量来判断是否已经唤醒过,唤醒过就不再需要等待了。
public class Test02 {
public static void main(String[] args) throws InterruptedException {
/*
线程1开始等待:1614737148963
线程2开始唤醒:1614737151964
线程2结束唤醒:1614737152964
线程1结束等待:1614737152964
*/
String lock = "something";
Thread t1 = new Thread(() -> {
synchronized (lock) {
System.out.println("线程1开始等待:" + System.currentTimeMillis());
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程1结束等待:" + System.currentTimeMillis());
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) {
System.out.println("线程2开始唤醒:" + System.currentTimeMillis());
lock.notify();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程2结束唤醒:" + System.currentTimeMillis());
}
});
t1.start();
Thread.sleep(3000);
t2.start();
}
}
4.1.3 interrupt()方法会中断wait()
当线程处于wait()等待状态时,调用线程对象的interrupt()方法会中断线程的等待状态,会产生InterruptedException异常。
public class Test03 {
public static void main(String[] args) throws InterruptedException {
SubThread subThread = new SubThread();
subThread.start();
Thread.sleep(2000);
subThread.interrupt();
}
private static final Object LOCK = new Object();
static class SubThread extends Thread {
@Override
public void run() {
synchronized (LOCK) {
System.out.println("begin wait...");
try {
LOCK.wait();
System.out.println("end wait...");
} catch (InterruptedException e) {
System.out.println("wait等待被中断");
}
}
}
}
}
4.1.4 notify()与notifyAll()
notify()一次只能唤醒一个线程,如果有多个等待线程,只能随机唤醒其中某一个;想要唤醒所有线程,需要调用notifyAll()。
public class Test04 {
public static void main(String[] args) {
Object lock = new Object();
SubThread t1 = new SubThread(lock);
SubThread t2 = new SubThread(lock);
SubThread t3 = new SubThread(lock);
t1.setName("t1");
t2.setName("t2");
t3.setName("t3");
t1.start();
t2.start();
t3.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock){
/* 只能唤醒一个线程,这种情况称为信号丢失
t1 -- begin wait...
t3 -- begin wait...
t2 -- begin wait...
t1 -- end wait...
*/
// lock.notify();
/*
t1 -- begin wait...
t3 -- begin wait...
t2 -- begin wait...
t2 -- end wait...
t3 -- end wait...
t1 -- end wait...
*/
lock.notifyAll();
}
}
static class SubThread extends Thread {
private Object lock;
public SubThread(Object lock){
this.lock = lock;
}
@Override
public void run() {
synchronized (lock){
try{
System.out.println(Thread.currentThread().getName() + " -- begin wait...");
lock.wait();
System.out.println(Thread.currentThread().getName() + " -- end wait...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
4.1.5 wait(long)
4.1.6 wait等待条件发生变化
/**
* wait条件发生变化
* 定义一个集合
* 定义一个线程向集合中添加数据,添加完数据会通知另外的线程从集合中取数据
* 定义一个线程从集合中取数据,如果集合中没有数据就等待
*/
public class Test05 {
public static void main(String[] args) {
ThreadAdd threadAdd = new ThreadAdd();
ThreadSubtract threadSubtract = new ThreadSubtract();
threadSubtract.setName("subtract 1");
/*
测试一:先开启添加数据的线程,再开启一个取数据的线程,大多数情况下正常
threadAdd.start();
threadSubtract.start();
测试二:先开启取数据的线程,再开启添加数据线程
threadSubtract.start();
threadAdd.start();
测试三:先开启两个取数据的线程,再开启添加数据的线程
ThreadSubtract threadSubtract2 = new ThreadSubtract();
threadSubtract2.setName("subtract 2");
threadSubtract.start();
threadSubtract2.start();
threadAdd.start();
结果:同时唤醒后,先后取数据,一个取到,另一个取数据时再取时出现异常
subtract 1 begin wait...
subtract 2 begin wait...
subtract 2 end wait...
subtract 2从集合中取了data 后,集合中数据的数量:0
subtract 1 end wait...
解决方案:被唤醒后依然要再判断,可将if改为while
*/
}
/**
* 1) 定义List集合
*/
static List<String> list = new ArrayList<>();
/**
* 2) 定义方法从集合中取数据
*/
public static void subtract() {
synchronized (list) {
try {
if (list.size() == 0) {
System.out.println(Thread.currentThread().getName() + " begin wait...");
list.wait();
System.out.println(Thread.currentThread().getName() + " end wait...");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Object data = list.remove(0);
System.out.println(Thread.currentThread().getName()
+ "从集合中取了" + data + " 后,集合中数据的数量:" + list.size());
}
}
/**
* 3) 定义方法向集合中添加数据后,通知等待的线程取数据
*/
public static void add(){
synchronized (list){
list.add("data");
list.notifyAll();
}
}
/**
* 4) 定义线程类调用subtract取数据
*/
static class ThreadSubtract extends Thread {
@Override
public void run() {
subtract();
}
}
/**
* 5) 定义线程类调用add添加数据
*/
static class ThreadAdd extends Thread {
@Override
public void run() {
add();
}
}
}
4.1.7 生产者消费者模式
/**
* 模拟产品
*/
public class ValueOP {
private String value = "";
/**
* 生产过程
*/
public void setValue() {
synchronized (this){
while (!"".equalsIgnoreCase(value)){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String value = System.currentTimeMillis() + " - " + System.nanoTime();
System.out.println("set设置的值是:" + value);
this.value = value;
this.notifyAll();
}
}
/**
* 消费过程
*/
public void getValue(){
synchronized (this){
while ("".equalsIgnoreCase(value)){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("get的值是:" + this.value);
this.value = "";
this.notifyAll();
}
}
}
/**
* 定义线程模拟生产者
*/
public class ProducerThread extends Thread{
/**
* 生产者生产数据,调用valueOP类的setValue方法给value赋值
*/
private ValueOP obj;
public ProducerThread(ValueOP obj){
this.obj = obj;
}
@Override
public void run() {
while (true){
obj.setValue();
}
}
}
/**
* 定义线程模拟消费者
*/
public class ConsumerThread extends Thread{
/**
* 消费者消费数据,调用valueOP类的getValue方法消费产品
*/
private ValueOP obj;
public ConsumerThread(ValueOP obj){
this.obj = obj;
}
@Override
public void run() {
while (true){
obj.getValue();
}
}
}
public class Test {
public static void main(String[] args) {
ValueOP valueOP = new ValueOP();
/*
测试一生产,一消费的情况
ProducerThread producerThread = new ProducerThread(valueOP);
ConsumerThread consumerThread = new ConsumerThread(valueOP);
producerThread.start();
consumerThread.start();
测试多个生产者,多个消费者:
把等待条件的if改为while,避免重新唤醒消费者后直接消费空串;
但是出现了假死现象:消费者消费空串时,重新进入等待状态,生产者又未被唤醒进行生产,所以需要把notify()改为notifyAll()。
ProducerThread producerThread1 = new ProducerThread(valueOP);
ProducerThread producerThread2 = new ProducerThread(valueOP);
ProducerThread producerThread3 = new ProducerThread(valueOP);
ConsumerThread consumerThread1 = new ConsumerThread(valueOP);
ConsumerThread consumerThread2 = new ConsumerThread(valueOP);
ConsumerThread consumerThread3 = new ConsumerThread(valueOP);
producerThread1.start();
producerThread2.start();
producerThread3.start();
consumerThread1.start();
consumerThread2.start();
consumerThread3.start();
*/
}
}
4.2 通过管道实现线程间的通信
在java.io包中的PipeStream管道流用于在线程之间传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,相关的类包括:PipedInputStream和PipedOutputStream,PipedReader和PipedWriter。
/**
* 使用PipedInputStream和PipedOutputStream管道字节流在线程之间传递数据
*
* @author 王游
* @date 2021/3/3 19:45
*/
public class Test {
public static void main(String[] args) {
// 定义管道字节流
PipedInputStream inputStream = new PipedInputStream();
PipedOutputStream outputStream = new PipedOutputStream();
// 建立连接
try {
inputStream.connect(outputStream);
} catch (IOException e) {
e.printStackTrace();
}
new Thread(() -> readData(inputStream)).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> writeData(outputStream)).start();
}
/**
* 定义方法向管道流中写入数据
*/
public static void writeData(PipedOutputStream out) {
int sum = 100;
try {
for (int i = 0; i < sum; i++) {
String data = "" + i;
out.write(data.getBytes());
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 定义方法从管道流中读取数据
*/
public static void readData(PipedInputStream in) {
byte[] bytes = new byte[1024];
// 从管道中读取字节
try {
// This method blocks until input data is available, end of file is detected, or an exception is thrown.
int len = in.read(bytes);
while (len != -1) {
System.out.println(new String(bytes, 0, len));
len = in.read(bytes);
}
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.3 thread.join
线程没有执行完之前,会一直阻塞在join方法处。
public class JoinDemo extends Thread{
int i;
Thread previousThread; //上一个线程
public JoinDemo(Thread previousThread,int i){
this.previousThread=previousThread;
this.i=i;
}
@Override
public void run() {
try {
//调用上一个线程的join方法
previousThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("num:"+i);
}
public static void main(String[] args) {
Thread previousThread=Thread.currentThread();
for(int i=0;i<10;i++){
JoinDemo joinDemo=new JoinDemo(previousThread,i);
joinDemo.start();
previousThread=joinDemo;
}
}
}
注意 previousThread.join部分,在没有加join的时候运行的结果是不确定的。加了join以后,运行结果按照递增的顺序展示出来。
4.4 ThreadLocal的使用
除了控制资源的访问外,还可以通过增加资源来保证线程安全。ThreadLocal主要解决的方法是为每个线程绑定自己的值。
public class Test01 {
static ThreadLocal threadLocal = new ThreadLocal();
static class SubThread extends Thread {
@Override
public void run() {
int num = 5;
for (int i = 0; i < num; i++){
// 设置线程关联的值
threadLocal.set(Thread.currentThread().getName() + "-" + i);
// 调用get()方法读取关联的值
System.out.println(Thread.currentThread().getName() + "value = " + threadLocal.get());
}
}
}
public static void main(String[] args) {
SubThread t1 = new SubThread();
SubThread t2 = new SubThread();
t1.start();
t2.start();
/*
两个线程互不干扰
Thread-0value = Thread-0-0
Thread-1value = Thread-1-1
Thread-1value = Thread-1-2
Thread-1value = Thread-1-3
Thread-1value = Thread-1-4
Thread-0value = Thread-0-1
Thread-0value = Thread-0-2
Thread-0value = Thread-0-3
Thread-0value = Thread-0-4
*/
}
}