使用BlockingQueue实现生产者消费者模式

  1. public static void main(String[] args) {
  2. BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
  3. // 生产者
  4. Runnable producer = () -> {
  5. while (true) {
  6. try {
  7. queue.put(new Object());
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. };
  13. new Thread(producer).start();
  14. // 消费者
  15. Runnable consumer = () -> {
  16. while (true) {
  17. try {
  18. queue.take();
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. };
  24. new Thread(consumer).start();
  25. }

使用Condition实现生产者消费者模式

运行方法把上述方法的BlockingQueue换成MyBlockingQueueForCondition即可

  1. public class MyBlockingQueueForCondition {
  2. private Queue<Object> queue;
  3. private int max = 16;
  4. private static ReentrantLock lock = new ReentrantLock();
  5. private static Condition notEmpty = lock.newCondition();
  6. private static Condition notFull = lock.newCondition();
  7. public MyBlockingQueueForCondition(int size){
  8. this.max = size;
  9. queue = new LinkedList<>();
  10. }
  11. public void put(Object o) throws InterruptedException {
  12. lock.lock();
  13. try {
  14. while (queue.size() == max){
  15. notFull.await();
  16. }
  17. queue.add(o);
  18. notEmpty.signalAll();
  19. }finally {
  20. lock.unlock();
  21. }
  22. }
  23. public Object take() throws InterruptedException {
  24. lock.lock();
  25. try {
  26. while (queue.size() == 0){
  27. notEmpty.await();
  28. }
  29. Object remove = queue.remove();
  30. notFull.signalAll();
  31. return remove;
  32. }finally {
  33. lock.unlock();
  34. }
  35. }
  36. }

!> 这里使用while而不使用if的原因是,当线程进入等待状态被唤醒的时候,if会在被唤醒的地方接着执行下面的代码,而while会再次进行条件判断,当不满足条件时会继续进入等待状态。

使用wait/notify实现生产者消费者模式

运行方法同上

  1. public class MyBlockingQueue {
  2. private int maxSize;
  3. private LinkedList<Object> storage;
  4. public MyBlockingQueue(int size) {
  5. this.maxSize = size;
  6. storage = new LinkedList<>();
  7. }
  8. public synchronized void put(Object o) throws InterruptedException {
  9. while (storage.size() == maxSize) {
  10. wait();
  11. }
  12. storage.add(o);
  13. notifyAll();
  14. }
  15. public synchronized Object take() throws InterruptedException {
  16. while (storage.size() == 0){
  17. wait();
  18. }
  19. Object o = storage.remove();
  20. notifyAll();
  21. return o;
  22. }
  23. }