模式简介

image.png
生产者&消费者模式是线程进进行通信的一种异步通信方式,之所以说它是异步的,是因为生产者生成的数据可以存放到上图中的一个固定大小缓冲区中,而这个缓冲区一般采用“先进先出”的队列来实现,因此生成者put数据之后,消费者不一定可以立刻拿到数据。而上一篇的保护性暂停模式则是一种同步的通信方式,并且只能传递一个对象,而这里的模式可以传递多个对象。我们可以来看一下这个模式具有的一些优点:

  1. 解耦:该模式实现了消费者和生产者之间的解耦,生产者只需要生成数据,不需要像保护性暂停模式那样还要管给谁生成数据;同样,消费者也只需要管消费数据就行了,不需要管这个数据来源与哪里

image.png

  1. 异步:而异步就是上面所结束的一样,无需保证数据一旦产生就必须消费的同步模式

image.png

  1. 平衡生成和消费的速度差:如果消费者消费的很快,就可以多放几个生产者生成数据;同样,如果消费者消费的很慢,少放几个生产者就可以了

image.png

实现代码

  1. @Slf4j(topic = "c.MessageQueue")
  2. public class MessageQueue {
  3. //消息队列,缓冲区
  4. private final LinkedList<Message> queue;
  5. //缓冲区容量
  6. private final int capacity;
  7. public MessageQueue(int capacity) {
  8. this.capacity = capacity;
  9. queue = new LinkedList<>();
  10. }
  11. public Message take() {
  12. synchronized (queue) {
  13. while (queue.isEmpty()) {
  14. log.debug("没货了, wait");
  15. try {
  16. queue.wait();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. Message message = queue.removeFirst();
  22. //当消息队列容量满的时候,需要唤醒正在等待生成的生产者线程
  23. queue.notifyAll();
  24. return message;
  25. }
  26. }
  27. public void put(Message message) {
  28. synchronized (queue) {
  29. while (queue.size() == capacity) {
  30. log.debug("库存已达上限, wait");
  31. try {
  32. queue.wait();
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. queue.addLast(message);
  38. //当消息队列里面非空时,需要唤醒正在等待消费的消费者线程
  39. queue.notifyAll();
  40. }
  41. }
  42. public static void main(String[] args) {
  43. MessageQueue messageQueue = new MessageQueue(2);
  44. // 4 个生产者线程, 下载任务,返回下载资源response
  45. for (int i = 0; i < 4; i++) {
  46. int id = i;
  47. new Thread(() -> {
  48. try {
  49. log.debug("download...");
  50. //模拟下载资源
  51. int time = id * 1000;
  52. Thread.sleep(time);
  53. String response = "下载的资源,资源标识:" + id;
  54. log.debug("try put message {}", response);
  55. messageQueue.put(new Message(id, response));
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. }, "生产者" + i).start();
  60. }
  61. // 1 个消费者线程, 处理结果
  62. new Thread(() -> {
  63. while (true) {
  64. Message message = messageQueue.take();
  65. String response = (String) message.getMessage();
  66. log.debug("take message( [{}]->[{}] )", message.getId(), response);
  67. }
  68. }, "消费者").start();
  69. }
  70. }
  71. final class Message {
  72. private final int id;
  73. private final Object message;
  74. public Message(int id, Object message) {
  75. this.id = id;
  76. this.message = message;
  77. }
  78. public int getId() {
  79. return id;
  80. }
  81. public Object getMessage() {
  82. return message;
  83. }
  84. }

执行结果:
image.png