实现生产者和消费者
1. wait() / notifyAll()
/**
 * Producer/consumer demo built on intrinsic locks: both sides synchronize on the
 * shared list and coordinate through {@code wait()} / {@code notifyAll()}.
 *
 * <p>Workers loop until their thread is interrupted; an interrupt is treated as a
 * request to stop rather than being logged and ignored.
 */
public class ProducerConsumer {

    /** Appends random integers to the shared list, waiting while the list is at capacity. */
    private static class Producer implements Runnable {
        private final List<Integer> list;
        private final int capacity;
        // One generator per producer instead of a new one on every iteration.
        private final Random random = new Random();

        /**
         * @param list     shared buffer; also used as the monitor for wait/notifyAll
         * @param capacity maximum number of elements the producer lets the list hold
         */
        public Producer(List<Integer> list, int capacity) {
            this.list = list;
            this.capacity = capacity;
        }

        @Override
        public void run() {
            String producer = Thread.currentThread().getName();
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (list) {
                    try {
                        // Re-check in a loop: a wake-up does not guarantee free space.
                        // ">=" (not "==") so an over-full list still blocks the producer.
                        while (list.size() >= capacity) {
                            System.out.println("生产者 " + producer + ":list 已达到最大容量,进行wait");
                            list.wait();
                            System.out.println("生产者 " + producer + ":退出 wait");
                        }
                        int i = random.nextInt();
                        System.out.println("生产者 " + producer + ":生产数据" + i);
                        list.add(i);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        // wait() cleared the flag when it threw; restore it and stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    /** Removes and prints the head of the shared list, waiting while the list is empty. */
    private static class Consumer implements Runnable {
        private final List<Integer> list;

        /**
         * @param list shared buffer; also used as the monitor for wait/notifyAll
         */
        public Consumer(List<Integer> list) {
            this.list = list;
        }

        @Override
        public void run() {
            String consumer = Thread.currentThread().getName();
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (list) {
                    try {
                        // Re-check in a loop: another consumer may have emptied the list first.
                        while (list.isEmpty()) {
                            System.out.println("消费者 " + consumer + ":list 为空,进行 wait");
                            list.wait();
                            System.out.println("消费者 " + consumer + ":退出wait");
                        }
                        Integer element = list.remove(0);
                        System.out.println("消费者 " + consumer + ":消费数据:" + element);
                        list.notifyAll();
                    } catch (InterruptedException e) {
                        // wait() cleared the flag when it threw; restore it and stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    /**
     * Starts one producer and one consumer sharing a list capped at 5 elements.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final List<Integer> sharedList = new LinkedList<>();
        final int capacity = 5;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
        executor.execute(new Producer(sharedList, capacity));
        executor.execute(new Consumer(sharedList));
        // shutdown() only rejects new tasks; the two workers keep running until interrupted.
        executor.shutdown();
    }
}
2. await() / signalAll()
/**
 * Producer/consumer demo built on an explicit {@link ReentrantLock} with two
 * {@link Condition}s, coordinated through {@code await()} / {@code signalAll()}.
 *
 * <p>Workers loop until their thread is interrupted; an interrupt is treated as a
 * request to stop rather than being logged and ignored.
 */
public class ProducerConsumer {
    private static final ReentrantLock lock = new ReentrantLock();
    /** Awaited by producers while the list is full; signalled after a removal. */
    private static final Condition notFull = lock.newCondition();
    /** Awaited by consumers while the list is empty; signalled after an insertion. */
    private static final Condition notEmpty = lock.newCondition();

    /** Appends random integers to the shared list, waiting while the list is at capacity. */
    private static class Producer implements Runnable {
        private final List<Integer> list;
        private final int capacity;
        // One generator per producer instead of a new one on every iteration.
        private final Random random = new Random();

        /**
         * @param list     shared buffer, accessed only while holding {@code lock}
         * @param capacity maximum number of elements the producer lets the list hold
         */
        public Producer(List<Integer> list, int capacity) {
            this.list = list;
            this.capacity = capacity;
        }

        @Override
        public void run() {
            String producer = Thread.currentThread().getName();
            while (!Thread.currentThread().isInterrupted()) {
                lock.lock();
                try {
                    // Re-check in a loop: a wake-up does not guarantee free space.
                    // ">=" (not "==") so an over-full list still blocks the producer.
                    while (list.size() >= capacity) {
                        System.out.println("生产者 " + producer + ":list 已达到最大容量,进行wait");
                        notFull.await();
                        System.out.println("生产者 " + producer + ":退出 wait");
                    }
                    int i = random.nextInt();
                    System.out.println("生产者 " + producer + ":生产数据" + i);
                    list.add(i);
                    notEmpty.signalAll();
                } catch (InterruptedException e) {
                    // await() cleared the flag when it threw; restore it and stop.
                    // The finally block below still releases the lock.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /** Removes and prints the head of the shared list, waiting while the list is empty. */
    private static class Consumer implements Runnable {
        private final List<Integer> list;

        /**
         * @param list shared buffer, accessed only while holding {@code lock}
         */
        public Consumer(List<Integer> list) {
            this.list = list;
        }

        @Override
        public void run() {
            String consumer = Thread.currentThread().getName();
            while (!Thread.currentThread().isInterrupted()) {
                lock.lock();
                try {
                    // Re-check in a loop: another consumer may have emptied the list first.
                    while (list.isEmpty()) {
                        System.out.println("消费者 " + consumer + ":list 为空,进行 wait");
                        notEmpty.await();
                        System.out.println("消费者 " + consumer + ":退出wait");
                    }
                    Integer element = list.remove(0);
                    System.out.println("消费者 " + consumer + ":消费数据:" + element);
                    notFull.signalAll();
                } catch (InterruptedException e) {
                    // await() cleared the flag when it threw; restore it and stop.
                    // The finally block below still releases the lock.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * Starts one producer and one consumer sharing a list capped at 5 elements.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final List<Integer> sharedList = new LinkedList<>();
        final int capacity = 5;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
        executor.execute(new Producer(sharedList, capacity));
        executor.execute(new Consumer(sharedList));
        // shutdown() only rejects new tasks; the two workers keep running until interrupted.
        executor.shutdown();
    }
}
3. 阻塞队列
/**
 * Producer/consumer demo built on a {@link BlockingQueue}: the queue itself blocks
 * {@code put} when full and {@code take} when empty, so no explicit locking is needed.
 *
 * <p>Workers loop until their thread is interrupted; an interrupt is treated as a
 * request to stop rather than being logged and ignored.
 */
public class ProducerConsumer {
    /** Queue bound used by {@code main}; without a bound {@code put} would never block. */
    private static final int CAPACITY = 5;

    /** Puts random integers on the queue, blocking while the queue is full. */
    private static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        // One generator per producer instead of a new one on every iteration.
        private final Random random = new Random();

        /**
         * @param queue shared queue the producer puts elements on
         */
        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            String producer = Thread.currentThread().getName();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    int i = random.nextInt();
                    System.out.println("生产者 " + producer + ":生产数据" + i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                // put() cleared the flag when it threw; restore it and let run() end.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Takes elements from the queue and prints them, blocking while the queue is empty. */
    private static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;

        /**
         * @param queue shared queue the consumer takes elements from
         */
        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            String consumer = Thread.currentThread().getName();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Integer element = queue.take();
                    System.out.println("消费者 " + consumer + ":消费数据:" + element);
                }
            } catch (InterruptedException e) {
                // take() cleared the flag when it threw; restore it and let run() end.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts one producer and one consumer sharing a queue bounded at {@code CAPACITY}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Bounded so the producer is throttled instead of growing the queue without limit.
        final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(CAPACITY);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
        executor.execute(new Producer(queue));
        executor.execute(new Consumer(queue));
        // shutdown() only rejects new tasks; the two workers keep running until interrupted.
        executor.shutdown();
    }
}