1 概念

什么是消息队列?

基于生产者消费者模型,即生产一个消费一个,无生产则无消费,有消费无生产则等待;

采用队列数据结构,先进先出,和生活中排队一致;

为什么消息要形成队列?

计算机处理单条消息的事件始终受限,微观上来看本就逐条处理,但宏观上同时到达的任务会导致并发困难以至于积压任务;

消息队列就是对任务形成一种逻辑处理顺序,不再是无需的任务处理请求,按照先来先出来的原则,逐次处理任务;

由于微观上的逐次处理和多任务导致宏观上的并发丧失,消息队列提供了最坏的时间长度和一般的逻辑处理,计算机不再随机或者说遭遇堵塞;

对于许多任务并非要求实时性,比如大V的微博发布,对于在线用户可以直接推,但对于离线用户可以存储在消息队列,等用户上线后再推送,

对于服务器而言,减少了并发处理的任务量,对于用户而言,基本感觉不到消息的延迟,而对于推荐而言,公共区域的推送直接节省了多个用户推送的数据量,展示更加直观;

代码模板

  1. # -*- coding: UTF-8 -*-
  2. """
  3. @author:41999
  4. @file:队列.py
  5. @time:2021/10/22
  6. """
  7. import queue
  8. q = queue.Queue(maxsize=100)
  9. q.put(123)
  10. q.put(456)
  11. q.put(789)
  12. print(q.get())
  13. print(q.get())
  14. print(q.get(block=False))
  15. # block用于关闭阻塞时读取操作的挂起

为什么使用消息队列?

简介

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

应用场景—外卖系统

外卖订单的数据模型:用户,时间,店铺,金额

处理流程:用户完成付款,订单系统负责提供信息,分为四个对象,

  1. 对用户而言,告知交易成功;
  2. 对骑手系统而言,告知新的任务已产生,等待抢单,
  3. 对于产品存储系统而言,告知其库存减量,
  4. 对于商家系统而言,扣除前端库存,并且告知交易产生

通知的方式:链式处理,骑手系统 —> 产品存储系统 —> 商家系统 —> 用户[完整走完四个部件,数据却可能丢失]

  1. 传统业务一对一,结算系统开启一个接口,逐次通知四个系统;
  2. 消息队列的方式:开放四个接口,被四个系统监听,并且形成一个计数器,成功读取四次后自动销毁数据,等待下一个订单的生产;
  3. 5. 消息队列充当了一个中间层,左侧是订单系统,右侧分别是骑手系统,产品库存系统,商家系统和用户,右侧四者分别订阅消息,左侧订单的变动会实时推送给右侧四个主体;

评价方式[内聚耦合]

低内聚高耦合,模块相对独立;

RabbitMQ

简介:

  1. 分为简单模式和交换机模式两种
  2. 交换机模式分为:发布订阅;关键字匹配模式;模糊匹配模式
  3. 简单模式
  4. 生产者:链接服务器;创建队列;将消息插入队列;
  5. 消费者:链接服务器;创建消息队列;

生产者代码

  1. # -*- coding: UTF-8 -*-
  2. """
  3. @author:41999
  4. @file:01producer.py
  5. @time:2021/10/27
  6. """
  7. import pika
  8. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  9. channel = connection.channel()
  10. channel.queue_declare(queue='hello')
  11. # 简单模式,交换机参数为空,第二个参数是指定队列
  12. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
  13. print("[x] Sent 'Hello world!'")

消费者

  1. # -*- coding: UTF-8 -*-
  2. """
  3. @author:41999
  4. @file:02consumer.py
  5. @time:2021/10/27
  6. """
  7. import pika
  8. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  9. channel = connection.channel()
  10. # 创建消息队列
  11. channel.queue_declare(queue='hello')
  12. # 确定回调函数
  13. def callback(ch, mnethod, properties, body):
  14. print("[x] Received %r" % body)
  15. # 监听队列 auto_ack 默认应答
  16. channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)
  17. print("[*] Waiting for messages. To exit press CTRL + C")
  18. # 正式开始监听
  19. channel.start_consuming()

最终效果

消息队列 - 图1

参数使用

应答参数

应用场景

  1. 1. 应答参数所属的函数以及消息接收阶段
  2. 1. 属于消费者模块的函数channel.basic_consume;属于最后的确定监听队列阶段,在确定回调函数之后
  3. 2. 存在的意义
  4. 1. 默认应答则在生产一个消息后消费一个消息
  5. 2. 当生产者生产一个消息,消费者创建连接--->创建队列--->创建回调函数时,程序接收但意外终止
  6. 四个步骤并没有完全走完,那么数据就会丢失[原子操作]
  7. 3. 解决策略:将应答参数改为False,手动应答

代码

  1. ===========================producer==================================================
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  4. channel = connection.channel()
  5. channel.queue_declare(queue='hello')
  6. # 简单模式,交换机参数为空,第二个参数是指定队列
  7. channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
  8. print("[x] Sent 'Hello world!'")
  9. ===========================consumer=================================================
  10. # 确定回调函数
  11. def callback(ch, mnethod, properties, body):
  12. print("[x] Received %r" % body)
  13. # 监听队列 auto_ack 默认应答
  14. channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)
  15. print("[*] Waiting for messages. To exit press CTRL + C")
  16. # 正式开始监听
  17. channel.start_consuming()

最终效果

消息队列 - 图2

持久化

应用场景

为什么设置这个参数?

RabbitMQ可能会意外停止,而持久化可以保证即使程序重启,消费者队列依然可以接收到生产者的产品

代码

  1. =====================================producer.py=====================================
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  4. channel = connection.channel()
  5. channel.queue_declare(queue='hello', durable=True, passive=True)
  6. # 简单模式,交换机参数为空,第二个参数是指定队列
  7. channel.basic_publish(exchange='',
  8. routing_key='hello',
  9. properties=pika.BasicProperties(delivery_mode=2),
  10. body='Hello World!')
  11. print("[x] Sent 'Hello world!'")
  12. =====================================consumer.py=====================================
  13. import pika
  14. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  15. channel = connection.channel()
  16. # 创建消息队列
  17. channel.queue_declare(queue='hello', durable=True, passive=True)
  18. # 确定回调函数
  19. def callback(ch, mnethod, properties, body):
  20. print("[x] Received %r" % body)
  21. # ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消费者取得数据后,消费者在队列中删除产品
  22. # 监听队列 auto_ack 默认应答
  23. channel.basic_consume(queue='hello',
  24. auto_ack=True,
  25. on_message_callback=callback)
  26. print("[*] Waiting for messages. To exit press CTRL + C")
  27. # 正式开始监听
  28. channel.start_consuming()

报错:pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'")

解决

  1. 在队列建立阶段中中新增passive参数,并且设置为True。生产者与消费者均需要设置,且均为持久化
  2. 参考文档:
  3. https://www.cnblogs.com/gangzi4321/p/11001497.html

订阅者模式

场景

生产者通过交换机发布消息,多个消费者建立自己的队列从交换机读取同一份信息,类似于村里的广播

代码

  1. =======================================producer.py====================================
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  4. channel = connection.channel()
  5. # 声明一个名为long的,交换机类型为fanout的交换机
  6. message = "Info:20211027"
  7. channel.basic_publish(exchange='logs',
  8. exchange_type='fanout',
  9. routing_key='',
  10. body=message)
  11. print("[x] Sent % r" % message)
  12. connection.close()
  13. =======================================consumer.py====================================
  14. import pika
  15. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  16. channel = connection.channel()
  17. # 声明一个名为log类型为fanout的交换机
  18. channel.exchange_declare(exchange='logs',
  19. exchange_type='fanout')
  20. # 创建队列
  21. result = channel.queue_declare("", exclusive=True)
  22. queue_name = result.method.queue
  23. print(queue_name)
  24. # 将队列和交换机绑定
  25. channel.queue_bind(exchange='logs',
  26. queue=queue_name)
  27. print('[*] Waiting for logs. To exit press CTRL + C')
  28. def callback(ch, method, properties, body):
  29. print("[X] % r" % body)
  30. # 将消费者的队列与交换机绑定
  31. channel.basic_consume(queue=queue_name,
  32. auto_ack=True,
  33. on_message_callback=callback)
  34. channel.start_consuming()

公平分发

简介

  1. 1. 缺陷
  2. 我们提供多个消费者,目的就是为了提高系统的性能,提升系统处理任务的速度;
  3. 如果将消息平均的分发给每个消费者,那么处理消息快的服务是不是会空闲下来;
  4. 而处理慢的服务可能会阻塞等待处理,这样的场景是我们不愿意看到的;
  5. 所以有了今天要说的分发模式,公平分发。
  6. 2. 能者多劳
  7. 所谓的公平分发,其实用能者多劳描述更为贴切,
  8. 根据名字就可以知道,谁有能力处理更多的任务,那么就交给谁处理,防止消息的挤压。
  9. 3. 前提
  10. 将自动应答改为手动应答

代码

  1. ===================================================producer.py===================================================
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  4. channel = connection.channel()
  5. channel.queue_declare(queue='hello')
  6. # 简单模式,交换机参数为空,第二个参数是指定队列
  7. channel.basic_publish(exchange='',
  8. routing_key='hello',
  9. body='222')
  10. print("[x] Sent 'Hello world!'")
  11. ===================================================consumer.py===================================================
  12. import pika,time
  13. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  14. channel = connection.channel()
  15. # 创建消息队列
  16. channel.queue_declare(queue='hello')
  17. # 确定回调函数
  18. def callback(ch, mnethod, properties, body):
  19. time.sleep(5)
  20. print("[x] Received %r" % body)
  21. # ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消费者取得数据后,消费者在队列中删除产品
  22. # 公平分发---能者多劳
  23. channel.basic_qos(prefetch_count=1)
  24. # 监听队列 auto_ack 默认应答
  25. channel.basic_consume(queue='hello',
  26. auto_ack=False,
  27. on_message_callback=callback)
  28. print("[*] Waiting for messages. To exit press CTRL + C")
  29. # 正式开始监听
  30. channel.start_consuming()

问题

  1. 1. 出现的问题
  2. 1. 本次新增的参数为:channel.basic_qos(prefetch_count=1);
  3. 2. 想要实现公平分配,即工作效率高,快速回复的对象接收最多的消息
  4. 2. 实验方法
  5. 1. 消费者分三次发送消息,间隔看手速,三个消费者持续监听同一个队列;
  6. 2. 三个消费者的睡眠时间分别为5,10,15
  7. 3. 预期实现的效果:睡眠时间最短的消费者收到最多的消息,也就是三条;
  8. 4. 实际运行的效果:三个消费者收到的消息平摊,每人一条。

交换机模式之关键字模式

简介

发布订阅相当于群发,而关键字模式类似于私发,前者一对多,后者一对一

测试方法

对于生产者而言,只需要更改参数消息发布中的routine_key和exchange_type=’direct’ ,对于消费者,需要修改exchange_type 和 routine_key前者修改模式,后者指定关键字

一个生产者对应三个消费者

  1. ===================================================producer.py====================================================
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  4. channel = connection.channel()
  5. # 声明一个名为long的,交换机类型为fanout的交换机
  6. message = "Info:20211029"
  7. channel.basic_publish(exchange='logs',
  8. exchange_type='direct'
  9. routing_key='info',
  10. body=message)
  11. print("[x] Sent % r" % message)
  12. connection.close()
  13. ===================================================consumer1.py===================================================
  14. import pika
  15. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  16. channel = connection.channel()
  17. # 声明一个名为log类型为fanout的交换机
  18. channel.exchange_declare(exchange='logs',
  19. exchange_type='direct')
  20. # 创建队列
  21. result = channel.queue_declare("", exclusive=True)
  22. queue_name = result.method.queue
  23. print(queue_name)
  24. # 将队列和交换机绑定
  25. channel.queue_bind(exchange='logs',
  26. queue=queue_name,
  27. routing_key='error',
  28. )
  29. print('[*] Waiting for logs. To exit press CTRL + C')
  30. def callback(ch, method, properties, body):
  31. print("[X] % r" % body)
  32. # 将消费者的队列与交换机绑定
  33. channel.basic_consume(queue=queue_name,
  34. auto_ack=True,
  35. on_message_callback=callback)
  36. channel.start_consuming()
  37. ===================================================consumer2.py===================================================
  38. import pika
  39. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  40. channel = connection.channel()
  41. # 声明一个名为log类型为fanout的交换机
  42. channel.exchange_declare(exchange='logs',
  43. exchange_type='direct')
  44. # 创建队列
  45. result = channel.queue_declare("", exclusive=True)
  46. queue_name = result.method.queue
  47. print(queue_name)
  48. # 将队列和交换机绑定
  49. channel.queue_bind(exchange='logs',
  50. queue=queue_name,
  51. routing_key='warning',
  52. )
  53. print('[*] Waiting for logs. To exit press CTRL + C')
  54. def callback(ch, method, properties, body):
  55. print("[X] % r" % body)
  56. # 将消费者的队列与交换机绑定
  57. channel.basic_consume(queue=queue_name,
  58. auto_ack=True,
  59. on_message_callback=callback)
  60. channel.start_consuming()
  61. ===================================================consumer3.py===================================================
  62. import pika
  63. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  64. channel = connection.channel()
  65. # 声明一个名为log类型为fanout的交换机
  66. channel.exchange_declare(exchange='logs',
  67. exchange_type='direct')
  68. # 创建队列
  69. result = channel.queue_declare("", exclusive=True)
  70. queue_name = result.method.queue
  71. print(queue_name)
  72. # 将队列和交换机绑定
  73. channel.queue_bind(exchange='logs',
  74. queue=queue_name,
  75. routing_key='info',
  76. )
  77. print('[*] Waiting for logs. To exit press CTRL + C')
  78. def callback(ch, method, properties, body):
  79. print("[X] % r" % body)
  80. # 将消费者的队列与交换机绑定
  81. channel.basic_consume(queue=queue_name,
  82. auto_ack=True,
  83. on_message_callback=callback)
  84. channel.start_consuming()

交换机之通配符

简介

  1. 订阅者模式是一对多,关键字模式是一对一,通配符类似于群组分发,只要满足某个共同规则就可以

场景

  1. 一个生产者,四个消费者
  2. 需要修改的参数:对于生产者,在声明消息队列的时候,需要指定交换机类型为topic
  3. 对于消费者,在声明消息队列的时候,修改交换机类型为topic,修改关键字为通配符;
  1. ================================================producer.py=======================================================
  2. import pika
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(
  4. host='localhost'))
  5. channel = connection.channel()
  6. channel.exchange_declare(exchange='logs',
  7. exchange_type='topic')
  8. message = "info:gunpowder"
  9. channel.basic_publish(exchange='logs',
  10. routing_key='USA.news',
  11. body=message)
  12. print(" [x] Sent %r" % message)
  13. connection.close()
  14. ================================================consumer1.py======================================================
  15. import pika
  16. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  17. channel = connection.channel()
  18. # 声明一个名为log类型为fanout的交换机
  19. channel.exchange_declare(exchange='logs',
  20. exchange_type='topic')
  21. # 创建队列
  22. result = channel.queue_declare("", exclusive=True)
  23. queue_name = result.method.queue
  24. print(queue_name)
  25. # 将队列和交换机绑定
  26. channel.queue_bind(exchange='logs',
  27. queue=queue_name,
  28. routing_key='#.news',
  29. )
  30. print('[*] Waiting for logs. To exit press CTRL + C')
  31. def callback(ch, method, properties, body):
  32. print("[X] % r" % body)
  33. # 将消费者的队列与交换机绑定
  34. channel.basic_consume(queue=queue_name,
  35. auto_ack=True,
  36. on_message_callback=callback)
  37. channel.start_consuming()
  38. ================================================consumer2.py======================================================
  39. import pika
  40. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  41. channel = connection.channel()
  42. # 声明一个名为log类型为fanout的交换机
  43. channel.exchange_declare(exchange='logs',
  44. exchange_type='topic')
  45. # 创建队列
  46. result = channel.queue_declare("", exclusive=True)
  47. queue_name = result.method.queue
  48. print(queue_name)
  49. # 将队列和交换机绑定
  50. channel.queue_bind(exchange='logs',
  51. queue=queue_name,
  52. routing_key='#.weather',
  53. )
  54. print('[*] Waiting for logs. To exit press CTRL + C')
  55. def callback(ch, method, properties, body):
  56. print("[X] % r" % body)
  57. # 将消费者的队列与交换机绑定
  58. channel.basic_consume(queue=queue_name,
  59. auto_ack=True,
  60. on_message_callback=callback)
  61. channel.start_consuming()
  62. ================================================consumer3.py======================================================
  63. import pika
  64. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  65. channel = connection.channel()
  66. # 声明一个名为log类型为fanout的交换机
  67. channel.exchange_declare(exchange='logs',
  68. exchange_type='topic')
  69. # 创建队列
  70. result = channel.queue_declare("", exclusive=True)
  71. queue_name = result.method.queue
  72. print(queue_name)
  73. # 将队列和交换机绑定
  74. channel.queue_bind(exchange='logs',
  75. queue=queue_name,
  76. routing_key='USA.#',
  77. )
  78. print('[*] Waiting for logs. To exit press CTRL + C')
  79. def callback(ch, method, properties, body):
  80. print("[X] % r" % body)
  81. # 将消费者的队列与交换机绑定
  82. channel.basic_consume(queue=queue_name,
  83. auto_ack=True,
  84. on_message_callback=callback)
  85. channel.start_consuming()
  86. ================================================consumer4.py======================================================
  87. import pika
  88. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  89. channel = connection.channel()
  90. # 声明一个名为log类型为fanout的交换机
  91. channel.exchange_declare(exchange='logs',
  92. exchange_type='topic')
  93. # 创建队列
  94. result = channel.queue_declare("", exclusive=True)
  95. queue_name = result.method.queue
  96. print(queue_name)
  97. # 将队列和交换机绑定
  98. channel.queue_bind(exchange='logs',
  99. queue=queue_name,
  100. routing_key='EU.#',
  101. )
  102. print('[*] Waiting for logs. To exit press CTRL + C')
  103. def callback(ch, method, properties, body):
  104. print("[X] % r" % body)
  105. # 将消费者的队列与交换机绑定
  106. channel.basic_consume(queue=queue_name,
  107. auto_ack=True,
  108. on_message_callback=callback)
  109. channel.start_consuming()

引用与参考

RabbitMQ常用命令:

  1. # 管理命令
  2. rabbitmq-service.bat install rabbitmq-service install
  3. rabbitmq-service.bat stop rabbitmq-service stop
  4. rabbitmq-service.bat start rabbitmq-server start
  5. # 查看状态
  6. rabbitmqctl status
  7. # 启动网页管理界面
  8. # 接口: localhost:15672
  9. # 命令:rabbitmq-plugins enable rabbitmq_management

link of downloading

  1. http://www.erlang.org/downloads
  2. https://www.rabbitmq.com/download.html
  3. # 消息延迟插件
  4. https://www.rabbitmq.com/download.html
  5. # 插件应用命令
  6. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  7. # 安装教程链接
  8. https://www.cnblogs.com/yyee/p/14281111.html

reference

RabbitMQ工作队列之公平分发消息与消息应答(ACK) - SegmentFault 思否

【更新】RabbitMQ -05 持久化 & 不公平分发 - 知乎 (zhihu.com)