1. 定义

要点

  • 与前面的保护性暂停中的 GuardObject不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

image.png

2. 实现

消息类(存放列表中的消息类)

  1. //消息类
  2. class Message {
  3. private int id;
  4. private Object message;
  5. public Message(int id, Object message) {
  6. this.id = id;
  7. this.message = message;
  8. }
  9. public int getId() {
  10. return id;
  11. }
  12. public Object getMessage() {
  13. return message;
  14. }
  15. }

消息队列类

//消息队列类, java线程之间通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
    //消息的队列集合
    private LinkedList<Message> queue;
    //队列容量
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    //取出消息
    public Message take() {
        synchronized (queue) {
            //1.检查对象是否为空
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    //为空,则等待
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //2.有消息,则从队列同步获取消息并返回
            Message message = queue.removeFirst();
            //3.已经消费表示队列有空出的位置, 通知生产者来生成
            queue.notifyAll();
            return message;
        }
    }

    //存入消息
    public void put(Message message) {
        synchronized (queue) {
            //1.判断队列是否满了
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    //已满,则等待
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //2.往队列添加消息
            queue.addLast(message);
            //3.队列有消息, 通知消费者来消费
            queue.notifyAll();
        }
    }
}

测试

public static void main(String[] args) {
    MessageQueue messageQueue = new MessageQueue(2);
    //生产者
    for (int i = 0; i < 4; i++) {
        int id = i;
        new Thread(() -> {
            try {
                log.debug("download...");
                List<String> response = Downloader.download();
                log.debug("try put message({})", id);
                messageQueue.put(new Message(id, response));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "生产者" + i).start();
    }

    //消费者
    new Thread(() -> {
        while (true) {
            Message message = messageQueue.take();
            List<String> response = (List<String>)message.getMessage();
            log.debug("take message({}): [{}] lines", message.getId(), response.size());
        }

    }, "消费者").start();
}

运行结果

14:18:37.047 [生产者1]  c.TestProducerConsumer - download...
14:18:37.047 [生产者0]  c.TestProducerConsumer - download...
14:18:37.054 [消费者]  c.MessageQueue - 没货了, wait
14:18:37.054 [生产者3]  c.TestProducerConsumer - download...
14:18:37.054 [生产者2]  c.TestProducerConsumer - download...
14:18:37.990 [生产者0]  c.TestProducerConsumer - try put message(0)
14:18:37.990 [生产者3]  c.TestProducerConsumer - try put message(3)
14:18:37.990 [生产者1]  c.TestProducerConsumer - try put message(1)
14:18:37.992 [生产者1]  c.MessageQueue - 库存已达上限, wait
14:18:37.990 [生产者2]  c.TestProducerConsumer - try put message(2)
14:18:37.993 [生产者2]  c.MessageQueue - 库存已达上限, wait
14:18:37.994 [生产者1]  c.MessageQueue - 库存已达上限, wait
14:18:37.994 [消费者]  c.TestProducerConsumer - take message(0): [3] lines
14:18:37.994 [消费者]  c.TestProducerConsumer - take message(3): [3] lines
14:18:37.994 [消费者]  c.TestProducerConsumer - take message(2): [3] lines
14:18:37.994 [消费者]  c.TestProducerConsumer - take message(1): [3] lines
14:18:37.994 [消费者]  c.MessageQueue - 没货了, wait