生产者消费者问题也称为有限缓冲问题,是一个多线程同步的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即“生产者”和“消费者”在实际运行时会发生的问题。
生产者的主要作用是产生一定量的数据放到缓冲区中,然后重复此过程。
同时,消费者在缓冲区消耗这些数据。
该问题的关键是保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
**
案例:生产者生产鸡,两个消费者分别消费鸡
1. 使用synchronized+wait+notify+notifyAll 实现
public class TestSynchronized {
public static void main(String[] args) {
Container container = new Container();
new Producer(container).start();
new Consumer(container).start();
new Consumer(container).start();
}
}
//商品
class Chicken{
int id;
public Chicken(int id){
this.id = id;
}
}
//缓冲区
class Container{
//定义一个固定大小的容器
Chicken[] chickens = new Chicken[10];
int size = 0;
//生产者生产完100只鸡后置为true
boolean flag = false;
//生产者放入产品
public synchronized void push(Chicken chicken) throws InterruptedException {
//如果容器满了,就需要等待消费者消费
while (size >= chickens.length) {
this.wait();
}
//如果没有满,就丢入产品
chickens[size] = chicken;
System.out.println("生产了第" +chicken.id+"只鸡");
size++;
if (chicken.id == 100){
flag = true;
}
this.notifyAll();
}
//消费者消费产品
public synchronized void poll() throws InterruptedException {
//如果没有鸡了,就得通知生产者生产
while (size <= 0){
if (flag){
return;
}
this.notifyAll();
this.wait();
}
//如果还有鸡,就消费鸡
size--;
Chicken chicken = chickens[size];
System.out.println(Thread.currentThread().getName()+"消费了第"+chicken.id+"只鸡");
}
}
//生产者
class Producer extends Thread{
Container container;
public Producer(Container container){
this.container= container;
}
public void run() {
for (int i = 1; i <= 100; i++) {
try {
container.push(new Chicken(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//消费者
class Consumer extends Thread{
Container container;
public Consumer(Container container) {
this.container = container;
}
public void run() {
for (int i = 0; i < 100; i++) {
try {
container.poll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
必须要注意的是:notify不能指定唤醒,但也不是随机唤醒,底层有一定的规则,但不知道是啥。
如果将poll()方法中this.notifyAll()换成this.notify(),则会出现两个消费者互相唤醒,但就是不唤醒生产者的问题。
注意,this.notify()和this.wait()必须在同步方法中使用。
2. 使用Lock
每条线程需要被一个同步监视器Condition来监视,做到精准唤醒。
如下代码,如果在Main方法中直接再new一条消费者线程,当唤醒消费者时会同时唤醒两个消费者,因为两条线程共用了consume()方法中的监视器,如果想要精准唤醒其中的一条线程就必须再写一组Condition+consume1方法+线程调用consume1方法。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock {
public static void main(String[] args) {
Container1 container1 = new Container1();
new Producer1(container1).start();
new Consumer1(container1).start();
}
}
class Container1 {
Queue<Product> queue = new LinkedList<>();
Lock lock = new ReentrantLock();
Condition consumerCondition = lock.newCondition();
Condition producerCondition = lock.newCondition();
public void consume() {
lock.lock();
try {
while (queue.size() == 0) {
consumerCondition.await();
}
Product product = queue.poll();
producerCondition.signal();
System.out.println(Thread.currentThread().getName()+"消费了---第"+product.id+"个产品---");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void produce(Product product) {
lock.lock();
try {
while(queue.size()>= 10) {
producerCondition.await();
}
queue.add(product);
System.out.println(Thread.currentThread().getName()+"生产了第"+product.id+"个产品");
consumerCondition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class Product{
int id ;
public Product(int id){
this.id = id;
}
}
class Consumer1 extends Thread{
Container1 container1;
public Consumer1(Container1 container1){
this.container1 = container1;
}
public void run() {
for (int i = 1; i <= 100; i++) {
container1.consume();
}
}
}
class Producer1 extends Thread{
Container1 container1;
public Producer1(Container1 container1){
this.container1 = container1;
}
public void run() {
for (int i = 1; i <= 100; i++) {
container1.produce(new Product(i));
}
}
}
3. 使用阻塞队列
阻塞队列的特征:
当队列为空时,线程从中取元素会被阻塞并且置于等待状态,直到有线程向其中放入元素;
当队列为满时,向其中放入元素的线程会被阻塞并且置于等待状态,直到有线程取出元素。
使用ArrayBlockingQueue实现,put为放入,take为取出。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestBlockingQueue {
static BlockingQueue<Person> blockingQueue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
new Thread(() -> {
for (int i = 1; i < 100; i++) {
try {
blockingQueue.put(new Person(i));
System.out.println("第"+i+"个大佬进入队列排队");
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Producer").start();
new Thread(() -> {
for (int i = 1; i < 100; i++) {
Person person = null;
try {
person = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");
}
},"Consumer1").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
Person person = null;
try {
person = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");
}
},"Consumer2").start();
new Thread(() -> {
for (int i = 0; i < 100; i++) {
Person person = null;
try {
person = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");
}
},"Consumer3").start();
}
}
class Person{
int id;
public Person(int id){
this.id = id;
}
}
4. 自旋锁+原子类实现
将原子类充当缓冲区数据量的计数器,先更新数据再更新原子类,可以实现生产者与消费者模型。
但由于更新数据和更新原子类计数器是分步操作,所以会出现问题:
存在一个数据但消费者无法消费或队列没有数据但生产者无法生产。
这里理论上可以不用原子类,但使用普通int数作为计数器产生了意料之外的死循环。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestCAS {
static AtomicInteger num = new AtomicInteger(0);
static Queue<Chicken> queue = new LinkedList<>();
public static void main(String[] args) {
new Consumer2().start();
new Producer2().start();
}
static class Producer2 extends Thread {
public void run() {
while (true) {
while (num.get() >= 20) {
}
queue.add(new Chicken(1));
int afterProduce = num.incrementAndGet();
System.out.println("生产后剩余" + afterProduce + "个产品");
}
}
}
static class Consumer2 extends Thread {
public void run() {
while (true) {
while (num.get() <= 0) {
}
queue.poll();
int afterConsumer = num.getAndDecrement();
System.out.println("消费后剩余" + afterConsumer + "个产品");
}
}
}
}