wait与notifyAll
只能在同步方法或同步代码块中调用wait()、notify()、notifyAll()。
1,wait阻塞代码并释放锁,让其他同步方法执行。notifyAll并不会立马释放锁,只能待方法执行完毕后才释放。
package com.thinking.in.java.course.chapter21.waxomatic;
import java.util.concurrent.TimeUnit;
class CarDemo {
private boolean waxOn = false;
//涂蜡
public synchronized void wax() throws InterruptedException {
if(waxOn) {
System.out.println("开始打蜡2");
TimeUnit.SECONDS.sleep(3);//模拟打蜡的时间
System.out.println("打蜡完成3");
notifyAll();
TimeUnit.SECONDS.sleep(5);//验证notifyAll执行之后 wait是否是立马执行的
waxOn = !waxOn;
}
}
//抛光
public synchronized void buff() throws InterruptedException{
if(!waxOn) {
System.out.println("没打蜡哦,先去打蜡1");
waxOn=!waxOn;
wait();
System.out.println("打蜡完成,进行抛光4");
TimeUnit.SECONDS.sleep(2);//模拟抛光的时间
}
}
//有可能wait释放锁后,由该方法获得锁
public synchronized void test() throws InterruptedException {
while (true) {
TimeUnit.SECONDS.sleep(1);
System.out.println("验证wait或notifyall 释放锁之后,对应方法是否会立即执行");
}
}
}
public class WaxOMaticDemo {
public static void main(String[] args) throws InterruptedException {
final CarDemo carDemo = new CarDemo();
new Thread(()->{
while (true) {
try {
carDemo.buff();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(()->{
while (true) {
try {
carDemo.wax();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
try {
carDemo.test();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
2,wait、notifyAll一定得是同一实例(对象)才能起作用
示例一
package com.thinking.in.java.course.chapter21.waxomatic;
import java.util.concurrent.TimeUnit;
class CarDemo {
private static boolean waxOn = false;
//涂蜡
public synchronized void wax() throws InterruptedException {
if(waxOn) {
System.out.println("开始打蜡2");
TimeUnit.SECONDS.sleep(3);//模拟打蜡的时间
System.out.println("打蜡完成3");
notifyAll();
TimeUnit.SECONDS.sleep(5);//验证notifyAll执行之后 wait是否是立马执行的
waxOn = !waxOn;
}
}
//抛光
public synchronized void buff() throws InterruptedException{
if(!waxOn) {
System.out.println("没打蜡哦,先去打蜡1");
waxOn=!waxOn;
wait();
System.out.println("打蜡完成,进行抛光4");
TimeUnit.SECONDS.sleep(2);//模拟抛光的时间
}
}
//有可能wait释放锁后,由该方法获得锁
public synchronized void test() throws InterruptedException {
while (true) {
TimeUnit.SECONDS.sleep(1);
System.out.println("验证wait或notifyall 释放锁之后,对应方法是否会立即执行");
}
}
}
public class WaxOMaticDemo {
public static void main(String[] args) throws InterruptedException {
final CarDemo carDemo = new CarDemo();
final CarDemo carDemo2 = new CarDemo();
new Thread(()->{
while (true) {
try {
carDemo.buff();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(()->{
while (true) {
try {
carDemo2.wax();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
//-----------采用lock进行重写
package com.thinking.in.java.course.chapter21.waxomatic2;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class CarDemo2 {
private boolean waxOn = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
//涂蜡
public void wax() throws InterruptedException {
try {
lock.lock();
if (waxOn) {
System.out.println("开始打蜡2");
TimeUnit.SECONDS.sleep(3);//模拟打蜡的时间
System.out.println("打蜡完成3");
condition.signalAll();
TimeUnit.SECONDS.sleep(5);//验证notifyAll执行之后 wait是否是立马执行的
waxOn = !waxOn;
}
}finally {
lock.unlock();
}
}
//抛光
public void buff() throws InterruptedException{
try {
lock.lock();
if (!waxOn) {
System.out.println("没打蜡哦,先去打蜡1");
waxOn = !waxOn;
condition.await();
System.out.println("打蜡完成,进行抛光4");
TimeUnit.SECONDS.sleep(2);//模拟抛光的时间
}
}finally {
lock.unlock();
}
}
}
public class WaxOMaticDemo2 {
public static void main(String[] args) throws InterruptedException {
final CarDemo2 carDemo2 = new CarDemo2();
new Thread(()->{
while (true) {
try {
carDemo2.buff();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
TimeUnit.MILLISECONDS.sleep(100);
new Thread(()->{
while (true) {
try {
carDemo2.wax();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
TimeUnit.SECONDS.sleep(20);
System.exit(0);
}
}
示例二
任务包含对象的写法
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
class Blocker {
synchronized void waitingCall(String str) {
try {
while (!Thread.interrupted()) {
System.out.println(this + "即将wait");
wait();
System.out.println(str+" "+Thread.currentThread() + " 唤醒了"+this);
}
} catch (InterruptedException e) {
// OK to exit this way
}
}
synchronized void prod() {
System.out.println(Thread.currentThread()+" notify");
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall("Task1->");
}
}
class Task2 implements Runnable {
// A separate Blocker object:
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall("Task2->");
}
}
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
Task task1 = new Task();
Task task2 = new Task();
Task task3 = new Task();
Task task4 = new Task();
Task task5 = new Task();
exec.execute(task1);
exec.execute(task2);
exec.execute(task3);
exec.execute(task4);
exec.execute(task5);
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
public void run() {
task1.blocker.prod();
}
}, 400, 400); // Run every .4 second
TimeUnit.SECONDS.sleep(5); // Run for a while...
timer.cancel();
System.out.println("\nTimer canceled");
exec.shutdownNow();
}
}
示例三:notifyAll唤醒的是所有线程在某个对象上的监听
任务包含对象的写法
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
class Blocker {
synchronized void waitingCall(String str) {
try {
while (!Thread.interrupted()) {
//System.out.println(this + "即将wait");
wait();
System.out.println(str+" "+Thread.currentThread() + " 唤醒了");
}
} catch (InterruptedException e) {
// OK to exit this way
}
}
synchronized void prod() {
System.out.println(Thread.currentThread()+" notify");
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall("Task1->");
}
}
class Task2 implements Runnable {
// A separate Blocker object:
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall("Task2->");
}
}
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
Task2 task1 = new Task2();
Task2 task2 = new Task2();
Task2 task3 = new Task2();
Task2 task4 = new Task2();
Task2 task5 = new Task2();
exec.execute(task1);
exec.execute(task2);
exec.execute(task3);
exec.execute(task4);
exec.execute(task5);
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
public void run() {
Task2.blocker.prodAll();
}
}, 400, 400); // Run every .4 second
TimeUnit.SECONDS.sleep(5); // Run for a while...
timer.cancel();
System.out.println("\nTimer canceled");
exec.shutdownNow();
}
}
3,何时才能使用object的wait、nofify;condition的await、signal
注意:每个对lock()的调用都必须紧跟一个try finally的子句,以保证在任何情况下都能释放锁
package com.thinking.in.java.course.chapter21.waxomatic2;
import java.nio.Buffer;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//来自Condition源码中的示例
public class BoundedBufferDemo {
final Lock lock = new ReentrantLock();
final Condition full = lock.newCondition();
final Condition empty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
full.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
empty.signal();
} finally {
lock.unlock();
}
}
//只要有锁的操作(lock、synchronized都可以直接调signal或await或wait、notify)
public void test(){
lock.lock();
empty.signal();
lock.unlock();
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
empty.await();
}
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
full.signal();
return x;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
BoundedBufferDemo demo = new BoundedBufferDemo();
new BoundedBufferDemo().test();
new Thread(()->{
for (int i = 0; i < 100; i++) {
try {
demo.put(i+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"write").start();
TimeUnit.SECONDS.sleep(10);
new Thread(()->{
while (true) {
Object o = null;
try {
o = demo.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(o);
}
},"reader").start();
}
}
生产者消费者队列
容量为null的时候,消费者挂起。容量满的时候,生产者挂起。待消费一个任务的时候,再去添加任务。
LinkedBlockingDeque:可以存放Integer.MAX_VALUE个任务 ArrayBlockingQueue:存放指定容量大小的任务,在无消费者的时候直至阻塞 SynchronousQueue:不见兔子不撒鹰,没有消费者,将直接挂起生产者,内部没有容量
流程控制示例 结合Enum
package com.thinking.in.java.course.chapter21;
import java.util.concurrent.*;
import java.util.*;
class Toast {
public enum Status {DRY, BUTTERED, JAMMED}
private Status status = Status.DRY;
private final int id;
public Toast(int idn) {
id = idn;
}
public void butter() {
status = Status.BUTTERED;
}
public void jam() {
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends SynchronousQueue<Toast> {
}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {
toastQueue = tq;
}
public void run() {
try {
while (!Thread.interrupted()) {
//TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
// Make toast
Toast t = new Toast(count++);
System.out.println(t);
// Insert into queue
toastQueue.put(t);
//控制生产面包的数量,并保证生产过程能完整结束
if(count >= 1000) {
System.exit(0);
}
}
} catch (InterruptedException e) {
System.out.println("Toaster interrupted");
}
System.out.println("Toaster off");
}
}
// Apply butter to toast:
class Butterer implements Runnable {
private ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = dryQueue.take();
t.butter();
System.out.println(t);
butteredQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
// Apply jam to buttered toast:
class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = butteredQueue.take();
t.jam();
System.out.println(t);
finishedQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
// Consume the toast:
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = finishedQueue.take();
// Verify that the toast is coming in order,
// and that all pieces are getting jammed:
if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error: " + t);
System.exit(1);
} else
System.out.println("Chomp! " + t);
}
} catch (Exception e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}