模式图

生产者和消费者模式 - 图1

使用BlockingQueue实现生产者和消费者模式

image.png
如上图所示,对于生产者和消费者主要就是包含中类型的对象

  • 生产者
  • 消费者
  • 仓库(阻塞队列)

BlockingQueue就是一个阻塞队列,当使用take方法的时候,如果没有数据,就会阻塞,使用put方法的时候,如果队列已经满了,也会阻塞。实现代码如下:
生产者

  1. public class Producer implements Runnable {
  2. // 阻塞队列
  3. private BlockingQueue<Object> queue;
  4. public Producer(BlockingQueue<Object> queue) {
  5. this.queue = queue;
  6. }
  7. @Override
  8. public void run() {
  9. try {
  10. while (true) {
  11. Thread.sleep(1000);
  12. queue.put(new Date());
  13. System.out.println("生产了一个,共有" + queue.size());
  14. }
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }

消费者

  1. public class Consumer implements Runnable {
  2. private BlockingQueue<Object> queue;
  3. public Consumer(BlockingQueue<Object> blockingQueue) {
  4. this.queue = blockingQueue;
  5. }
  6. @Override
  7. public void run() {
  8. try {
  9. while (true) {
  10. Thread.sleep(1000);
  11. Object take = queue.take();
  12. System.out.println("消费了" + take + "还剩" + queue.size());
  13. }
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }

测试代码

  1. public class ProducerAndConsumer {
  2. public static void main(String[] args) throws InterruptedException {
  3. BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
  4. Producer producer = new Producer(queue);
  5. for (int i = 0; i < 10; i++) {
  6. new Thread(producer).start();
  7. }
  8. Consumer consumer = new Consumer(queue);
  9. for (int i = 0; i < 4; i++) {
  10. new Thread(consumer).start();
  11. }
  12. }
  13. }

使用Condition实现生产者和消费者模式

  1. package ltd.personalstudy.threadbasic;
  2. import java.util.LinkedList;
  3. import java.util.Queue;
  4. import java.util.concurrent.locks.Condition;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. /**
  7. * @Author 咖啡杯里的茶
  8. * @date 2020/12/6
  9. */
  10. public class CustomCondition {
  11. private Queue queue;
  12. private int max = 16;
  13. private ReentrantLock lock = new ReentrantLock();
  14. // 提醒消费者
  15. private Condition notEmpty = lock.newCondition();
  16. // 提醒生产者
  17. private Condition notFull = lock.newCondition();
  18. public CustomCondition(int max) {
  19. this.max = max;
  20. queue = new LinkedList();
  21. }
  22. /**
  23. * 生产者生产产品
  24. */
  25. public void put(Object o) {
  26. lock.lock();
  27. try {
  28. while (queue.size() == max) {
  29. notFull.await();
  30. }
  31. // 生产了一个产品
  32. queue.add(o);
  33. System.out.println("生产了一个" + System.currentTimeMillis());
  34. // 提醒消费者
  35. notEmpty.signalAll();
  36. } catch (InterruptedException e) {
  37. e.printStackTrace();
  38. } finally {
  39. lock.unlock();
  40. }
  41. }
  42. /**
  43. * 消费者
  44. *
  45. * @return
  46. */
  47. public Object take() throws InterruptedException {
  48. lock.lock();
  49. try {
  50. // 这里不使用if进行判断是为了避免虚假唤醒
  51. while (queue.size() == 0) {
  52. notEmpty.await();
  53. }
  54. Object remove = queue.remove();
  55. System.out.println("消费了一个" + System.currentTimeMillis());
  56. notFull.signalAll();
  57. return remove;
  58. } finally {
  59. lock.unlock();
  60. }
  61. }
  62. public static void main(String[] args) {
  63. CustomCondition condition = new CustomCondition(16);
  64. // 创建生产者线程
  65. new Thread(()->{
  66. while (true) {
  67. condition.put(new Object());
  68. }
  69. }).start();
  70. // 创建消费者线程
  71. new Thread(()->{
  72. while (true) {
  73. try {
  74. condition.take();
  75. } catch (InterruptedException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. }).start();
  80. }
  81. }