使用BlockingQueue实现生产者消费者模式
public static void main(String[] args) {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
// 生产者
Runnable producer = () -> {
while (true) {
try {
queue.put(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(producer).start();
// 消费者
Runnable consumer = () -> {
while (true) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
new Thread(consumer).start();
}
使用Condition实现生产者消费者模式
运行方法把上述方法的BlockingQueue
换成MyBlockingQueueForCondition
即可
public class MyBlockingQueueForCondition {
private Queue<Object> queue;
private int max = 16;
private static ReentrantLock lock = new ReentrantLock();
private static Condition notEmpty = lock.newCondition();
private static Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size){
this.max = size;
queue = new LinkedList<>();
}
public void put(Object o) throws InterruptedException {
lock.lock();
try {
while (queue.size() == max){
notFull.await();
}
queue.add(o);
notEmpty.signalAll();
}finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (queue.size() == 0){
notEmpty.await();
}
Object remove = queue.remove();
notFull.signalAll();
return remove;
}finally {
lock.unlock();
}
}
}
!> 这里使用while而不使用if的原因是,当线程进入等待状态被唤醒的时候,if会在被唤醒的地方接着执行下面的代码,而while会再次进行条件判断,当不满足条件时会继续进入等待状态。
使用wait/notify实现生产者消费者模式
运行方法同上
public class MyBlockingQueue {
private int maxSize;
private LinkedList<Object> storage;
public MyBlockingQueue(int size) {
this.maxSize = size;
storage = new LinkedList<>();
}
public synchronized void put(Object o) throws InterruptedException {
while (storage.size() == maxSize) {
wait();
}
storage.add(o);
notifyAll();
}
public synchronized Object take() throws InterruptedException {
while (storage.size() == 0){
wait();
}
Object o = storage.remove();
notifyAll();
return o;
}
}