序言:当使用线程来同时运行多个任务的时候,可以通过使用锁(互斥)来同步两个对象的行为。从而使得另一个对象不会干涉另一个任务的资源(线程安全)。下面要使线程之间相互协作,从而使多个任务可以一起工作去解决某个问题。
一、wait()和notifyAll();
注意事项:wait和notify()/notifyAll()只能在加锁的方法中去使用,不然会抛出IllegalMonitorStateException
1、Wait():
wait使当前线程在等待某个特定的条件前被挂起,同时释放锁,只有在notify或notidyAll发生的时候,当前线程才会被唤醒。有两种形式的wait:
①:接受毫秒作为参数,在规定时间后唤醒线程。
class Blocked{
synchronized void waitinfo2(){
try {
System.out.println("线程被挂起");
wait(2000);//2000毫秒后线程自动唤醒
System.out.println("当前线程为:"+Thread.currentThread());
}catch (InterruptedException e) {
e.printStackTrace();
}
}
class WaitTest implements Runnable{
Blocked blocked=new Blocked();
@Override
public void run() {
blocked.waitinfo2();
}
}
public class WaitDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(new WaitTest());
}
}
②:不接受任何参数,将无限等待下去,直到线程收到notify或者notifyAll()消息
class Blocked{
synchronized void waitinfo(){
try {
System.out.println("线程被挂起");
wait();
System.out.println("当前线程为:"+Thread.currentThread());
}catch (InterruptedException e) {
e.printStackTrace();
}
synchronized void prod(){
notify();
}
}
class WaitTest implements Runnable{
Blocked blocked=new Blocked();
@Override
public void run() {
blocked.waitinfo();
}
}
public class WaitDemo {
public static void main(String[] args) throws InterruptedException {
WaitTest waitTest = new WaitTest();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(waitTest);
TimeUnit.SECONDS.sleep(2);
new Thread(){
@Override
public void run() {
waitTest.blocked.prod();
}
}.start();
}
}
注意:对于wait来说,只有同一个对象锁的notify或者notifyAll才能将其唤醒
class Blocked{
synchronized void waitinfo(){
try {
System.out.println("线程被挂起");
wait();
System.out.println("当前线程为:"+Thread.currentThread());
}catch (InterruptedException e) {
e.printStackTrace();
}
synchronized void prod(){
notify();
}
}
class WaitTest implements Runnable{
Blocked blocked=new Blocked();
@Override
public void run() {
blocked.waitinfo();
}
}
public class WaitDemo {
public static void main(String[] args) throws InterruptedException {
WaitTest waitTest = new WaitTest();
WaitTest waitTest2 = new WaitTest();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(waitTest);
TimeUnit.SECONDS.sleep(2);
new Thread(){
@Override
public void run() {
// waitTest2.blocked.prod();//此时不是同一个对象锁,则无法唤醒线程
waitTest.blocked.prod();
}
}.start();
}
}
2、notify和notifyAll的区别:
notify在众多等待**同一个锁**的任务**只有一个**会被唤醒,所有在使用notify时要保证被唤醒的是恰当的任务。而notifyAll会将众多等**待同一个锁**的任务**一起唤醒。notify如果因某个特定的锁而被调用时,只有等待这个特定锁的任务才会被唤醒。**
class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.println("唤醒的线程为: " + Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
}
}
synchronized void prod() {
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall();
}
}
class Task2 implements Runnable {
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall();
}
}
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new Task());
exec.execute(new Task2());
Timer timer = new Timer();//开启一个定时任务
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
@Override
public void run() {
if (prod) {
System.out.println("唤醒一个线程 notify()");
Task.blocker.prod();
prod = false;
} else {
System.out.println("唤醒所以的线程 notifyAll()");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400);
TimeUnit.SECONDS.sleep(2); // Run for a while...
timer.cancel();//关闭计时器
TimeUnit.SECONDS.sleep(2);
System.out.println("=============");
Task2.blocker.prodAll();//唤醒特定的notifyAll();
exec.shutdownNow();
}
}
3.wait和notify的示例(生产者与消费者):
在下面的列子中生产者是厨师,消费者是服务员,两者相互工作互不干扰。
class Meal {//食物
private final int orderNum;
Meal(int orderNum) {
this.orderNum = orderNum;
}
@Override
public String toString() {
return "Meal " + orderNum;
}
}
class WaitPerson implements Runnable {//消费者
private Restaurnt restaurnt;
WaitPerson(Restaurnt restaurnt) {
this.restaurnt = restaurnt;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurnt.meal == null)
wait();//此时服务员线程的锁被释放
}
System.out.println("Waitperson got " + restaurnt.meal);
synchronized (restaurnt.chef) {//先捕获对应的锁,解开对应chef的wait
restaurnt.meal = null;
restaurnt.chef.notifyAll();
}
}
} catch (InterruptedException e) {
System.out.println(e);
}
}
}
class Chef implements Runnable {
private Restaurnt restaurnt;
private int count = 0;
Chef(Restaurnt restaurnt) {
this.restaurnt = restaurnt;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurnt.meal != null)//如果有食物则进入挂起状态
wait();
}
if (++count == 10) {
System.out.println("以达到销售目标,停止售卖");
restaurnt.service.shutdownNow();//关闭线程
}
System.out.println("Order up!");
synchronized (restaurnt.waitPerson) {//先捕获对应的锁,解开对应waitPerson的wait
restaurnt.meal = new Meal(count);
restaurnt.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}
public class Restaurnt {
Meal meal;
Chef chef = new Chef(this);
WaitPerson waitPerson = new WaitPerson(this);
ExecutorService service = Executors.newCachedThreadPool();
Restaurnt() {
service.execute(chef);
service.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurnt();
}
}
在上面的列子中,保证了使用了同一个对象锁,并且当需要调用notifyAll的时候要先获得对应的锁才可以唤醒对应的线程
二、condition的await、signal
可以通过Condition中的await()方法来挂起一个任务,同时通过signal()/signalAll()来唤醒被挂起的线程。
1.示例一:源码中的典型示例
//来自Condition源码中的示例
public class BoundedBufferDemo {
final Lock lock = new ReentrantLock();
final Condition full = lock.newCondition();
final Condition empty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
full.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
empty.signal();
} finally {
lock.unlock();
}
}
//只要有锁的操作(lock、synchronized都可以直接调signal或await或wait、notify)
public void test(){
lock.lock();
empty.signal();
lock.unlock();
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
empty.await();
}
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
full.signal();
return x;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
BoundedBufferDemo demo = new BoundedBufferDemo();
new BoundedBufferDemo().test();
new Thread(()->{
for (int i = 0; i < 100; i++) {
try {
demo.put(i+"");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"write").start();
TimeUnit.SECONDS.sleep(10);
new Thread(()->{
while (true) {
Object o = null;
try {
o = demo.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(o);
}
},"reader").start();
}
}
2.示例二:重写汽车涂料示例
class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public void waxed() {
lock.lock();
try {
waxOn = true; // Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while(waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing() throws InterruptedException{
lock.lock();
try {
while(waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while(!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch(InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
三、同步队列
我们可以使用同步对列来解决任务协作的问题,同步对列在任何时刻都值允许一个任务插入或移除元素。
1.LinkedBlockingQueue:无限容量,添加多少取出多少
class LiftOffRunner implements Runnable{//生产者
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue){
rockets=queue;
}
public void add(LiftOff lo){//生产方法
try {
rockets.put(lo);
System.out.println("添加时对列的大小:"+rockets.size());
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {//消费方法
try {
while (!Thread.interrupted()) {
LiftOff liftOff = rockets.take();//消费者消费
System.out.println("取出后对列的大小:"+rockets.size());
liftOff.run();
}
} catch (Exception e) {
System.out.println("Interrupted take()");
}
System.out.println("Exiting LiftOffRunner");
}
}
public class TestBlockingQueue {
static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {
System.out.println("对列名称:"+msg);
LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者
Thread thread = new Thread(runner);
for (int i = 0; i <5 ; i++) {
runner.add(new LiftOff(5));//添加了五个LiftOff对象
}
thread.start();
TimeUnit.SECONDS.sleep(5);
thread.interrupt();
System.out.println("Finshed "+msg+" test");
}
public static void main(String[] args) throws InterruptedException {
test("LinkdedBlockQueue",new LinkedBlockingQueue<LiftOff>());
}
}
2.ArrayBlockingQueue,
可以设置固定的尺寸,当该对列为空时,消费者将被挂起,当生产者达到设定的最大目标的时候,如果没有消费,此时生成者将被挂起。
class LiftOffRunner implements Runnable{
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue){
rockets=queue;
}
public void add(LiftOff lo){
try {
rockets.put(lo);
System.out.println("添加时对列的大小:"+rockets.size());
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
LiftOff liftOff = rockets.take();//消费者消费
System.out.println("取出后对列的大小:"+rockets.size());
liftOff.run();
}
} catch (Exception e) {
System.out.println("Interrupted take()");
}
System.out.println("Exiting LiftOffRunner");
}
}
public class TestBlockingQueue {
static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {
System.out.println("对列名称:"+msg);
LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者
Thread thread = new Thread(runner);
//将消费者放在生产前面,此时的生产者设定的目标为三,这个时候生产者到达生产目标之后,将会被消费之后在进行生产
thread.start();
for (int i = 0; i <5 ; i++) {
runner.add(new LiftOff(5));//添加了五个LiftOff对象
}
// thread.start();//如果将消费者放在后面,此时的生产者设定的目标为三,这个时候生产者将被挂起
TimeUnit.SECONDS.sleep(5);
System.out.println("Finshed "+msg+" test");
}
public static void main(String[] args) throws InterruptedException {
test("ArrayBlockingQueu",new ArrayBlockingQueue<LiftOff>(3));
}
}
3.SynchronousQueue:
由于该对列本身没用任何容量,所以该对列放入一个消费一个,如果没用去消费。则任何元素都无法添加到该对列中去
class LiftOffRunner implements Runnable{
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue){
rockets=queue;
}
public void add(LiftOff lo){
try {
rockets.put(lo);
System.out.println("添加时对列的大小:"+rockets.size());
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
LiftOff liftOff = rockets.take();//消费者消费
System.out.println("取出后对列的大小:"+rockets.size());
liftOff.run();
}
} catch (Exception e) {
System.out.println("Interrupted take()");
}
System.out.println("Exiting LiftOffRunner");
}
}
public class TestBlockingQueue {
static void test(String msg,BlockingQueue<LiftOff> queue) throws InterruptedException {
System.out.println("对列名称:"+msg);
LiftOffRunner runner = new LiftOffRunner(queue);//创建消费者
Thread thread = new Thread(runner);
thread.start();
for (int i = 0; i <5 ; i++) {
runner.add(new LiftOff(5));//添加了五个LiftOff对象
}
TimeUnit.SECONDS.sleep(5);
System.out.println("Finshed "+msg+" test");
}
public static void main(String[] args) throws InterruptedException {
test("Synchronous",new SynchronousQueue<LiftOff>());
}
}
4. 同步队示例——烤土司
1.吐司类
public class Toast {
public enum Status {DRY,BUTTERED,JAMMED}
private Status status=Status.DRY;//默认烤吐司
private final int id;
public Toast(int i){
id=i;
}
public void butter(){
status=Status.BUTTERED;//涂黄油
}
public void jammed(){//涂果酱
status=Status.JAMMED;
}
public Status getStatus(){
return status;
}
public int getId() {
return id;
}
@Override
public String toString() {
return "Toast"+id+":"+status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast>{
}
2.拷吐司之后摸黄油然后涂果酱
class Toaster implements Runnable{
private ToastQueue toasts;//该对列放置烤好的吐司
private int count=0;
public Toaster(ToastQueue queue){
toasts=queue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast toast = new Toast(count++);
TimeUnit.MILLISECONDS.sleep(200);
System.out.println(toast);
//把烤好的吐司放入对列中
toasts.put(toast);
}
} catch (InterruptedException e) {
System.out.println("Toaster interrupted");
}
System.out.println("Toaster off");
}
}
//涂果酱
class Butterer implements Runnable{
private ToastQueue dryQueue,//放置烤好的吐司
butteredQueue;//放置吐完黄油的吐司
Butterer(ToastQueue dryQueue,ToastQueue butteredQueue){
this.dryQueue=dryQueue;
this.butteredQueue=butteredQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Toast take = dryQueue.take();//先获得烤好的吐司
take.butter();//给吐司涂黄油
System.out.println(take);
butteredQueue.put(take);//放入对列
}
} catch (InterruptedException e) {
System.out.println("Butter Interrupted");
}
System.out.println("Butter off");
}
}
class Jammer implements Runnable{
private ToastQueue butteredQueue,//放置好涂完黄油的吐司
jammerQueue;//放置吐完果酱的吐司
Jammer(ToastQueue butteredQueue,ToastQueue jammerQueue){
this.butteredQueue=butteredQueue;
this.jammerQueue=jammerQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()){
Toast take = butteredQueue.take();//先获得涂完黄油的吐司
take.jammed();
System.out.println(take);
jammerQueue.put(take);//涂好的果酱吐司放入对列
}
}catch (InterruptedException e){
System.out.println("Jammer Interrupted");
}
System.out.println("Jammer off");
}
}
3.吃烤好的吐司
public class Eater implements Runnable{
private ToastQueue jammerQueue;
private int count=0;
Eater(ToastQueue jammerQueue){
this.jammerQueue=jammerQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
Toast take = jammerQueue.take();
if (take.getId()!=count++||take.getStatus()!=Toast.Status.JAMMED){
System.out.println("吐司还没玩成"+take);
}else{
System.out.println("吃!!! "+take);
System.out.println();
}
}
} catch (InterruptedException e) {
System.out.println("Eater InterruptedException");
}
}
}
public class ToastMatic {
public static void main(String[] args) throws InterruptedException {
ToastQueue dryQueue = new ToastQueue(),butteredQueue = new ToastQueue(),jammerQueue = new ToastQueue();
ExecutorService service = Executors.newCachedThreadPool();
service.execute(new Toaster(dryQueue));//先烤吐司
service.execute(new Butterer(dryQueue,butteredQueue));
service.execute(new Jammer(butteredQueue,jammerQueue));
service.execute(new Eater(jammerQueue));
TimeUnit.SECONDS.sleep(5);
service.shutdownNow();
}
}
扩展:因为该实例使用的是linked同步对列,具有无限容量,所以当休眠取消后就会有许多土司放入该队列。所以更应该去使用SynchronousQueue。
四、任务间使用管道进行输入输出
通过输入输出流在线程中进行通信是很有用的,线程中提供“管道”的形式进行支持。两者通过管道进行传输,管道基本上是一个阻塞队列。**PiedWtiter**(允许任务线程管道写)、**PipedReader**(允许不同任务从同一个管道中读取)<br />
class Sender implements Runnable{
private PipedWriter pipedWriter=new PipedWriter();
public PipedWriter getPipedWriter() {
return pipedWriter;
}
@Override
public void run() {
try {
while (true)
for (char c = 'A'; c <'c' ; c++) {
pipedWriter.write(c);//不断进行写入
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (IOException | InterruptedException e) {
System.out.println("输入流被打断");
}
}
}
class Receiver implements Runnable{
private PipedReader in;
int i=0;
Receiver(Sender sender) throws IOException {
in=new PipedReader(sender.getPipedWriter());
}
@Override
public void run() {
try{
while (true){
i++;
int read = in.read();
System.out.print((char)read);
if (i%5==0)
System.out.println();
}
}
catch (Exception e){
System.out.println("输出流被打断");
}
}
}
public class PipedIo {
public static void main(String[] args) throws IOException, InterruptedException {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(sender);
service.execute(receiver);
TimeUnit.SECONDS.sleep(4);
service.shutdownNow();
}
}