要点
- 与保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
public class TestProducerConsumer { public static void main(String[] args) { MessageQueue messageQueue = new MessageQueue(2); for (int i = 0; i < 4; i++) { int id = i; new Thread(() -> { try { log.debug("download..."); List<String> response = Downloader.download(); log.debug("try put message({})", id); messageQueue.put(new Message(id, response)); } catch (IOException e) { e.printStackTrace(); } }, "生产者" + i).start(); } new Thread(() -> { while (true) { Message message = messageQueue.take(); List<String> response = (List<String>) message.getMessage(); log.debug("take message({}): [{}] lines", message.getId(), response.size()); } }, "消费者").start(); }}class Message { private int id; private Object message; public Message(int id, Object message) { this.id = id; this.message = message; } public int getId() { return id; } public Object getMessage() { return message; }}@Slf4j(topic = "c.MessageQueue")class MessageQueue { private LinkedList<Message> queue; private int capacity; public MessageQueue(int capacity) { this.capacity = capacity; queue = new LinkedList<>(); } public Message take() { synchronized (queue) { while (queue.isEmpty()) { log.debug("没货了, wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); queue.notifyAll(); return message; } } public void put(Message message) { synchronized (queue) { while (queue.size() == capacity) { log.debug("库存已达上限, wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); queue.notifyAll(); } }}