定义:

  • 与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式
  • 是一种异步模式,之所以称保护性暂停为同步模式,是因为一个线程产生结果,另一个线程会立刻获得结果。而生产者/消费者模式是异步模式的原因是,生产线程产生结果后会将结果放入消息队列中,消费者需要时才会从中取出结果。

    image.png

实现:

要熟悉这种多线程编程思想,方法上加synchronized,实际上锁的就是调用该方法的对象,这样实现多个线程调用一个共享的锁对象时,可以同步。
这里默认为,当多个线程调用类中的方法时,使用的是该类的同一个实例对象
对list、this上锁没有区别,因为list是成员变量,一个对象对应一个成员变量,只是粒度的区别。

  1. import java.util.LinkedList;
  2. public class Test21 {
  3. }
  4. class MessageQueue{
  5. //容器
  6. private LinkedList<Message> list = new LinkedList<>();
  7. //容器的容量
  8. private int capcity;
  9. public MessageQueue(int capcity){
  10. this.capcity=capcity;//容量通过构造方法传入
  11. }
  12. //获取消息
  13. public Message take() throws InterruptedException {
  14. //检查队列是否为空
  15. synchronized (list){
  16. while(list.isEmpty()){
  17. list.wait();
  18. }
  19. //从队列的头部获取元素返回
  20. Message message = list.removeFirst();
  21. list.notifyAll();
  22. return message;
  23. }
  24. }
  25. //存入消息
  26. public void put(Message message) throws InterruptedException {
  27. synchronized (list){
  28. //检查队列是否已满
  29. while(list.size()==capcity){
  30. list.wait();
  31. }
  32. //将新的消息加入队列尾部
  33. list.addLast(message);
  34. list.notifyAll();//唤醒线程
  35. }
  36. }
  37. }
  38. //消息类,作为消息队列中存储的对象,需要记录对象的id值以及真实值,可以看到该类是线程安全类,因为只涉及到对id值的读,没有写
  39. class Message{
  40. private int id;
  41. private Object value;
  42. public Message(int id, Object value) {
  43. this.id = id;
  44. this.value = value;
  45. }
  46. public int getId(){
  47. return id;
  48. }
  49. public Object getValue(){
  50. return value;
  51. }
  52. @Override
  53. public String toString() {
  54. return super.toString();
  55. }
  56. }

注意Message类内的方法只是对成员变量作读操作,而没有取操作,所以不存在线程安全问题,类是线程安全类。

测试:

  1. public static void main(String[] args) throws InterruptedException {
  2. MessageQueue queue = new MessageQueue(2);//容量为2,接下来多个线程共享一个MessageQueue对象
  3. for(int i=0;i<3;i++){
  4. int id=i;
  5. new Thread(()->{
  6. try {
  7. queue.put(new Message(id,"值"+id));
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. },"生产者"+i).start();
  12. }
  13. new Thread(()->{
  14. while(true){
  15. try {
  16. Thread.sleep(1000);
  17. Message message = queue.take();
  18. System.out.println("message内容"+message);
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. },"消费者").start();
  24. }