Implementing the producer-consumer pattern with BlockingQueue
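
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class ProducerConsumerDemo {
        public static void main(String[] args) {
            // Bounded queue: put() blocks when it holds 10 items, take() blocks when it is empty.
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);

            // Producer
            Runnable producer = () -> {
                while (true) {
                    try {
                        queue.put(new Object());
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop instead of looping on.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            };
            new Thread(producer).start();

            // Consumer
            Runnable consumer = () -> {
                while (true) {
                    try {
                        queue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            };
            new Thread(consumer).start();
        }
    }
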
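The loop above never terminates, which makes its behaviour hard to check. The sketch below is a bounded variant of the same idea: the producer sends a fixed number of items followed by a sentinel ("poison pill"), and the consumer stops when it sees the sentinel. The class name `BoundedRunDemo`, the `POISON` sentinel, and the `run` helper are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedRunDemo {
    // Hypothetical sentinel object; compared by identity, so it cannot clash with real items.
    private static final Object POISON = new Object();

    // Produces itemCount items, consumes them all, and returns how many were consumed.
    static int run(int itemCount) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
        int[] consumed = {0};

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i);      // blocks while the queue is full
                }
                queue.put(POISON);     // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (queue.take() != POISON) { // blocks while the queue is empty
                    consumed[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();           // join makes consumed[0] visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return consumed[0];
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // prints 100
    }
}
```

The queue capacity stays at 10, so the producer is throttled by the consumer exactly as in the endless version.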
Implementing the producer-consumer pattern with Condition
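To run it, replace the BlockingQueue in the method above with MyBlockingQueueForCondition, that is, declare the queue as `MyBlockingQueueForCondition queue = new MyBlockingQueueForCondition(10);`.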
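
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class MyBlockingQueueForCondition {
        private final Queue<Object> queue;
        private int max = 16;
        // Per-instance lock and conditions: static fields would be shared by every queue.
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();

        public MyBlockingQueueForCondition(int size) {
            this.max = size;
            queue = new LinkedList<>();
        }

        public void put(Object o) throws InterruptedException {
            lock.lock();
            try {
                // Wait until there is room.
                while (queue.size() == max) {
                    notFull.await();
                }
                queue.add(o);
                notEmpty.signalAll(); // wake consumers waiting for an item
            } finally {
                lock.unlock();
            }
        }

        public Object take() throws InterruptedException {
            lock.lock();
            try {
                // Wait until there is something to take.
                while (queue.size() == 0) {
                    notEmpty.await();
                }
                Object remove = queue.remove();
                notFull.signalAll(); // wake producers waiting for room
                return remove;
            } finally {
                lock.unlock();
            }
        }
    }
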
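!> `while` is used here instead of `if` because a thread that wakes from `await()` resumes right after that call. With `if`, it would go straight on to the code below without rechecking anything; with `while`, it tests the condition again and goes back to waiting if the condition still does not hold. The recheck matters because `signalAll()` wakes every waiting thread, so another thread may already have taken the item or filled the slot, and because spurious wakeups are possible.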
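The effect of the `while` guard can be seen with several consumers competing for items. The following is a minimal sketch, assuming a stripped-down unbounded buffer (`WhileGuardDemo` and its `run` helper are hypothetical names, not the class above): each `put` calls `signalAll()`, so all waiting consumers wake up, but only one of them finds an item and the rest must go back to waiting.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class WhileGuardDemo {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    void put(int value) {
        lock.lock();
        try {
            items.add(value);
            notEmpty.signalAll(); // wakes every waiting consumer, not just one
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            // Re-check after every wake-up: another consumer may have emptied the queue.
            while (items.isEmpty()) {
                notEmpty.await();
            }
            return items.remove();
        } finally {
            lock.unlock();
        }
    }

    // Starts `consumers` threads that each take `itemsPerConsumer` items; returns the total taken.
    static int run(int consumers, int itemsPerConsumer) {
        WhileGuardDemo buffer = new WhileGuardDemo();
        AtomicInteger taken = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < itemsPerConsumer; j++) {
                        buffer.take();
                        taken.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < consumers * itemsPerConsumer; i++) {
            buffer.put(i);
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return taken.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // prints 4000
    }
}
```

If the `while` in `take()` were changed to `if`, a consumer that was woken after another consumer had already removed the item would call `items.remove()` on an empty queue, which throws `NoSuchElementException`.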
Implementing the producer-consumer pattern with wait/notify
Run it the same way as above.

    import java.util.LinkedList;

    public class MyBlockingQueue {
        private final int maxSize;
        private final LinkedList<Object> storage;

        public MyBlockingQueue(int size) {
            this.maxSize = size;
            storage = new LinkedList<>();
        }

        public synchronized void put(Object o) throws InterruptedException {
            // Wait on this object's monitor until there is room.
            while (storage.size() == maxSize) {
                wait();
            }
            storage.add(o);
            notifyAll(); // producers and consumers share one wait set, so wake them all
        }

        public synchronized Object take() throws InterruptedException {
            // Wait until there is something to take.
            while (storage.size() == 0) {
                wait();
            }
            Object o = storage.remove();
            notifyAll();
            return o;
        }
    }

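As a quick check of the wait/notify version, the sketch below runs one producer and one consumer through a queue of capacity 2 and collects the items in arrival order. To keep the file self-contained it carries a condensed copy of the queue under the hypothetical name `BoundedQueue`; the `WaitNotifyDemo` class and `run` helper are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class WaitNotifyDemo {
    // Condensed copy of the wait/notify queue so this file compiles on its own.
    static class BoundedQueue {
        private final int maxSize;
        private final LinkedList<Object> storage = new LinkedList<>();

        BoundedQueue(int size) {
            this.maxSize = size;
        }

        synchronized void put(Object o) throws InterruptedException {
            while (storage.size() == maxSize) {
                wait();
            }
            storage.add(o);
            notifyAll();
        }

        synchronized Object take() throws InterruptedException {
            while (storage.isEmpty()) {
                wait();
            }
            Object o = storage.remove();
            notifyAll();
            return o;
        }
    }

    // Sends 0..itemCount-1 through a capacity-2 queue and returns what the consumer received.
    static List<Object> run(int itemCount) {
        BoundedQueue queue = new BoundedQueue(2);
        List<Object> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a single producer and a single consumer the items arrive in FIFO order, even though the producer has to block repeatedly on the small capacity.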
