概述
生产者消费者模式是一种多线程协作模式:一部分线程负责生产数据,另一部分线程负责消费数据,二者通过缓冲区实现解耦,缓冲区相当于数据仓库。
实现方式
wait/notify
/**
 * Data warehouse / buffer shared between producers and consumers.
 *
 * <p>Items are handed out in insertion order (FIFO). This class performs no
 * synchronization of its own; callers must guard every access, for example by
 * synchronizing on the {@code DataBuffer} instance.
 */
public class DataBuffer {
    List<String> dataBuffer = new LinkedList<>();

    /**
     * Appends an item to the tail of the buffer.
     *
     * @param data the item to store
     */
    public void add(String data) {
        dataBuffer.add(data);
    }

    /**
     * Removes and returns the oldest item.
     *
     * <p>The head is always removed by index. The previous implementation only
     * removed a non-null head, so a stored {@code null} stayed at the front
     * forever and hid every item queued behind it.
     *
     * @return the oldest item, or {@code null} when the buffer is empty
     *         (a stored {@code null} is also returned as {@code null})
     */
    public String get() {
        if (dataBuffer.isEmpty()) {
            return null;
        }
        return dataBuffer.remove(0);
    }
}
/**
 * Producer: keeps appending sequentially numbered items ("a1", "a2", ...) to the
 * shared {@link DataBuffer} and wakes every thread waiting on the buffer's monitor.
 *
 * <p>The counter is an instance field, so each producer numbers its items
 * independently of other producers.
 */
public class Producter implements Runnable {
    private final DataBuffer dataBuffer;
    AtomicInteger atomicInteger = new AtomicInteger(0);

    /**
     * @param dataBuffer the buffer to fill; its monitor is used for locking and notification
     */
    public Producter(DataBuffer dataBuffer) {
        this.dataBuffer = dataBuffer;
    }

    /**
     * Produces items until the running thread is interrupted. Without an
     * interrupt the loop runs forever, exactly as before.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            synchronized (dataBuffer) {
                // Use the value returned by the increment rather than reading the counter again.
                int sequence = atomicInteger.incrementAndGet();
                dataBuffer.add("a" + sequence);
                System.out.println("生产数据,并唤醒等待线程");
                // Must be called while holding the buffer's monitor.
                dataBuffer.notifyAll();
            }
        }
    }
}
/**
 * Consumer: takes items from the shared {@link DataBuffer} and prints them,
 * waiting on the buffer's monitor while no item is available.
 */
public class Consumer implements Runnable {
    private final DataBuffer dataBuffer;

    /**
     * @param dataBuffer the buffer to drain; its monitor is used for locking and waiting
     */
    public Consumer(DataBuffer dataBuffer) {
        this.dataBuffer = dataBuffer;
    }

    /**
     * Consumes items until the running thread is interrupted.
     *
     * <p>An interrupt received while waiting is no longer swallowed: the
     * interrupt status is restored and the method returns, so the thread can
     * actually be stopped.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            synchronized (dataBuffer) {
                try {
                    String item;
                    // Guarded wait: re-check the buffer after every wake-up, since a
                    // notification does not guarantee an item is still available.
                    while ((item = dataBuffer.get()) == null) {
                        dataBuffer.wait();
                    }
                    System.out.println("消费的数据为:" + item);
                } catch (InterruptedException e) {
                    // wait() cleared the flag when it threw; restore it for the caller.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
/**
 * Demo entry point: starts three producer threads and three consumer threads
 * that all share a single {@link DataBuffer}.
 */
public class TestOne {
    public static void main(String[] args) {
        final int pairCount = 3;
        final DataBuffer sharedBuffer = new DataBuffer();
        // Start order is producer, consumer, producer, consumer, ...
        for (int pair = 0; pair < pairCount; pair++) {
            Thread producerThread = new Thread(new Producter(sharedBuffer));
            producerThread.start();
            Thread consumerThread = new Thread(new Consumer(sharedBuffer));
            consumerThread.start();
        }
    }
}
BlockingQueue
/**
 * Data buffer backed by a bounded {@link LinkedBlockingQueue} holding at most 20 items.
 *
 * <p>{@link #add(String)} blocks while the queue is full and {@link #get()}
 * blocks while it is empty.
 */
public class DataBuffer {
    BlockingQueue<String> dataBuffer = new LinkedBlockingQueue<>(20);

    /**
     * Enqueues an item, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted, the item is not enqueued and
     * the thread's interrupt status is restored instead of being discarded.
     *
     * @param data the item to enqueue; {@code null} is ignored (the queue does not
     *             accept null elements, and earlier versions dropped it silently too)
     */
    public void add(String data) {
        if (data == null) {
            return;
        }
        try {
            dataBuffer.put(data);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Dequeues the oldest item, blocking while the queue is empty.
     *
     * @return the oldest item, or {@code null} if the calling thread was interrupted;
     *         in that case the thread's interrupt status is restored
     */
    public String get() {
        try {
            return dataBuffer.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
/**
 * Producer: on each run, takes the next number from its own counter, stores the
 * item "a&lt;number&gt;" in the shared {@link DataBuffer} and logs it.
 */
public class Producter implements Runnable {
    private final DataBuffer dataBuffer;
    AtomicInteger atomicInteger = new AtomicInteger(0);

    /**
     * @param dataBuffer the buffer that receives the produced item
     */
    public Producter(DataBuffer dataBuffer) {
        this.dataBuffer = dataBuffer;
    }

    /** Produces exactly one item and then returns. */
    @Override
    public void run() {
        final int sequence = atomicInteger.incrementAndGet();
        final String item = "a" + sequence;
        dataBuffer.add(item);
        System.out.println("生产数据 - a" + sequence);
    }
}
/**
 * Consumer: repeatedly takes items from the shared {@link DataBuffer} and prints them.
 */
public class Consumer implements Runnable {
    private final DataBuffer dataBuffer;

    /**
     * @param dataBuffer the buffer to take items from
     */
    public Consumer(DataBuffer dataBuffer) {
        this.dataBuffer = dataBuffer;
    }

    /**
     * Consumes items until the thread is interrupted or the buffer hands back
     * {@code null}.
     *
     * <p>Previously a {@code null} result was printed as "null" and the loop
     * carried on forever, so the thread could not be stopped.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            String item = dataBuffer.get();
            if (item == null) {
                // The buffer's get() yields null when its blocking take was
                // interrupted; there is nothing to report, so stop consuming.
                return;
            }
            System.out.println("消费的数据为:" + item);
        }
    }
}
/**
 * Demo entry point: starts three producer threads and three consumer threads
 * that all share a single {@link DataBuffer}.
 */
public class TestTwo {
    public static void main(String[] args) {
        final int pairCount = 3;
        final DataBuffer sharedBuffer = new DataBuffer();
        // Start order is producer, consumer, producer, consumer, ...
        for (int pair = 0; pair < pairCount; pair++) {
            Thread producerThread = new Thread(new Producter(sharedBuffer));
            producerThread.start();
            Thread consumerThread = new Thread(new Consumer(sharedBuffer));
            consumerThread.start();
        }
    }
}
ReentrantLock&Condition
/**
 * FIFO data buffer that also carries the {@link Lock} and {@link Condition}
 * used by producers and consumers to coordinate.
 *
 * <p>{@link #add(String)} and {@link #get()} do not acquire the lock themselves;
 * callers are expected to hold {@link #getLock()} around every access.
 */
public class DataBuffer {
    List<String> dataBuffer = new LinkedList<>();
    Lock lock = new ReentrantLock();
    // Condition bound to {@code lock}; signalled when an item has been added.
    Condition condition = lock.newCondition();

    /**
     * @return the lock guarding this buffer
     */
    public Lock getLock() {
        return lock;
    }

    /**
     * Appends an item to the tail of the buffer.
     *
     * @param data the item to store
     */
    public void add(String data) {
        dataBuffer.add(data);
    }

    /**
     * Removes and returns the oldest item.
     *
     * <p>The head is always removed by index. The previous implementation only
     * removed a non-null head, so a stored {@code null} stayed at the front
     * forever and hid every item queued behind it.
     *
     * @return the oldest item, or {@code null} when the buffer is empty
     *         (a stored {@code null} is also returned as {@code null})
     */
    public String get() {
        if (dataBuffer.isEmpty()) {
            return null;
        }
        return dataBuffer.remove(0);
    }
}
/**
 * Producer: keeps appending sequentially numbered items ("a1", "a2", ...) to the
 * shared {@link DataBuffer} under the buffer's lock and signals all threads
 * waiting on the buffer's condition.
 *
 * <p>The counter is an instance field, so each producer numbers its items
 * independently of other producers.
 */
public class Producter implements Runnable {
    private final DataBuffer dataBuffer;
    AtomicInteger atomicInteger = new AtomicInteger(0);

    /**
     * @param dataBuffer the buffer to fill; its lock and condition are used for coordination
     */
    public Producter(DataBuffer dataBuffer) {
        this.dataBuffer = dataBuffer;
    }

    /**
     * Produces items until the running thread is interrupted. Without an
     * interrupt the loop runs forever, exactly as before.
     */
    @Override
    public void run() {
        final Lock lock = dataBuffer.getLock();
        while (!Thread.currentThread().isInterrupted()) {
            // Acquire outside the try block so that finally only unlocks a lock
            // that was actually obtained.
            lock.lock();
            try {
                int sequence = atomicInteger.incrementAndGet();
                dataBuffer.add("a" + sequence);
                System.out.println("生产数据,并唤醒等待线程");
                dataBuffer.condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}
/**
 * Consumer: takes items from the shared {@link DataBuffer} under the buffer's
 * lock and prints them, awaiting the buffer's condition while no item is available.
 */
public class Consumer implements Runnable {
    private final DataBuffer dataBuffer;

    /**
     * @param dataBuffer the buffer to drain; its lock and condition are used for coordination
     */
    public Consumer(DataBuffer dataBuffer) {
        this.dataBuffer = dataBuffer;
    }

    /**
     * Consumes items until the running thread is interrupted.
     *
     * <p>An interrupt received while awaiting is no longer swallowed: the
     * interrupt status is restored and the method returns after releasing the lock.
     */
    @Override
    public void run() {
        final Lock lock = dataBuffer.getLock();
        while (!Thread.currentThread().isInterrupted()) {
            // Acquire outside the try block so that finally only unlocks a lock
            // that was actually obtained.
            lock.lock();
            try {
                String item;
                // Guarded wait: re-check the buffer after every wake-up, since a
                // signal does not guarantee an item is still available.
                while ((item = dataBuffer.get()) == null) {
                    dataBuffer.condition.await();
                }
                System.out.println("消费的数据为:" + item);
            } catch (InterruptedException e) {
                // await() cleared the flag when it threw; restore it for the caller.
                Thread.currentThread().interrupt();
                return;
            } finally {
                lock.unlock();
            }
        }
    }
}
/**
 * Demo entry point: starts three producer threads and three consumer threads
 * that all share a single {@link DataBuffer}.
 */
public class TestOne {
    public static void main(String[] args) {
        final int pairCount = 3;
        final DataBuffer sharedBuffer = new DataBuffer();
        // Start order is producer, consumer, producer, consumer, ...
        for (int pair = 0; pair < pairCount; pair++) {
            Thread producerThread = new Thread(new Producter(sharedBuffer));
            producerThread.start();
            Thread consumerThread = new Thread(new Consumer(sharedBuffer));
            consumerThread.start();
        }
    }
}