image.png

要点

  • 与保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式
  1. public class TestProducerConsumer {
  2. public static void main(String[] args) {
  3. MessageQueue messageQueue = new MessageQueue(2);
  4. for (int i = 0; i < 4; i++) {
  5. int id = i;
  6. new Thread(() -> {
  7. try {
  8. log.debug("download...");
  9. List<String> response = Downloader.download();
  10. log.debug("try put message({})", id);
  11. messageQueue.put(new Message(id, response));
  12. } catch (IOException e) {
  13. e.printStackTrace();
  14. }
  15. }, "生产者" + i).start();
  16. }
  17. new Thread(() -> {
  18. while (true) {
  19. Message message = messageQueue.take();
  20. List<String> response = (List<String>) message.getMessage();
  21. log.debug("take message({}): [{}] lines", message.getId(), response.size());
  22. }
  23. }, "消费者").start();
  24. }
  25. }
  26. class Message {
  27. private int id;
  28. private Object message;
  29. public Message(int id, Object message) {
  30. this.id = id;
  31. this.message = message;
  32. }
  33. public int getId() {
  34. return id;
  35. }
  36. public Object getMessage() {
  37. return message;
  38. }
  39. }
  40. @Slf4j(topic = "c.MessageQueue")
  41. class MessageQueue {
  42. private LinkedList<Message> queue;
  43. private int capacity;
  44. public MessageQueue(int capacity) {
  45. this.capacity = capacity;
  46. queue = new LinkedList<>();
  47. }
  48. public Message take() {
  49. synchronized (queue) {
  50. while (queue.isEmpty()) {
  51. log.debug("没货了, wait");
  52. try {
  53. queue.wait();
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. Message message = queue.removeFirst();
  59. queue.notifyAll();
  60. return message;
  61. }
  62. }
  63. public void put(Message message) {
  64. synchronized (queue) {
  65. while (queue.size() == capacity) {
  66. log.debug("库存已达上限, wait");
  67. try {
  68. queue.wait();
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. queue.addLast(message);
  74. queue.notifyAll();
  75. }
  76. }
  77. }