wait与notifyAll

  1. 只能在同步方法或同步代码块中调用wait()、notify()、notifyAll()。

1,wait阻塞代码并释放锁,让其他同步方法执行。notifyAll并不会立马释放锁,只能待方法执行完毕后才释放。

package com.thinking.in.java.course.chapter21.waxomatic;

import java.util.concurrent.TimeUnit;

class CarDemo {
    private  boolean waxOn = false;
    //涂蜡
    public synchronized void wax() throws InterruptedException {
        if(waxOn) {
            System.out.println("开始打蜡2");
            TimeUnit.SECONDS.sleep(3);//模拟打蜡的时间
            System.out.println("打蜡完成3");
            notifyAll();
            TimeUnit.SECONDS.sleep(5);//验证notifyAll执行之后 wait是否是立马执行的
            waxOn = !waxOn;
        }
    }

    //抛光
    public synchronized void buff() throws InterruptedException{
        if(!waxOn) {
            System.out.println("没打蜡哦,先去打蜡1");
            waxOn=!waxOn;
            wait();
            System.out.println("打蜡完成,进行抛光4");
            TimeUnit.SECONDS.sleep(2);//模拟抛光的时间
        }
    }

    //有可能wait释放锁后,由该方法获得锁
    public synchronized void test() throws InterruptedException {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("验证wait或notifyall 释放锁之后,对应方法是否会立即执行");
        }
    }
}
public class WaxOMaticDemo {
    public static void main(String[] args) throws InterruptedException {
        final CarDemo carDemo = new CarDemo();
        new Thread(()->{
            while (true) {
                try {
                    carDemo.buff();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        TimeUnit.MILLISECONDS.sleep(100);

        new Thread(()->{
            while (true) {
                try {
                    carDemo.wax();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(()->{
            try {
                carDemo.test();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

2,wait、notifyAll一定得是同一实例(对象)才能起作用

示例一

package com.thinking.in.java.course.chapter21.waxomatic;

import java.util.concurrent.TimeUnit;

class CarDemo {
    private static boolean waxOn = false;
    //涂蜡
    public synchronized void wax() throws InterruptedException {
        if(waxOn) {
            System.out.println("开始打蜡2");
            TimeUnit.SECONDS.sleep(3);//模拟打蜡的时间
            System.out.println("打蜡完成3");
            notifyAll();
            TimeUnit.SECONDS.sleep(5);//验证notifyAll执行之后 wait是否是立马执行的
            waxOn = !waxOn;
        }
    }

    //抛光
    public synchronized void buff() throws InterruptedException{
        if(!waxOn) {
            System.out.println("没打蜡哦,先去打蜡1");
            waxOn=!waxOn;
            wait();
            System.out.println("打蜡完成,进行抛光4");
            TimeUnit.SECONDS.sleep(2);//模拟抛光的时间
        }
    }

    //有可能wait释放锁后,由该方法获得锁
    public synchronized void test() throws InterruptedException {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            System.out.println("验证wait或notifyall 释放锁之后,对应方法是否会立即执行");
        }
    }
}
public class WaxOMaticDemo {
    public static void main(String[] args) throws InterruptedException {
        final CarDemo carDemo = new CarDemo();
        final CarDemo carDemo2 = new CarDemo();
        new Thread(()->{
            while (true) {
                try {
                    carDemo.buff();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        TimeUnit.MILLISECONDS.sleep(100);

        new Thread(()->{
            while (true) {
                try {
                    carDemo2.wax();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
}


//-----------采用lock进行重写
package com.thinking.in.java.course.chapter21.waxomatic2;


import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class CarDemo2 {
    private  boolean waxOn = false;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    //涂蜡
    public  void wax() throws InterruptedException {
        try {
            lock.lock();
            if (waxOn) {
                System.out.println("开始打蜡2");
                TimeUnit.SECONDS.sleep(3);//模拟打蜡的时间
                System.out.println("打蜡完成3");
                condition.signalAll();
                TimeUnit.SECONDS.sleep(5);//验证notifyAll执行之后 wait是否是立马执行的
                waxOn = !waxOn;
            }
        }finally {
            lock.unlock();
        }
    }

    //抛光
    public  void buff() throws InterruptedException{
        try {
            lock.lock();
            if (!waxOn) {
                System.out.println("没打蜡哦,先去打蜡1");
                waxOn = !waxOn;
                condition.await();
                System.out.println("打蜡完成,进行抛光4");
                TimeUnit.SECONDS.sleep(2);//模拟抛光的时间
            }
        }finally {
            lock.unlock();
        }
    }
}

public class WaxOMaticDemo2 {
    public static void main(String[] args) throws InterruptedException {
        final CarDemo2 carDemo2 = new CarDemo2();
        new Thread(()->{
            while (true) {
                try {
                    carDemo2.buff();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        TimeUnit.MILLISECONDS.sleep(100);
        new Thread(()->{
            while (true) {
                try {
                    carDemo2.wax();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        TimeUnit.SECONDS.sleep(20);
        System.exit(0);

    }
}

示例二
任务包含对象的写法

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;

class Blocker {
    synchronized void waitingCall(String str) {
        try {
            while (!Thread.interrupted()) {
                System.out.println(this + "即将wait");
                wait();
                System.out.println(str+" "+Thread.currentThread() + " 唤醒了"+this);
            }
        } catch (InterruptedException e) {
            // OK to exit this way
        }
    }

    synchronized void prod() {
        System.out.println(Thread.currentThread()+"  notify");
        notify();
    }

    synchronized void prodAll() {
        notifyAll();
    }
}

class Task implements Runnable {
     Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall("Task1->");
    }
}

class Task2 implements Runnable {
    // A separate Blocker object:
    static Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall("Task2->");
    }
}

public class NotifyVsNotifyAll {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        Task task1 = new Task();
        Task task2 = new Task();
        Task task3 = new Task();
        Task task4 = new Task();
        Task task5 = new Task();

        exec.execute(task1);
        exec.execute(task2);
        exec.execute(task3);
        exec.execute(task4);
        exec.execute(task5);


        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            boolean prod = true;

            public void run() {
                task1.blocker.prod();
            }
        }, 400, 400); // Run every .4 second
      TimeUnit.SECONDS.sleep(5); // Run for a while...
        timer.cancel();
        System.out.println("\nTimer canceled");

       exec.shutdownNow();
    }
}

示例三:notifyAll唤醒的是所有线程在某个对象上的监听
任务包含对象的写法

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;

class Blocker {
    synchronized void waitingCall(String str) {
        try {
            while (!Thread.interrupted()) {
                //System.out.println(this + "即将wait");
                wait();
                System.out.println(str+" "+Thread.currentThread() + " 唤醒了");
            }
        } catch (InterruptedException e) {
            // OK to exit this way
        }
    }

    synchronized void prod() {
        System.out.println(Thread.currentThread()+"  notify");
        notify();
    }

    synchronized void prodAll() {
        notifyAll();
    }
}

class Task implements Runnable {
     Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall("Task1->");
    }
}

class Task2 implements Runnable {
    // A separate Blocker object:
    static Blocker blocker = new Blocker();

    public void run() {
        blocker.waitingCall("Task2->");
    }
}

public class NotifyVsNotifyAll {
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        Task2 task1 = new Task2();
        Task2 task2 = new Task2();
        Task2 task3 = new Task2();
        Task2 task4 = new Task2();
        Task2 task5 = new Task2();
        exec.execute(task1);
        exec.execute(task2);
        exec.execute(task3);
        exec.execute(task4);
        exec.execute(task5);

        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            boolean prod = true;

            public void run() {
                Task2.blocker.prodAll();
            }
        }, 400, 400); // Run every .4 second
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        timer.cancel();
        System.out.println("\nTimer canceled");

        exec.shutdownNow();

    }
}

3,何时才能使用object的wait、nofify;condition的await、signal

注意:每个对lock()的调用都必须紧跟一个try finally的子句,以保证在任何情况下都能释放锁

package com.thinking.in.java.course.chapter21.waxomatic2;

import java.nio.Buffer;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//来自Condition源码中的示例
public class BoundedBufferDemo {

    final Lock lock = new ReentrantLock();
    final Condition full  = lock.newCondition();
    final Condition empty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                full.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            empty.signal();
        } finally {
            lock.unlock();
        }
    }

    //只要有锁的操作(lock、synchronized都可以直接调signal或await或wait、notify)
    public void test(){
        lock.lock();
        empty.signal();
        lock.unlock();
    }


    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                empty.await();
            }
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            full.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBufferDemo demo = new BoundedBufferDemo();
        new BoundedBufferDemo().test();

        new Thread(()->{
            for (int i = 0; i < 100; i++) {
                try {
                    demo.put(i+"");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"write").start();

        TimeUnit.SECONDS.sleep(10);

        new Thread(()->{
            while (true) {
                Object o = null;
                try {
                    o = demo.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(o);
            }
        },"reader").start();

    }
}

生产者消费者队列

容量为null的时候,消费者挂起。容量满的时候,生产者挂起。待消费一个任务的时候,再去添加任务。

LinkedBlockingDeque:可以存放Integer.MAX_VALUE个任务 ArrayBlockingQueue:存放指定容量大小的任务,在无消费者的时候直至阻塞 SynchronousQueue:不见兔子不撒鹰,没有消费者,将直接挂起生产者,内部没有容量

流程控制示例 结合Enum

package com.thinking.in.java.course.chapter21;

import java.util.concurrent.*;
import java.util.*;


class Toast {
    public enum Status {DRY, BUTTERED, JAMMED}

    private Status status = Status.DRY;
    private final int id;

    public Toast(int idn) {
        id = idn;
    }

    public void butter() {
        status = Status.BUTTERED;
    }

    public void jam() {
        status = Status.JAMMED;
    }

    public Status getStatus() {
        return status;
    }

    public int getId() {
        return id;
    }

    public String toString() {
        return "Toast " + id + ": " + status;
    }
}

class ToastQueue extends SynchronousQueue<Toast> {
}

class Toaster implements Runnable {
    private ToastQueue toastQueue;
    private int count = 0;
    private Random rand = new Random(47);

    public Toaster(ToastQueue tq) {
        toastQueue = tq;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                //TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500));
                // Make toast
                Toast t = new Toast(count++);
                System.out.println(t);
                // Insert into queue
                toastQueue.put(t);
                //控制生产面包的数量,并保证生产过程能完整结束
                if(count >= 1000) {
                    System.exit(0);
                }

            }
        } catch (InterruptedException e) {
            System.out.println("Toaster interrupted");
        }
        System.out.println("Toaster off");
    }
}

// Apply butter to toast:
class Butterer implements Runnable {
    private ToastQueue dryQueue, butteredQueue;

    public Butterer(ToastQueue dry, ToastQueue buttered) {
        dryQueue = dry;
        butteredQueue = buttered;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Blocks until next piece of toast is available:
                Toast t = dryQueue.take();
                t.butter();
                System.out.println(t);
                butteredQueue.put(t);
            }
        } catch (InterruptedException e) {
            System.out.println("Butterer interrupted");
        }
        System.out.println("Butterer off");
    }
}

// Apply jam to buttered toast:
class Jammer implements Runnable {
    private ToastQueue butteredQueue, finishedQueue;

    public Jammer(ToastQueue buttered, ToastQueue finished) {
        butteredQueue = buttered;
        finishedQueue = finished;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Blocks until next piece of toast is available:
                Toast t = butteredQueue.take();
                t.jam();
                System.out.println(t);
                finishedQueue.put(t);
            }
        } catch (InterruptedException e) {
            System.out.println("Jammer interrupted");
        }
        System.out.println("Jammer off");
    }
}

// Consume the toast:
class Eater implements Runnable {
    private ToastQueue finishedQueue;
    private int counter = 0;

    public Eater(ToastQueue finished) {
        finishedQueue = finished;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                // Blocks until next piece of toast is available:
               Toast t = finishedQueue.take();
                // Verify that the toast is coming in order,
                // and that all pieces are getting jammed:
                if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
                    System.out.println(">>>> Error: " + t);
                    System.exit(1);
                } else
                    System.out.println("Chomp! " + t);
            }
        } catch (Exception e) {
            System.out.println("Eater interrupted");
        }
        System.out.println("Eater off");
    }
}

public class ToastOMatic {
    public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue(),
                butteredQueue = new ToastQueue(),
                finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}