模式图
使用BlockingQueue实现生产者和消费者模式
如上图所示,对于生产者和消费者主要就是包含中类型的对象
- 生产者
- 消费者
- 仓库(阻塞队列)
BlockingQueue
就是一个阻塞队列,当使用take
方法的时候,如果没有数据,就会阻塞,使用put
方法的时候,如果队列已经满了,也会阻塞。实现代码如下:
生产者
public class Producer implements Runnable {
// 阻塞队列
private BlockingQueue<Object> queue;
public Producer(BlockingQueue<Object> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(1000);
queue.put(new Date());
System.out.println("生产了一个,共有" + queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者
public class Consumer implements Runnable {
private BlockingQueue<Object> queue;
public Consumer(BlockingQueue<Object> blockingQueue) {
this.queue = blockingQueue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(1000);
Object take = queue.take();
System.out.println("消费了" + take + "还剩" + queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试代码
public class ProducerAndConsumer {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
for (int i = 0; i < 10; i++) {
new Thread(producer).start();
}
Consumer consumer = new Consumer(queue);
for (int i = 0; i < 4; i++) {
new Thread(consumer).start();
}
}
}
使用Condition实现生产者和消费者模式
package ltd.personalstudy.threadbasic;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Author 咖啡杯里的茶
* @date 2020/12/6
*/
public class CustomCondition {
private Queue queue;
private int max = 16;
private ReentrantLock lock = new ReentrantLock();
// 提醒消费者
private Condition notEmpty = lock.newCondition();
// 提醒生产者
private Condition notFull = lock.newCondition();
public CustomCondition(int max) {
this.max = max;
queue = new LinkedList();
}
/**
* 生产者生产产品
*/
public void put(Object o) {
lock.lock();
try {
while (queue.size() == max) {
notFull.await();
}
// 生产了一个产品
queue.add(o);
System.out.println("生产了一个" + System.currentTimeMillis());
// 提醒消费者
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 消费者
*
* @return
*/
public Object take() throws InterruptedException {
lock.lock();
try {
// 这里不使用if进行判断是为了避免虚假唤醒
while (queue.size() == 0) {
notEmpty.await();
}
Object remove = queue.remove();
System.out.println("消费了一个" + System.currentTimeMillis());
notFull.signalAll();
return remove;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
CustomCondition condition = new CustomCondition(16);
// 创建生产者线程
new Thread(()->{
while (true) {
condition.put(new Object());
}
}).start();
// 创建消费者线程
new Thread(()->{
while (true) {
try {
condition.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}