生产者消费者问题也称为有限缓冲问题,是一个多线程同步的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即“生产者”和“消费者”在实际运行时会发生的问题。
生产者的主要作用是产生一定量的数据放到缓冲区中,然后重复此过程。
同时,消费者在缓冲区消耗这些数据。
该问题的关键是保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
**
案例:生产者生产鸡,两个消费者分别消费鸡

1. 使用synchronized+wait+notify+notifyAll 实现

  1. public class TestSynchronized {
  2. public static void main(String[] args) {
  3. Container container = new Container();
  4. new Producer(container).start();
  5. new Consumer(container).start();
  6. new Consumer(container).start();
  7. }
  8. }
  9. //商品
  10. class Chicken{
  11. int id;
  12. public Chicken(int id){
  13. this.id = id;
  14. }
  15. }
  16. //缓冲区
  17. class Container{
  18. //定义一个固定大小的容器
  19. Chicken[] chickens = new Chicken[10];
  20. int size = 0;
  21. //生产者生产完100只鸡后置为true
  22. boolean flag = false;
  23. //生产者放入产品
  24. public synchronized void push(Chicken chicken) throws InterruptedException {
  25. //如果容器满了,就需要等待消费者消费
  26. while (size >= chickens.length) {
  27. this.wait();
  28. }
  29. //如果没有满,就丢入产品
  30. chickens[size] = chicken;
  31. System.out.println("生产了第" +chicken.id+"只鸡");
  32. size++;
  33. if (chicken.id == 100){
  34. flag = true;
  35. }
  36. this.notifyAll();
  37. }
  38. //消费者消费产品
  39. public synchronized void poll() throws InterruptedException {
  40. //如果没有鸡了,就得通知生产者生产
  41. while (size <= 0){
  42. if (flag){
  43. return;
  44. }
  45. this.notifyAll();
  46. this.wait();
  47. }
  48. //如果还有鸡,就消费鸡
  49. size--;
  50. Chicken chicken = chickens[size];
  51. System.out.println(Thread.currentThread().getName()+"消费了第"+chicken.id+"只鸡");
  52. }
  53. }
  54. //生产者
  55. class Producer extends Thread{
  56. Container container;
  57. public Producer(Container container){
  58. this.container= container;
  59. }
  60. public void run() {
  61. for (int i = 1; i <= 100; i++) {
  62. try {
  63. container.push(new Chicken(i));
  64. } catch (InterruptedException e) {
  65. e.printStackTrace();
  66. }
  67. }
  68. }
  69. }
  70. //消费者
  71. class Consumer extends Thread{
  72. Container container;
  73. public Consumer(Container container) {
  74. this.container = container;
  75. }
  76. public void run() {
  77. for (int i = 0; i < 100; i++) {
  78. try {
  79. container.poll();
  80. } catch (InterruptedException e) {
  81. e.printStackTrace();
  82. }
  83. }
  84. }
  85. }

必须要注意的是:notify不能指定唤醒,但也不是随机唤醒,底层有一定的规则,但不知道是啥。
如果将poll()方法中this.notifyAll()换成this.notify(),则会出现两个消费者互相唤醒,但就是不唤醒生产者的问题。
注意,this.notify()和this.wait()必须在同步方法中使用。

2. 使用Lock

每条线程需要被一个同步监视器Condition来监视,做到精准唤醒。
如下代码,如果在Main方法中直接再new一条消费者线程,当唤醒消费者时会同时唤醒两个消费者,因为两条线程共用了consume()方法中的监视器,如果想要精准唤醒其中的一条线程就必须再写一组Condition+consume1方法+线程调用consume1方法。

  1. import java.util.LinkedList;
  2. import java.util.Queue;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.Lock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. public class TestLock {
  7. public static void main(String[] args) {
  8. Container1 container1 = new Container1();
  9. new Producer1(container1).start();
  10. new Consumer1(container1).start();
  11. }
  12. }
  13. class Container1 {
  14. Queue<Product> queue = new LinkedList<>();
  15. Lock lock = new ReentrantLock();
  16. Condition consumerCondition = lock.newCondition();
  17. Condition producerCondition = lock.newCondition();
  18. public void consume() {
  19. lock.lock();
  20. try {
  21. while (queue.size() == 0) {
  22. consumerCondition.await();
  23. }
  24. Product product = queue.poll();
  25. producerCondition.signal();
  26. System.out.println(Thread.currentThread().getName()+"消费了---第"+product.id+"个产品---");
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. } finally {
  30. lock.unlock();
  31. }
  32. }
  33. public void produce(Product product) {
  34. lock.lock();
  35. try {
  36. while(queue.size()>= 10) {
  37. producerCondition.await();
  38. }
  39. queue.add(product);
  40. System.out.println(Thread.currentThread().getName()+"生产了第"+product.id+"个产品");
  41. consumerCondition.signal();
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. } finally {
  45. lock.unlock();
  46. }
  47. }
  48. }
  49. class Product{
  50. int id ;
  51. public Product(int id){
  52. this.id = id;
  53. }
  54. }
  55. class Consumer1 extends Thread{
  56. Container1 container1;
  57. public Consumer1(Container1 container1){
  58. this.container1 = container1;
  59. }
  60. public void run() {
  61. for (int i = 1; i <= 100; i++) {
  62. container1.consume();
  63. }
  64. }
  65. }
  66. class Producer1 extends Thread{
  67. Container1 container1;
  68. public Producer1(Container1 container1){
  69. this.container1 = container1;
  70. }
  71. public void run() {
  72. for (int i = 1; i <= 100; i++) {
  73. container1.produce(new Product(i));
  74. }
  75. }
  76. }

3. 使用阻塞队列

阻塞队列的特征:
当队列为空时,线程从中取元素会被阻塞并且置于等待状态,直到有线程向其中放入元素;
当队列为满时,向其中放入元素的线程会被阻塞并且置于等待状态,直到有线程取出元素。
使用ArrayBlockingQueue实现,put为放入,take为取出。

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. public class TestBlockingQueue {
  4. static BlockingQueue<Person> blockingQueue = new ArrayBlockingQueue<>(10);
  5. public static void main(String[] args) {
  6. new Thread(() -> {
  7. for (int i = 1; i < 100; i++) {
  8. try {
  9. blockingQueue.put(new Person(i));
  10. System.out.println("第"+i+"个大佬进入队列排队");
  11. Thread.sleep(10);
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. }
  16. },"Producer").start();
  17. new Thread(() -> {
  18. for (int i = 1; i < 100; i++) {
  19. Person person = null;
  20. try {
  21. person = blockingQueue.take();
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");
  26. }
  27. },"Consumer1").start();
  28. new Thread(() -> {
  29. for (int i = 0; i < 100; i++) {
  30. Person person = null;
  31. try {
  32. person = blockingQueue.take();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");
  37. }
  38. },"Consumer2").start();
  39. new Thread(() -> {
  40. for (int i = 0; i < 100; i++) {
  41. Person person = null;
  42. try {
  43. person = blockingQueue.take();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. System.out.println(Thread.currentThread().getName()+ "-----将第"+person.id+"位大佬带出队列");
  48. }
  49. },"Consumer3").start();
  50. }
  51. }
  52. class Person{
  53. int id;
  54. public Person(int id){
  55. this.id = id;
  56. }
  57. }

4. 自旋锁+原子类实现

将原子类充当缓冲区数据量的计数器,先更新数据再更新原子类,可以实现生产者与消费者模型。
但由于更新数据和更新原子类计数器是分步操作,所以会出现问题:
存在一个数据但消费者无法消费或队列没有数据但生产者无法生产。
这里理论上可以不用原子类,但使用普通int数作为计数器产生了意料之外的死循环。

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class TestCAS {
    static AtomicInteger num = new AtomicInteger(0);
    static Queue<Chicken> queue = new LinkedList<>();
    public static void main(String[] args) {
        new Consumer2().start();
        new Producer2().start();
    }

    static class Producer2 extends Thread {
        public void run() {
            while (true) {
                while (num.get() >= 20) {
                }
                queue.add(new Chicken(1));
                int afterProduce = num.incrementAndGet();
                System.out.println("生产后剩余" + afterProduce + "个产品");
            }
        }
    }

    static class Consumer2 extends Thread {
        public void run() {
            while (true) {
                while (num.get() <= 0) {
                }
                queue.poll();
                int afterConsumer = num.getAndDecrement();
                System.out.println("消费后剩余" + afterConsumer + "个产品");
            }
        }
    }
}