// Uses a queue as the means of communication between threads, coordinated with wait()/notifyAll().
package org.example.concurrency.test;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
/**
 * Demo driver for {@link MessageQueue}: three producer threads each put one message
 * into a queue of capacity 2, while a single consumer thread takes one message
 * every two seconds and prints it.
 *
 * @author huskyui
 */
@Slf4j
public class Test21 {

    /**
     * Starts the three producer threads and the consumer thread.
     *
     * <p>The consumer keeps polling until its thread is interrupted, so the JVM
     * stays alive after this method returns.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);

        for (int i = 0; i < 3; i++) {
            // Created outside the lambda so the captured reference is effectively final.
            Message message = new Message(i, "message" + i);
            new Thread(() -> messageQueue.put(message), "生产者" + i).start();
        }

        new Thread(() -> {
            while (true) {
                try {
                    // Slow consumer: gives the producers time to fill the queue.
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    // Interruption is the only stop signal this loop has: restore the
                    // flag for anyone inspecting the thread and end the consumer.
                    Thread.currentThread().interrupt();
                    return;
                }
                Message message = messageQueue.take();
                System.out.println(message);
            }
        }, "消费者").start();
    }
}
/**
 * Bounded FIFO queue for handing {@link Message}s between threads.
 *
 * <p>All access is guarded by the intrinsic lock of the backing list. Consumers
 * wait on that monitor while the queue is empty, producers wait on it while the
 * queue is full, and every successful {@link #take()} or {@link #put(Message)}
 * calls {@code notifyAll()} so waiters re-check their condition.
 */
@Slf4j
class MessageQueue {

    /**
     * Backing storage and the monitor used for locking and waiting. New messages
     * are added at the head and removed from the tail, which yields FIFO order.
     */
    private final LinkedList<Message> list = new LinkedList<>();

    /** Maximum number of messages held at once. */
    private final int capacity;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of queued messages; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive, since
     *         every {@link #put(Message)} on such a queue would block forever
     */
    public MessageQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
    }

    /**
     * Removes and returns the oldest message, blocking while the queue is empty.
     *
     * <p>This method does not abort on interruption: it keeps waiting until a
     * message is available and re-asserts the thread's interrupt status before
     * returning, so the caller can still observe the interrupt.
     *
     * @return the oldest queued message
     */
    public Message take() {
        boolean interrupted = false;
        try {
            synchronized (list) {
                // Loop, not "if": a woken thread must re-check the condition.
                while (list.isEmpty()) {
                    try {
                        log.info("当前消息队列消息为空");
                        list.wait();
                    } catch (InterruptedException e) {
                        // wait() cleared the flag; remember it and keep waiting.
                        interrupted = true;
                    }
                }
                Message message = list.removeLast();
                log.info("接受消息");
                // Wake producers that may be blocked on a full queue.
                list.notifyAll();
                return message;
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends a message, blocking while the queue is at capacity.
     *
     * <p>This method does not abort on interruption: it keeps waiting until there
     * is room and re-asserts the thread's interrupt status before returning.
     *
     * @param message the message to enqueue
     */
    public void put(Message message) {
        boolean interrupted = false;
        try {
            synchronized (list) {
                // Loop, not "if": a woken thread must re-check the condition.
                while (list.size() >= capacity) {
                    try {
                        log.info("当前队列已满");
                        list.wait();
                    } catch (InterruptedException e) {
                        // wait() cleared the flag; remember it and keep waiting.
                        interrupted = true;
                    }
                }
                list.addFirst(message);
                log.info("放入信息{}", message);
                // Wake consumers that may be blocked on an empty queue.
                list.notifyAll();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Immutable message made of a numeric id and an arbitrary payload.
 *
 * <p>Both fields are {@code final} and there are no setters, so the fields of an
 * instance never change after construction. The payload object itself is not
 * copied and may still be mutable.
 */
class Message {

    /** Identifier supplied by the creator of the message. */
    private final int id;

    /** Payload carried by the message; may be {@code null}. */
    private final Object value;

    /**
     * Creates a message.
     *
     * @param id    identifier of the message
     * @param value payload; stored as-is, without copying
     */
    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    /**
     * Returns the identifier given at construction.
     *
     * @return the message id
     */
    public int getId() {
        return id;
    }

    /**
     * Returns the payload given at construction.
     *
     * @return the payload, possibly {@code null}
     */
    public Object getValue() {
        return value;
    }

    /**
     * Renders the message as {@code Message{id=<id>, value=<value>}}.
     *
     * @return a human-readable representation of this message
     */
    @Override
    public String toString() {
        return "Message{" + "id=" + id + ", value=" + value + '}';
    }
}