synchronized(背过这种写法、现场写法也可以)
- 有现成的阻塞队列吗?put和take===> BlockingQueue
- 这是一种同步容器:可以支持多线程往里装,也可以支持多线程往外拿
- get和put加了synchronized,所以可以直接调用,不必在调用的时候再加锁了
- 为什么要加synchronized?
- 因为count是共享变量,需要互斥访问
- 不加的话,会造成得到的不是最新数据,会出现错误(会出现容器中有空位却加不进来,容器中有东西却拿不出来)
- 为什么用while而不是用if?
- 因为生产者和消费者都不止一个
- 一个生产者因为容器满了阻塞等待,消费者消费后唤醒生产者,这时可能被其他得生产者获得了执行得机会,这样原先得生产者还是不能执行,因此得循环判断!消费者的while循环同理。
- 不循环的话,会造成非互斥访问共享资源,还是会出问题;所以醒了之后还是得回来判断一遍容器是不是还是满着的!!!
- 对于复杂问题一定要解耦!!!一定要细分成小情况仔细讨论!!!这样才能看出奥秘,而不至于被其中得多种情况冲昏头脑。比如上面得生产者因为没有空位阻塞和消费者因为没有产品而阻塞是两种相反的、矛盾的情况,假如放在一起很容易把自己搞晕,会造成两个结论相互否定,从而怀疑自己,觉得自己想的不对,会陷入事实上两个是两种不同情况下得到的都是正确的结论但是又相互矛盾的情况。逻辑是正确的,但是因为是不同的情况而造成了觉得自己错了的情况!!!
/**
* 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
*
* 使用wait和notify/notifyAll来实现
*
* @author mashibing
*/
package com.mashibing.juc.c_021_01_interview;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
public class MyContainer1<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
public synchronized void put(T t) {
while(lists.size() == MAX) { //想想为什么用while而不是用if?
try {
// 扔满了当前线程停住
this.wait(); //effective java
} catch (InterruptedException e) {
e.printStackTrace();
}
}
lists.add(t);
++count;
this.notifyAll(); //通知消费者线程进行消费
}
public synchronized T get() {
T t = null;
while(lists.size() == 0) {
try {
// 拿完了当前线程停住
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
t = lists.removeFirst();
count --;
this.notifyAll(); //通知生产者进行生产
return t;
}
public static void main(String[] args) {
MyContainer1<String> c = new MyContainer1<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
// 一个消费者线程get5次
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
// 一个生产者线程put25次
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
CAS
- put中上来先读出count,再将值放入队列,最后count++时用CAS来加
- CAS的期望值是5,更新值是6,只要现有值等于期望的值5,就说明这之间(存入队列的过程中)没有任何一个线程打断过他
- ABA问题
- 并且队列也是一个共享资源也是需要互斥访问的,不光要互斥访问count,也要互斥访问队列!!!
利用Condition更加细化地处理等待队列(阻塞与唤醒)
- 上述代码仍然有一些瑕疵:在代码中唤醒地时候用的是notifyAll,会唤醒所有等待的线程(synchronized一加任意时刻就只有一个线程在那运行,其他生产者消费者线程都在等待),所有等待线程争抢这把锁,抢到的线程仍然只有一个;wait是阻塞自己,并且释放锁,让别的线程去执行;假如生产者生产满了,会wait释放锁叫醒其他非wait线程,叫醒消费者线程是最好的(本意也是如此),但是不幸的是,他同样可能会叫醒另一个生产者线程,即另一个线程拿到了这把锁,拿到锁之后,这个生产者线程也会wait一遍;假如一直唤醒的是生产者,会一直wait,造成效率低下
- wait会释放锁,会让其他线程执行,而不是唤醒阻塞线程;notify是唤醒那些wait的线程
- 一个线程刚好填满容器,然后notifyAll,会唤醒其他线程,有可能唤醒的还是生产者线程,他会wait,其他线程会补上,假如还是生产者就不行,而是消费者时就行
- 是生产者wait的,是没有必要去叫醒其他生产者的,只要叫醒消费者即可
- 是消费者wait的,是没有必要去叫醒其他消费者的,只要叫醒生产者即可
- 生产者线程只负责叫醒消费者,消费者线程只负责叫醒生产者
- 像完成这种需求,可以用Condition+ReentrantLock(对于不同的情况有不同的阻塞队列)
- 这也是ReentrantLock和synchronized的最大区别:ReentrantLock可以有条件
- Condition能够准确地指定哪些线程被叫醒!!!(是哪些而不是哪个)
- lock Condition的本质是什么(consumer和producer)—->本质是这把锁
- 之前一把锁对应一个等待队列(阻塞队列)
- 用了Condition之后就变成了多个等待队列(阻塞队列)
- Condition的本质就是等待队列的个数!!!
- Condition的本质就是不同的等待队列
/**
* 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法,
* 能够支持2个生产者线程以及10个消费者线程的阻塞调用
*
* 使用wait和notify/notifyAll来实现
*
* 使用Lock和Condition来实现
* 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒
*
* @author mashibing
*/
package com.mashibing.juc.c_021_01_interview;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyContainer2<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t) {
try {
lock.lock();
while(lists.size() == MAX) { //想想为什么用while而不是用if?
producer.await();
}
lists.add(t);
++count;
consumer.signalAll(); //通知消费者线程进行消费
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
try {
lock.lock();
while(lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count --;
producer.signalAll(); //通知生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}