概述
生产者消费者模式是一个多线程协作的模式,在这个模式中,一部分线程负责生产数据、一部分线程负责消费数据,通过缓冲区完成生产者与消费者的解耦,缓冲区相当于数据仓库。
实现方式
wait/notify
//数据仓库,缓冲区public class DataBuffer {List<String> dataBuffer = new LinkedList<>();public void add(String data){dataBuffer.add(data);}public String get(){String data = null;if(dataBuffer.size() > 0){data = dataBuffer.get(0);if(data != null){dataBuffer.remove(data);}}return data;}}//生产者public class Producter implements Runnable {private DataBuffer dataBuffer;AtomicInteger atomicInteger = new AtomicInteger(0);public Producter(DataBuffer dataBuffer){this.dataBuffer = dataBuffer;}@Overridepublic void run() {while (true){synchronized (dataBuffer){atomicInteger.incrementAndGet();dataBuffer.add("a" + atomicInteger.intValue());System.out.println("生产数据,并唤醒等待线程");dataBuffer.notifyAll();try {//Thread.sleep(100L);}catch (Exception e) {}}}}}//消费者public class Consumer implements Runnable {private DataBuffer dataBuffer;public Consumer(DataBuffer dataBuffer){this.dataBuffer = dataBuffer;}@Overridepublic void run() {while (true){synchronized (dataBuffer){String a = dataBuffer.get();if(a == null){try {dataBuffer.wait();}catch (Exception e){e.printStackTrace();}}else {System.out.println("消费的数据为:" + a);}}}}}//测试类public class TestOne {public static void main(String[] args){DataBuffer dataBuffer = new DataBuffer();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();}}
BlockingQueue
//数据缓冲区public class DataBuffer {BlockingQueue<String> dataBuffer = new LinkedBlockingQueue<>(20);public void add(String data){try {dataBuffer.put(data);}catch (Exception e){}}public String get(){String data = null;try {data = dataBuffer.take();}catch (Exception e){}return data;}}//生产者public class Producter implements Runnable {private DataBuffer dataBuffer;AtomicInteger atomicInteger = new AtomicInteger(0);public Producter(DataBuffer dataBuffer){this.dataBuffer = dataBuffer;}@Overridepublic void run() {int b = atomicInteger.incrementAndGet();dataBuffer.add("a" + b);System.out.println("生产数据 - a" + b);}}//消费者public class Consumer implements Runnable {private DataBuffer dataBuffer;public Consumer(DataBuffer dataBuffer){this.dataBuffer = dataBuffer;}@Overridepublic void run() {while (true){String a = dataBuffer.get();System.out.println("消费的数据为:" + a);}}}//测试类public class TestTwo {public static void main(String[] args){DataBuffer dataBuffer = new DataBuffer();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();}}
ReentrantLock&Condition
public class DataBuffer {List<String> dataBuffer = new LinkedList<>();Lock lock = new ReentrantLock();Condition condition = lock.newCondition();public Lock getLock(){return lock;}public void add(String data){dataBuffer.add(data);}public String get(){String data = null;if(dataBuffer.size() > 0){data = dataBuffer.get(0);if(data != null){dataBuffer.remove(data);}}return data;}}public class Producter implements Runnable {private DataBuffer dataBuffer;AtomicInteger atomicInteger = new AtomicInteger(0);public Producter(DataBuffer dataBuffer){this.dataBuffer = dataBuffer;}@Overridepublic void run() {while (true){Lock lock = dataBuffer.getLock();try {lock.lock();atomicInteger.incrementAndGet();dataBuffer.add("a" + atomicInteger.intValue());System.out.println("生产数据,并唤醒等待线程");dataBuffer.condition.signalAll();//Thread.sleep(100L);}catch (Exception e) {}finally {lock.unlock();}}}}public class Consumer implements Runnable {private DataBuffer dataBuffer;public Consumer(DataBuffer dataBuffer){this.dataBuffer = dataBuffer;}@Overridepublic void run() {while (true){Lock lock = dataBuffer.getLock();try {lock.lock();String a = dataBuffer.get();if(a == null){try {dataBuffer.condition.await();}catch (Exception e){e.printStackTrace();}}else {System.out.println("消费的数据为:" + a);}}catch (Exception e){}finally {lock.unlock();}}}}public class TestOne {public static void main(String[] args){DataBuffer dataBuffer = new DataBuffer();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();new Thread(new Producter(dataBuffer)).start();new Thread(new Consumer(dataBuffer)).start();}}
