1 概念
什么是消息队列?
基于生产者消费者模型,即生产一个消费一个,无生产则无消费,有消费无生产则等待;
采用队列数据结构,先进先出,和生活中排队一致;
为什么消息要形成队列?
计算机处理单条消息的事件始终受限,微观上来看本就逐条处理,但宏观上同时到达的任务会导致并发困难以至于积压任务;
消息队列就是对任务形成一种逻辑处理顺序,不再是无需的任务处理请求,按照先来先出来的原则,逐次处理任务;
由于微观上的逐次处理和多任务导致宏观上的并发丧失,消息队列提供了最坏的时间长度和一般的逻辑处理,计算机不再随机或者说遭遇堵塞;
对于许多任务并非要求实时性,比如大V的微博发布,对于在线用户可以直接推,但对于离线用户可以存储在消息队列,等用户上线后再推送,
对于服务器而言,减少了并发处理的任务量,对于用户而言,基本感觉不到消息的延迟,而对于推荐而言,公共区域的推送直接节省了多个用户推送的数据量,展示更加直观;
代码模板
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:队列.py
@time:2021/10/22
"""
import queue
q = queue.Queue(maxsize=100)
q.put(123)
q.put(456)
q.put(789)
print(q.get())
print(q.get())
print(q.get(block=False))
# block用于关闭阻塞时读取操作的挂起
为什么使用消息队列?
简介
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
。
应用场景—外卖系统
外卖订单的数据模型:用户,时间,店铺,金额
处理流程:用户完成付款,订单系统负责提供信息,分为四个对象,
- 对用户而言,告知交易成功;
- 对骑手系统而言,告知新的任务已产生,等待抢单,
- 对于产品存储系统而言,告知其库存减量,
- 对于商家系统而言,扣除前端库存,并且告知交易产生
通知的方式:链式处理,骑手系统 —> 产品存储系统 —> 商家系统 —> 用户[完整走完四个部件,数据却可能丢失]
传统业务一对一,结算系统开启一个接口,逐次通知四个系统;
消息队列的方式:开放四个接口,被四个系统监听,并且形成一个计数器,成功读取四次后自动销毁数据,等待下一个订单的生产;
5. 消息队列充当了一个中间层,左侧是订单系统,右侧分别是骑手系统,产品库存系统,商家系统和用户,右侧四者分别订阅消息,左侧订单的变动会实时推送给右侧四个主体;
评价方式[内聚耦合]
低内聚高耦合,模块相对独立;
RabbitMQ
简介:
分为简单模式和交换机模式两种
交换机模式分为:发布订阅;关键字匹配模式;模糊匹配模式
简单模式
生产者:链接服务器;创建队列;将消息插入队列;
消费者:链接服务器;创建消息队列;
生产者代码
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:01producer.py
@time:2021/10/27
"""
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 简单模式,交换机参数为空,第二个参数是指定队列
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print("[x] Sent 'Hello world!'")
消费者
# -*- coding: UTF-8 -*-
"""
@author:41999
@file:02consumer.py
@time:2021/10/27
"""
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建消息队列
channel.queue_declare(queue='hello')
# 确定回调函数
def callback(ch, mnethod, properties, body):
print("[x] Received %r" % body)
# 监听队列 auto_ack 默认应答
channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL + C")
# 正式开始监听
channel.start_consuming()
最终效果
参数使用
应答参数
应用场景
1. 应答参数所属的函数以及消息接收阶段
1. 属于消费者模块的函数channel.basic_consume;属于最后的确定监听队列阶段,在确定回调函数之后
2. 存在的意义
1. 默认应答则在生产一个消息后消费一个消息
2. 当生产者生产一个消息,消费者创建连接--->创建队列--->创建回调函数时,程序接收但意外终止
四个步骤并没有完全走完,那么数据就会丢失[原子操作]
3. 解决策略:将应答参数改为False,手动应答
代码
===========================producer==================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 简单模式,交换机参数为空,第二个参数是指定队列
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print("[x] Sent 'Hello world!'")
===========================consumer=================================================
# 确定回调函数
def callback(ch, mnethod, properties, body):
print("[x] Received %r" % body)
# 监听队列 auto_ack 默认应答
channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL + C")
# 正式开始监听
channel.start_consuming()
最终效果
持久化
应用场景
为什么设置这个参数?
RabbitMQ可能会意外停止,而持久化可以保证即使程序重启,消费者队列依然可以接收到生产者的产品
代码
=====================================producer.py=====================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True, passive=True)
# 简单模式,交换机参数为空,第二个参数是指定队列
channel.basic_publish(exchange='',
routing_key='hello',
properties=pika.BasicProperties(delivery_mode=2),
body='Hello World!')
print("[x] Sent 'Hello world!'")
=====================================consumer.py=====================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建消息队列
channel.queue_declare(queue='hello', durable=True, passive=True)
# 确定回调函数
def callback(ch, mnethod, properties, body):
print("[x] Received %r" % body)
# ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消费者取得数据后,消费者在队列中删除产品
# 监听队列 auto_ack 默认应答
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL + C")
# 正式开始监听
channel.start_consuming()
报错:pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false'")
解决
在队列建立阶段中中新增passive参数,并且设置为True。生产者与消费者均需要设置,且均为持久化
参考文档:
https://www.cnblogs.com/gangzi4321/p/11001497.html
订阅者模式
场景
生产者通过交换机发布消息,多个消费者建立自己的队列从交换机读取同一份信息,类似于村里的广播
代码
=======================================producer.py====================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为long的,交换机类型为fanout的交换机
message = "Info:20211027"
channel.basic_publish(exchange='logs',
exchange_type='fanout',
routing_key='',
body=message)
print("[x] Sent % r" % message)
connection.close()
=======================================consumer.py====================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
公平分发
简介
1. 缺陷
我们提供多个消费者,目的就是为了提高系统的性能,提升系统处理任务的速度;
如果将消息平均的分发给每个消费者,那么处理消息快的服务是不是会空闲下来;
而处理慢的服务可能会阻塞等待处理,这样的场景是我们不愿意看到的;
所以有了今天要说的分发模式,公平分发。
2. 能者多劳
所谓的公平分发,其实用能者多劳描述更为贴切,
根据名字就可以知道,谁有能力处理更多的任务,那么就交给谁处理,防止消息的挤压。
3. 前提
将自动应答改为手动应答
代码
===================================================producer.py===================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
# 简单模式,交换机参数为空,第二个参数是指定队列
channel.basic_publish(exchange='',
routing_key='hello',
body='222')
print("[x] Sent 'Hello world!'")
===================================================consumer.py===================================================
import pika,time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建消息队列
channel.queue_declare(queue='hello')
# 确定回调函数
def callback(ch, mnethod, properties, body):
time.sleep(5)
print("[x] Received %r" % body)
# ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消费者取得数据后,消费者在队列中删除产品
# 公平分发---能者多劳
channel.basic_qos(prefetch_count=1)
# 监听队列 auto_ack 默认应答
channel.basic_consume(queue='hello',
auto_ack=False,
on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL + C")
# 正式开始监听
channel.start_consuming()
问题
1. 出现的问题
1. 本次新增的参数为:channel.basic_qos(prefetch_count=1);
2. 想要实现公平分配,即工作效率高,快速回复的对象接收最多的消息
2. 实验方法
1. 消费者分三次发送消息,间隔看手速,三个消费者持续监听同一个队列;
2. 三个消费者的睡眠时间分别为5,10,15;
3. 预期实现的效果:睡眠时间最短的消费者收到最多的消息,也就是三条;
4. 实际运行的效果:三个消费者收到的消息平摊,每人一条。
交换机模式之关键字模式
简介
发布订阅相当于群发,而关键字模式类似于私发,前者一对多,后者一对一
测试方法
对于生产者而言,只需要更改参数消息发布中的routine_key和exchange_type=’direct’ ,对于消费者,需要修改exchange_type 和 routine_key前者修改模式,后者指定关键字
一个生产者对应三个消费者
===================================================producer.py====================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为long的,交换机类型为fanout的交换机
message = "Info:20211029"
channel.basic_publish(exchange='logs',
exchange_type='direct',
routing_key='info',
body=message)
print("[x] Sent % r" % message)
connection.close()
===================================================consumer1.py===================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='direct')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='error',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
===================================================consumer2.py===================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='direct')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='warning',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
===================================================consumer3.py===================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='direct')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='info',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
交换机之通配符
简介
订阅者模式是一对多,关键字模式是一对一,通配符类似于群组分发,只要满足某个共同规则就可以
场景
一个生产者,四个消费者
需要修改的参数:对于生产者,在声明消息队列的时候,需要指定交换机类型为topic
对于消费者,在声明消息队列的时候,修改交换机类型为topic,修改关键字为通配符;
================================================producer.py=======================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='topic')
message = "info:gunpowder"
channel.basic_publish(exchange='logs',
routing_key='USA.news',
body=message)
print(" [x] Sent %r" % message)
connection.close()
================================================consumer1.py======================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='topic')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='#.news',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
================================================consumer2.py======================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='topic')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='#.weather',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
================================================consumer3.py======================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='topic')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='USA.#',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
================================================consumer4.py======================================================
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明一个名为log类型为fanout的交换机
channel.exchange_declare(exchange='logs',
exchange_type='topic')
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 将队列和交换机绑定
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key='EU.#',
)
print('[*] Waiting for logs. To exit press CTRL + C')
def callback(ch, method, properties, body):
print("[X] % r" % body)
# 将消费者的队列与交换机绑定
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
引用与参考
RabbitMQ常用命令:
# 管理命令
rabbitmq-service.bat install 或 rabbitmq-service install
rabbitmq-service.bat stop 或 rabbitmq-service stop
rabbitmq-service.bat start 或 rabbitmq-server start
# 查看状态
rabbitmqctl status
# 启动网页管理界面
# 接口: localhost:15672
# 命令:rabbitmq-plugins enable rabbitmq_management
link of downloading
http://www.erlang.org/downloads
https://www.rabbitmq.com/download.html
# 消息延迟插件
https://www.rabbitmq.com/download.html
# 插件应用命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 安装教程链接
https://www.cnblogs.com/yyee/p/14281111.html