序言:当使用线程来同时运行多个任务的时候,可以通过使用锁(互斥)来同步两个对象的行为。从而使得另一个对象不会干涉另一个任务的资源(线程安全)。下面要使线程之间相互协作,从而使多个任务可以一起工作去解决某个问题。
一、wait()和notifyAll();
注意事项:wait和notify()/notifyAll()只能在加锁的方法中去使用,不然会抛出IllegalMonitorStateException
1、Wait():
wait使当前线程在等待某个特定的条件前被挂起,同时释放锁,只有在notify或notidyAll发生的时候,当前线程才会被唤醒。有两种形式的wait:
①:接受毫秒作为参数,在规定时间后唤醒线程。
class Blocked{synchronized void waitinfo2(){try {System.out.println("线程被挂起");wait(2000);//2000毫秒后线程自动唤醒System.out.println("当前线程为:"+Thread.currentThread());}catch (InterruptedException e) {e.printStackTrace();}}class WaitTest implements Runnable{Blocked blocked=new Blocked();@Overridepublic void run() {blocked.waitinfo2();}}public class WaitDemo {public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(new WaitTest());}}
②:不接受任何参数,将无限等待下去,直到线程收到notify或者notifyAll()消息
class Blocked{synchronized void waitinfo(){try {System.out.println("线程被挂起");wait();System.out.println("当前线程为:"+Thread.currentThread());}catch (InterruptedException e) {e.printStackTrace();}synchronized void prod(){notify();}}class WaitTest implements Runnable{Blocked blocked=new Blocked();@Overridepublic void run() {blocked.waitinfo();}}public class WaitDemo {public static void main(String[] args) throws InterruptedException {WaitTest waitTest = new WaitTest();ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(waitTest);TimeUnit.SECONDS.sleep(2);new Thread(){@Overridepublic void run() {waitTest.blocked.prod();}}.start();}}
注意:对于wait来说,只有同一个对象锁的notify或者notifyAll才能将其唤醒
class Blocked{synchronized void waitinfo(){try {System.out.println("线程被挂起");wait();System.out.println("当前线程为:"+Thread.currentThread());}catch (InterruptedException e) {e.printStackTrace();}synchronized void prod(){notify();}}class WaitTest implements Runnable{Blocked blocked=new Blocked();@Overridepublic void run() {blocked.waitinfo();}}public class WaitDemo {public static void main(String[] args) throws InterruptedException {WaitTest waitTest = new WaitTest();WaitTest waitTest2 = new WaitTest();ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(waitTest);TimeUnit.SECONDS.sleep(2);new Thread(){@Overridepublic void run() {// waitTest2.blocked.prod();//此时不是同一个对象锁,则无法唤醒线程waitTest.blocked.prod();}}.start();}}
2、notify和notifyAll的区别:
notify在众多等待**同一个锁**的任务**只有一个**会被唤醒,所有在使用notify时要保证被唤醒的是恰当的任务。而notifyAll会将众多等**待同一个锁**的任务**一起唤醒。notify如果因某个特定的锁而被调用时,只有等待这个特定锁的任务才会被唤醒。**
class Blocker {synchronized void waitingCall() {try {while (!Thread.interrupted()) {wait();System.out.println("唤醒的线程为: " + Thread.currentThread() + " ");}} catch (InterruptedException e) {}}synchronized void prod() {notify();}synchronized void prodAll() {notifyAll();}}class Task implements Runnable {static Blocker blocker = new Blocker();public void run() {blocker.waitingCall();}}class Task2 implements Runnable {static Blocker blocker = new Blocker();public void run() {blocker.waitingCall();}}public class NotifyVsNotifyAll {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();for (int i = 0; i < 5; i++)exec.execute(new Task());exec.execute(new Task2());Timer timer = new Timer();//开启一个定时任务timer.scheduleAtFixedRate(new TimerTask() {boolean prod = true;@Overridepublic void run() {if (prod) {System.out.println("唤醒一个线程 notify()");Task.blocker.prod();prod = false;} else {System.out.println("唤醒所以的线程 notifyAll()");Task.blocker.prodAll();prod = true;}}}, 400, 400);TimeUnit.SECONDS.sleep(2); // Run for a while...timer.cancel();//关闭计时器TimeUnit.SECONDS.sleep(2);System.out.println("=============");Task2.blocker.prodAll();//唤醒特定的notifyAll();exec.shutdownNow();}}
3.wait和notify的示例(生产者与消费者):
在下面的列子中生产者是厨师,消费者是服务员,两者相互工作互不干扰。
class Meal {//食物private final int orderNum;Meal(int orderNum) {this.orderNum = orderNum;}@Overridepublic String toString() {return "Meal " + orderNum;}}class WaitPerson implements Runnable {//消费者private Restaurnt restaurnt;WaitPerson(Restaurnt restaurnt) {this.restaurnt = restaurnt;}@Overridepublic void run() {try {while (!Thread.interrupted()) {synchronized (this) {while (restaurnt.meal == null)wait();//此时服务员线程的锁被释放}System.out.println("Waitperson got " + restaurnt.meal);synchronized (restaurnt.chef) {//先捕获对应的锁,解开对应chef的waitrestaurnt.meal = null;restaurnt.chef.notifyAll();}}} catch (InterruptedException e) {System.out.println(e);}}}class Chef implements Runnable {private Restaurnt restaurnt;private int count = 0;Chef(Restaurnt restaurnt) {this.restaurnt = restaurnt;}@Overridepublic void run() {try {while (!Thread.interrupted()) {synchronized (this) {while (restaurnt.meal != null)//如果有食物则进入挂起状态wait();}if (++count == 10) {System.out.println("以达到销售目标,停止售卖");restaurnt.service.shutdownNow();//关闭线程}System.out.println("Order up!");synchronized (restaurnt.waitPerson) {//先捕获对应的锁,解开对应waitPerson的waitrestaurnt.meal = new Meal(count);restaurnt.waitPerson.notifyAll();}TimeUnit.MILLISECONDS.sleep(100);}} catch (InterruptedException e) {System.out.println("Chef interrupted");}}}public class Restaurnt {Meal meal;Chef chef = new Chef(this);WaitPerson waitPerson = new WaitPerson(this);ExecutorService service = Executors.newCachedThreadPool();Restaurnt() {service.execute(chef);service.execute(waitPerson);}public static void main(String[] args) {new Restaurnt();}}
在上面的列子中,保证了使用了同一个对象锁,并且当需要调用notifyAll的时候要先获得对应的锁才可以唤醒对应的线程
二、condition的await、signal
可以通过Condition中的await()方法来挂起一个任务,同时通过signal()/signalAll()来唤醒被挂起的线程。
1.示例一:源码中的典型示例
//来自Condition源码中的示例public class BoundedBufferDemo {final Lock lock = new ReentrantLock();final Condition full = lock.newCondition();final Condition empty = lock.newCondition();final Object[] items = new Object[100];int putptr, takeptr, count;public void put(Object x) throws InterruptedException {lock.lock();try {while (count == items.length)full.await();items[putptr] = x;if (++putptr == items.length) putptr = 0;++count;empty.signal();} finally {lock.unlock();}}//只要有锁的操作(lock、synchronized都可以直接调signal或await或wait、notify)public void test(){lock.lock();empty.signal();lock.unlock();}public Object take() throws InterruptedException {lock.lock();try {while (count == 0) {empty.await();}Object x = items[takeptr];if (++takeptr == items.length) takeptr = 0;--count;full.signal();return x;} finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {BoundedBufferDemo demo = new BoundedBufferDemo();new BoundedBufferDemo().test();new Thread(()->{for (int i = 0; i < 100; i++) {try {demo.put(i+"");} catch (InterruptedException e) {e.printStackTrace();}}},"write").start();TimeUnit.SECONDS.sleep(10);new Thread(()->{while (true) {Object o = null;try {o = demo.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(o);}},"reader").start();}}
2.示例二:重写汽车涂料示例
class Car {private Lock lock = new ReentrantLock();private Condition condition = lock.newCondition();private boolean waxOn = false;public void waxed() {lock.lock();try {waxOn = true; // Ready to buffcondition.signalAll();} finally {lock.unlock();}}public void buffed() {lock.lock();try {waxOn = false; // Ready for another coat of waxcondition.signalAll();} finally {lock.unlock();}}public void waitForWaxing() throws InterruptedException {lock.lock();try {while(waxOn == false)condition.await();} finally {lock.unlock();}}public void waitForBuffing() throws InterruptedException{lock.lock();try {while(waxOn == true)condition.await();} finally {lock.unlock();}}}class WaxOn implements Runnable {private Car car;public WaxOn(Car c) { car = c; }public void run() {try {while(!Thread.interrupted()) {printnb("Wax On! ");TimeUnit.MILLISECONDS.sleep(200);car.waxed();car.waitForBuffing();}} catch(InterruptedException e) {print("Exiting via interrupt");}print("Ending Wax On task");}}class WaxOff implements Runnable {private Car car;public WaxOff(Car c) { car = c; }public void run() {try {while(!Thread.interrupted()) {car.waitForWaxing();printnb("Wax Off! ");TimeUnit.MILLISECONDS.sleep(200);car.buffed();}} catch(InterruptedException e) {print("Exiting via interrupt");}print("Ending Wax Off task");}}public class WaxOMatic2 {public static void main(String[] args) throws Exception {Car car = new Car();ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new WaxOff(car));exec.execute(new WaxOn(car));TimeUnit.SECONDS.sleep(5);exec.shutdownNow();}}
三、同步队列
我们可以使用同步对列来解决任务协作的问题,同步对列在任何时刻都值允许一个任务插入或移除元素。
1.LinkedBlockingQueue:无限容量,添加多少取出多少
class LiftOffRunner implements Runnable{//生产者private BlockingQueue<LiftOff> rockets;public LiftOffRunner(BlockingQueue<LiftOff> queue){rockets=queue;}public void add(LiftOff lo){//生产方法try {rockets.put(lo);System.out.println("添加时对列的大小:"+rockets.size());} catch (InterruptedException e) {System.out.println("Interrupted during put()");}}@Overridepublic void run() {//消费方法try {while (!Thread.interrupted()) {LiftOff liftOff = rockets.take();//消费者消费System.out.println("取出后对列的大小:"+rockets.size());liftOff.run();}} catch (Exception e) {System.out.println("Interrupted take()");}System.out.println("Exiting LiftOffRunner");}}public class TestBlockingQueue {static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {System.out.println("对列名称:"+msg);LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者Thread thread = new Thread(runner);for (int i = 0; i <5 ; i++) {runner.add(new LiftOff(5));//添加了五个LiftOff对象}thread.start();TimeUnit.SECONDS.sleep(5);thread.interrupt();System.out.println("Finshed "+msg+" test");}public static void main(String[] args) throws InterruptedException {test("LinkdedBlockQueue",new LinkedBlockingQueue<LiftOff>());}}
2.ArrayBlockingQueue,
可以设置固定的尺寸,当该对列为空时,消费者将被挂起,当生产者达到设定的最大目标的时候,如果没有消费,此时生成者将被挂起。
class LiftOffRunner implements Runnable{private BlockingQueue<LiftOff> rockets;public LiftOffRunner(BlockingQueue<LiftOff> queue){rockets=queue;}public void add(LiftOff lo){try {rockets.put(lo);System.out.println("添加时对列的大小:"+rockets.size());} catch (InterruptedException e) {System.out.println("Interrupted during put()");}}@Overridepublic void run() {try {while (!Thread.interrupted()) {LiftOff liftOff = rockets.take();//消费者消费System.out.println("取出后对列的大小:"+rockets.size());liftOff.run();}} catch (Exception e) {System.out.println("Interrupted take()");}System.out.println("Exiting LiftOffRunner");}}public class TestBlockingQueue {static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {System.out.println("对列名称:"+msg);LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者Thread thread = new Thread(runner);//将消费者放在生产前面,此时的生产者设定的目标为三,这个时候生产者到达生产目标之后,将会被消费之后在进行生产thread.start();for (int i = 0; i <5 ; i++) {runner.add(new LiftOff(5));//添加了五个LiftOff对象}// thread.start();//如果将消费者放在后面,此时的生产者设定的目标为三,这个时候生产者将被挂起TimeUnit.SECONDS.sleep(5);System.out.println("Finshed "+msg+" test");}public static void main(String[] args) throws InterruptedException {test("ArrayBlockingQueu",new ArrayBlockingQueue<LiftOff>(3));}}
3.SynchronousQueue:
由于该对列本身没用任何容量,所以该对列放入一个消费一个,如果没用去消费。则任何元素都无法添加到该对列中去
class LiftOffRunner implements Runnable{private BlockingQueue<LiftOff> rockets;public LiftOffRunner(BlockingQueue<LiftOff> queue){rockets=queue;}public void add(LiftOff lo){try {rockets.put(lo);System.out.println("添加时对列的大小:"+rockets.size());} catch (InterruptedException e) {System.out.println("Interrupted during put()");}}@Overridepublic void run() {try {while (!Thread.interrupted()) {LiftOff liftOff = rockets.take();//消费者消费System.out.println("取出后对列的大小:"+rockets.size());liftOff.run();}} catch (Exception e) {System.out.println("Interrupted take()");}System.out.println("Exiting LiftOffRunner");}}public class TestBlockingQueue {static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {System.out.println("对列名称:"+msg);LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者Thread thread = new Thread(runner);thread.start();for (int i = 0; i <5 ; i++) {runner.add(new LiftOff(5));//添加了五个LiftOff对象}TimeUnit.SECONDS.sleep(5);System.out.println("Finshed "+msg+" test");}public static void main(String[] args) throws InterruptedException {test("Synchronous",new SynchronousQueue<LiftOff>());}}
4. 同步队示例——烤土司
1.吐司类
public class Toast {public enum Status {DRY,BUTTERED,JAMMED}private Status status=Status.DRY;//默认烤吐司private final int id;public Toast(int i){id=i;}public void butter(){status=Status.BUTTERED;//涂黄油}public void jammed(){//涂果酱status=Status.JAMMED;}public Status getStatus(){return status;}public int getId() {return id;}@Overridepublic String toString() {return "Toast"+id+":"+status;}}class ToastQueue extends LinkedBlockingQueue<Toast>{}
2.拷吐司之后摸黄油然后涂果酱
class Toaster implements Runnable{private ToastQueue toasts;//该对列放置烤好的吐司private int count=0;public Toaster(ToastQueue queue){toasts=queue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {Toast toast = new Toast(count++);TimeUnit.MILLISECONDS.sleep(200);System.out.println(toast);//把烤好的吐司放入对列中toasts.put(toast);}} catch (InterruptedException e) {System.out.println("Toaster interrupted");}System.out.println("Toaster off");}}//涂果酱class Butterer implements Runnable{private ToastQueue dryQueue,//放置烤好的吐司butteredQueue;//放置吐完黄油的吐司Butterer(ToastQueue dryQueue,ToastQueue butteredQueue){this.dryQueue=dryQueue;this.butteredQueue=butteredQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){Toast take = dryQueue.take();//先获得烤好的吐司take.butter();//给吐司涂黄油System.out.println(take);butteredQueue.put(take);//放入对列}} catch (InterruptedException e) {System.out.println("Butter Interrupted");}System.out.println("Butter off");}}class Jammer implements Runnable{private ToastQueue butteredQueue,//放置好涂完黄油的吐司jammerQueue;//放置吐完果酱的吐司Jammer(ToastQueue butteredQueue,ToastQueue jammerQueue){this.butteredQueue=butteredQueue;this.jammerQueue=jammerQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){Toast take = butteredQueue.take();//先获得涂完黄油的吐司take.jammed();System.out.println(take);jammerQueue.put(take);//涂好的果酱吐司放入对列}}catch (InterruptedException e){System.out.println("Jammer Interrupted");}System.out.println("Jammer off");}}
3.吃烤好的吐司
public class Eater implements Runnable{private ToastQueue jammerQueue;private int count=0;Eater(ToastQueue jammerQueue){this.jammerQueue=jammerQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {Toast take = jammerQueue.take();if (take.getId()!=count++||take.getStatus()!=Toast.Status.JAMMED){System.out.println("吐司还没玩成"+take);}else{System.out.println("吃!!! "+take);System.out.println();}}} catch (InterruptedException e) {System.out.println("Eater InterruptedException");}}}public class ToastMatic {public static void main(String[] args) throws InterruptedException {ToastQueue dryQueue = new ToastQueue(),butteredQueue = new ToastQueue(),jammerQueue = new ToastQueue();ExecutorService service = Executors.newCachedThreadPool();service.execute(new Toaster(dryQueue));//先烤吐司service.execute(new Butterer(dryQueue,butteredQueue));service.execute(new Jammer(butteredQueue,jammerQueue));service.execute(new Eater(jammerQueue));TimeUnit.SECONDS.sleep(5);service.shutdownNow();}}
扩展:因为该实例使用的是linked同步对列,具有无限容量,所以当休眠取消后就会有许多土司放入该队列。所以更应该去使用SynchronousQueue。
四、任务间使用管道进行输入输出
通过输入输出流在线程中进行通信是很有用的,线程中提供“管道”的形式进行支持。两者通过管道进行传输,管道基本上是一个阻塞队列。**PiedWtiter**(允许任务线程管道写)、**PipedReader**(允许不同任务从同一个管道中读取)<br />
class Sender implements Runnable{private PipedWriter pipedWriter=new PipedWriter();public PipedWriter getPipedWriter() {return pipedWriter;}@Overridepublic void run() {try {while (true)for (char c = 'A'; c <'c' ; c++) {pipedWriter.write(c);//不断进行写入TimeUnit.MILLISECONDS.sleep(100);}} catch (IOException | InterruptedException e) {System.out.println("输入流被打断");}}}class Receiver implements Runnable{private PipedReader in;int i=0;Receiver(Sender sender) throws IOException {in=new PipedReader(sender.getPipedWriter());}@Overridepublic void run() {try{while (true){i++;int read = in.read();System.out.print((char)read);if (i%5==0)System.out.println();}}catch (Exception e){System.out.println("输出流被打断");}}}public class PipedIo {public static void main(String[] args) throws IOException, InterruptedException {Sender sender = new Sender();Receiver receiver = new Receiver(sender);ExecutorService service = Executors.newCachedThreadPool();service.execute(sender);service.execute(receiver);TimeUnit.SECONDS.sleep(4);service.shutdownNow();}}
