生产者消费者问题也称为有限缓冲问题,是一个多线程同步的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即“生产者”和“消费者”在实际运行时会发生的问题。
生产者的主要作用是产生一定量的数据放到缓冲区中,然后重复此过程。
同时,消费者在缓冲区消耗这些数据。
该问题的关键是保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
**
案例:生产者生产鸡,两个消费者分别消费鸡
1. 使用synchronized+wait+notify+notifyAll 实现
public class TestSynchronized {public static void main(String[] args) {Container container = new Container();new Producer(container).start();new Consumer(container).start();new Consumer(container).start();}}//商品class Chicken{int id;public Chicken(int id){this.id = id;}}//缓冲区class Container{//定义一个固定大小的容器Chicken[] chickens = new Chicken[10];int size = 0;//生产者生产完100只鸡后置为trueboolean flag = false;//生产者放入产品public synchronized void push(Chicken chicken) throws InterruptedException {//如果容器满了,就需要等待消费者消费while (size >= chickens.length) {this.wait();}//如果没有满,就丢入产品chickens[size] = chicken;System.out.println("生产了第" +chicken.id+"只鸡");size++;if (chicken.id == 100){flag = true;}this.notifyAll();}//消费者消费产品public synchronized void poll() throws InterruptedException {//如果没有鸡了,就得通知生产者生产while (size <= 0){if (flag){return;}this.notifyAll();this.wait();}//如果还有鸡,就消费鸡size--;Chicken chicken = chickens[size];System.out.println(Thread.currentThread().getName()+"消费了第"+chicken.id+"只鸡");}}//生产者class Producer extends Thread{Container container;public Producer(Container container){this.container= container;}public void run() {for (int i = 1; i <= 100; i++) {try {container.push(new Chicken(i));} catch (InterruptedException e) {e.printStackTrace();}}}}//消费者class Consumer extends Thread{Container container;public Consumer(Container container) {this.container = container;}public void run() {for (int i = 0; i < 100; i++) {try {container.poll();} catch (InterruptedException e) {e.printStackTrace();}}}}
必须要注意的是:notify不能指定唤醒,但也不是随机唤醒,底层有一定的规则,但不知道是啥。
如果将poll()方法中this.notifyAll()换成this.notify(),则会出现两个消费者互相唤醒,但就是不唤醒生产者的问题。
注意,this.notify()和this.wait()必须在同步方法中使用。
2. 使用Lock
每条线程需要被一个同步监视器Condition来监视,做到精准唤醒。
如下代码,如果在Main方法中直接再new一条消费者线程,当唤醒消费者时会同时唤醒两个消费者,因为两条线程共用了consume()方法中的监视器,如果想要精准唤醒其中的一条线程就必须再写一组Condition+consume1方法+线程调用consume1方法。
import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class TestLock {public static void main(String[] args) {Container1 container1 = new Container1();new Producer1(container1).start();new Consumer1(container1).start();}}class Container1 {Queue<Product> queue = new LinkedList<>();Lock lock = new ReentrantLock();Condition consumerCondition = lock.newCondition();Condition producerCondition = lock.newCondition();public void consume() {lock.lock();try {while (queue.size() == 0) {consumerCondition.await();}Product product = queue.poll();producerCondition.signal();System.out.println(Thread.currentThread().getName()+"消费了---第"+product.id+"个产品---");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public void produce(Product product) {lock.lock();try {while(queue.size()>= 10) {producerCondition.await();}queue.add(product);System.out.println(Thread.currentThread().getName()+"生产了第"+product.id+"个产品");consumerCondition.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}class Product{int id ;public Product(int id){this.id = id;}}class Consumer1 extends Thread{Container1 container1;public Consumer1(Container1 container1){this.container1 = container1;}public void run() {for (int i = 1; i <= 100; i++) {container1.consume();}}}class Producer1 extends Thread{Container1 container1;public Producer1(Container1 container1){this.container1 = container1;}public void run() {for (int i = 1; i <= 100; i++) {container1.produce(new Product(i));}}}
3. 使用阻塞队列
阻塞队列的特征:
当队列为空时,线程从中取元素会被阻塞并且置于等待状态,直到有线程向其中放入元素;
当队列为满时,向其中放入元素的线程会被阻塞并且置于等待状态,直到有线程取出元素。
使用ArrayBlockingQueue实现,put为放入,take为取出。
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;public class TestBlockingQueue {static BlockingQueue<Person> blockingQueue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {new Thread(() -> {for (int i = 1; i < 100; i++) {try {blockingQueue.put(new Person(i));System.out.println("第"+i+"个大佬进入队列排队");Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}},"Producer").start();new Thread(() -> {for (int i = 1; i < 100; i++) {Person person = null;try {person = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");}},"Consumer1").start();new Thread(() -> {for (int i = 0; i < 100; i++) {Person person = null;try {person = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");}},"Consumer2").start();new Thread(() -> {for (int i = 0; i < 100; i++) {Person person = null;try {person = blockingQueue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");}},"Consumer3").start();}}class Person{int id;public Person(int id){this.id = id;}}
4. 自旋锁+原子类实现
将原子类充当缓冲区数据量的计数器,先更新数据再更新原子类,可以实现生产者与消费者模型。
但由于更新数据和更新原子类计数器是分步操作,所以会出现问题:
存在一个数据但消费者无法消费或队列没有数据但生产者无法生产。
这里理论上可以不用原子类,但使用普通int数作为计数器产生了意料之外的死循环。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestCAS {
static AtomicInteger num = new AtomicInteger(0);
static Queue<Chicken> queue = new LinkedList<>();
public static void main(String[] args) {
new Consumer2().start();
new Producer2().start();
}
static class Producer2 extends Thread {
public void run() {
while (true) {
while (num.get() >= 20) {
}
queue.add(new Chicken(1));
int afterProduce = num.incrementAndGet();
System.out.println("生产后剩余" + afterProduce + "个产品");
}
}
}
static class Consumer2 extends Thread {
public void run() {
while (true) {
while (num.get() <= 0) {
}
queue.poll();
int afterConsumer = num.getAndDecrement();
System.out.println("消费后剩余" + afterConsumer + "个产品");
}
}
}
}
