1. 定义
要点
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK 中各种阻塞队列,采用的就是这种模式
- 阻塞队列有:BlockingQueue

BlockingQueue的源码
// 队列项final Object[] items;// 保护所有通道的主锁final ReentrantLock lock;// 等待条件private final Condition notEmpty;// 等待插入锁private final Condition notFull;// 容量int count;// 下一个 put、offer 或 add 的项目索引int putIndex;// 每加入一个的元素,容量加一private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}/**** 构造函数*/public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();// 将数组的容量赋值this.items = new Object[capacity];// fair为 true ,先进先出(FIFO)lock = new ReentrantLock(fair);// 创建两个新的锁notEmpty = lock.newCondition();notFull = lock.newCondition();}public void put(E e) throws InterruptedException {// 检查对象是否为空checkNotNull(e);// 创建lock锁final ReentrantLock lock = this.lock;// 等待锁lock.lockInterruptibly();try {// 该队列容量满了while (count == items.length)// 睡眠notFull.await();enqueue(e);} finally {// 唤醒lock.unlock();}}public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
示例
@Slf4j(topic = "c.Test21")public class Test21 {public static void main(String[] args) {MessageQueue queue = new MessageQueue(2);for (int i = 0; i < 3; i++) {int id = i;new Thread(() -> {queue.put(new Message(id , "值"+id));}, "生产者" + i).start();}new Thread(() -> {while(true) {sleep(1);Message message = queue.take();}}, "消费者").start();}}// 消息队列类 , java 线程之间通信@Slf4j(topic = "c.MessageQueue")class MessageQueue {// 消息的队列集合private LinkedList<Message> list = new LinkedList<>();// 队列容量private int capcity;public MessageQueue(int capcity) {this.capcity = capcity;}// 获取消息public Message take() {// 检查队列是否为空synchronized (list) {while(list.isEmpty()) {try {log.debug("队列为空, 消费者线程等待");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 从队列头部获取消息并返回Message message = list.removeFirst();log.debug("已消费消息 {}", message);list.notifyAll();return message;}}// 存入消息public void put(Message message) {synchronized (list) {// 检查对象是否已满while(list.size() == capcity) {try {log.debug("队列已满, 生产者线程等待");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 将消息加入队列尾部list.addLast(message);log.debug("已生产消息 {}", message);list.notifyAll();}}}final class Message {private int id;private Object value;public Message(int id, Object value) {this.id = id;this.value = value;}public int getId() {return id;}public Object getValue() {return value;}@Overridepublic String toString() {return "Message{" +"id=" + id +", value=" + value +'}';}}
结果
22:58:21.875 c.MessageQueue [生产者1] - 已生产消息 Message{id=1, value=值1}22:58:21.878 c.MessageQueue [生产者0] - 已生产消息 Message{id=0, value=值0}22:58:21.878 c.MessageQueue [生产者2] - 队列已满, 生产者线程等待22:58:22.877 c.MessageQueue [消费者] - 已消费消息 Message{id=1, value=值1}22:58:22.877 c.MessageQueue [生产者2] - 已生产消息 Message{id=2, value=值2}22:58:23.891 c.MessageQueue [消费者] - 已消费消息 Message{id=0, value=值0}22:58:24.906 c.MessageQueue [消费者] - 已消费消息 Message{id=2, value=值2}22:58:25.920 c.MessageQueue [消费者] - 队列为空, 消费者线程等待
