RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。可维护多个队列,可实现消息的一对一和广播等方式发送
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum -y install rabbitmq-server
# 启动
systemctl start rabbitmq-server
# 查看状态
rabbitmqctl status
# 启用插件
rabbitmq-plugins enable rabbitmq_management
# 创建用户
rabbitmqctl add_user admin admin
# 修改角色为管理员
rabbitmqctl set_permissions -p / admin "." "." ".*"
# 设置权限
rabbitmqctl set_user_tags admin administrator
# 得到所有队列及存在的数据条数
rabbitmqctl list_queues
RabbitMQ 特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
- 可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 - 消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 - 高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 - 多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 - 多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 - 管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 - 跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 - 插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ 持久化
MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。
queue 声明持久化
# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
result = channel.queue_declare(queue = 'python-queue',durable = True)
exchange 声明持久化
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange = 'python-exchange', durable = True)
注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。
3. 消息持久化
虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
channel.basic_publish(exchange = 'python-exchange', routing_key = 'python-routing_key', body = message, properties=pika.BasicProperties(delivery_mode = 2))
acknowledgement 消息不丢失
消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。
# no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(on_message_callback = callback, queue = 'python-queue', no_ack = False)
RabbitMQ 发布与订阅
rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:
Exchange 一共有三种工作模式:fanout, direct, topicd
安装Pika
pip install pika
模式一:fanout
这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
- 不需要指定 routing_key ,即使指定了也是无效。
- 需要提前将 exchange 和 queue 绑定,一个 exchange 可以绑定多个 queue,一个queue可以绑定多个exchange。
- 需要先启动订阅者,此模式下的队列是 consumer 随机生成的,发布者仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。
发布者:
import json
import pika
import datetime
# 生成消息入口处
def get_message():
for i in range(100): # 生成10条消息
message = json.dumps(
{'id': "10000%s" % i, "amount": 100 * i, "name": "tony", "createtime": str(datetime.datetime.now())})
producter(message)
def producter(message): # 消息生产者
exchange_name = "rt_default_y" # 主题
routing_key = '' # 路由键
# 建立连接与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
userx = pika.PlainCredentials("admin", "admin") # 账号密码
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
# 创建一个 AMQP 信道(Channel),建造一个大邮箱,隶属于这家邮局的邮箱
channel = conn.channel()
# 声明消息队列rt_autoidc_queue,消息将在这个队列传递,如不存在,则创建, exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
# 向队列插入数值routing_key为空,body 就是放入的消息内容,exchange指定消息在哪个队列传递,这里是空的exchange但仍然能够发送消息到队列中,因为我们使用的是我们定义的空字符串“”exchange(默认的exchange)
channel.basic_publish(exchange=exchange_name, # 确定发布主题为
routing_key=routing_key, # 路由键
body=message, # 发送的数据
properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
# 关闭连接
conn.close()
if __name__ == '__main__':
try:
get_message()
except KeyboardInterrupt:
print("Quit")
订阅者:
import pika
def rt_receive():
exchange_name = "rt_default_y" # 主题
routing_key = '' # 路由键
# 建立连接
userx = pika.PlainCredentials("admin", "admin") # 账号密码
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
# 创建一个信道
channel = conn.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('', exclusive=True)
# 声明exchange的类型,由exchange指定消息在哪个队列传递,如不存在,则创建,exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', passive=False, durable=True, auto_delete=False)
# 消费者绑定queue,确定订阅主题为rt_autoidc_queue
channel.queue_bind(queue=result.method.queue, exchange=exchange_name, routing_key=routing_key)
# 收到指定消息的回调设置,auto_ack设置成False,在调用callback函数时,未收到确认标识,消息会重回队列。设置成True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(queue=result.method.queue, # 队列名
on_message_callback=callback, # 收到消息的回调函数
auto_ack=False)
# 开始循环监听
channel.start_consuming()
# 回调函数
def callback(ch, method, properties, body):
print(ch, method, properties, body)
# 是否消费队列,可以添加判断条件
# if models_info == True:
# ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == '__main__':
try:
rt_receive()
except KeyboardInterrupt:
print("Quit")
返回结果
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000192E30F3388> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.52d63d66ec2c41a5ba91ee2ee3192b56', 'delivery_tag=1', 'exchange=rt_default_y', 'redelivered=False', 'routing_key='])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100000", "amount": 0, "name": "tony", "createtime": "2021-04-27 22:04:48.859094"}'
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000192E30F3388> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.52d63d66ec2c41a5ba91ee2ee3192b56', 'delivery_tag=2', 'exchange=rt_default_y', 'redelivered=False', 'routing_key='])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100001", "amount": 100, "name": "tony", "createtime": "2021-04-27 22:04:48.884085"}'
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000192E30F3388> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.52d63d66ec2c41a5ba91ee2ee3192b56', 'delivery_tag=3', 'exchange=rt_default_y', 'redelivered=False', 'routing_key='])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100002", "amount": 200, "name": "tony", "createtime": "2021-04-27 22:04:48.898071"}'
模式二:direct
这种工作模式的原理是 消息发送至 exchange,exchange 根据 路由键(routing_key)转发到相对应的 queue 上。
- 可以使用默认 exchange =’ ‘ ,也可以自定义 exchange
- 这种模式下不需要将 exchange 和 任何进行绑定,当然绑定也是可以的。可以将 exchange 和 queue ,routing_key 和 queue 进行绑定
- 传递或接受消息时 需要 指定 routing_key
- 需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。
发布者:
如果生成多个的话,实现效果是轮询发送,一个一个循环发送数据,如同“皇帝轮流做…”
import json
import pika
import datetime
# 生成消息入口处
def get_message():
for i in range(10): # 生成10条消息
message = json.dumps(
{'id': "10000%s" % i, "amount": 100 * i, "name": "tony", "createtime": str(datetime.datetime.now())})
producter(message)
def producter(message): # 消息生产者
exchange_name = "rt_default_x" # 主题
routing_key = 'rt_autoidc_queue' # 路由键
# 建立连接与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
userx = pika.PlainCredentials("admin", "admin")
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
# 创建一个 AMQP 信道(Channel),建造一个大邮箱,隶属于这家邮局的邮箱
channel = conn.channel()
# 声明消息队列rt_autoidc_queue,消息将在这个队列传递,如不存在,则创建, exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange=exchange_name, exchange_type='direct', durable=True)
# 向队列插入数值routing_key的队列名为rt_autoidc_queue,body 就是放入的消息内容,exchange指定消息在哪个队列传递,这里是空的exchange但仍然能够发送消息到队列中,因为我们使用的是我们定义的空字符串“”exchange(默认的exchange)
channel.basic_publish(exchange=exchange_name, # 确定发布主题为
routing_key=routing_key, # 路由键
body=message, # 发送的数据
properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
# 关闭连接
conn.close()
if __name__ == '__main__':
try:
get_message()
except KeyboardInterrupt:
print("Quit")
订阅者:
import pika
def rt_receive():
exchange_name = "rt_default_x" # 主题
routing_key = 'rt_autoidc_queue' # 路由键
# 建立连接
userx = pika.PlainCredentials("admin", "admin") # 账号密码
conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
# 创建一个信道
channel = conn.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('', exclusive=True)
# 声明exchange的类型,由exchange指定消息在哪个队列传递,如不存在,则创建,exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange=exchange_name, exchange_type='direct', passive=False, durable=True, auto_delete=False)
# 消费者绑定queue,确定订阅主题为rt_autoidc_queue
channel.queue_bind(queue=result.method.queue, exchange=exchange_name, routing_key=routing_key)
# 收到指定消息的回调设置,auto_ack设置成False,在调用callback函数时,未收到确认标识,消息会重回队列。设置成True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(queue=result.method.queue, # 队列名
on_message_callback=callback, # 收到消息的回调函数
auto_ack=False)
# 开始循环监听
channel.start_consuming()
# 回调函数
def callback(ch, method, properties, body):
print(ch, method, properties, body)
# 是否消费队列,可以添加判断条件
# if models_info == True:
# ch.basic_ack(delivery_tag=method.delivery_tag)
if __name__ == '__main__':
try:
rt_receive()
except KeyboardInterrupt:
print("Quit")
返回数据
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0000024125AB3588> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b5cd70883d414facb51a748df4a8824c', 'delivery_tag=21', 'exchange=rt_default_x', 'redelivered=False', 'routing_key=rt_autoidc_queue'])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100000", "amount": 0, "name": "tony", "createtime": "2021-04-27 21:59:08.161943"}'
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0000024125AB3588> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b5cd70883d414facb51a748df4a8824c', 'delivery_tag=22', 'exchange=rt_default_x', 'redelivered=False', 'routing_key=rt_autoidc_queue'])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100001", "amount": 100, "name": "tony", "createtime": "2021-04-27 21:59:08.199919"}'
<BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0000024125AB3588> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b5cd70883d414facb51a748df4a8824c', 'delivery_tag=23', 'exchange=rt_default_x', 'redelivered=False', 'routing_key=rt_autoidc_queue'])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100002", "amount": 200, "name": "tony", "createtime": "2021-04-27 21:59:08.211914"}'
模式三:topicd
这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。 不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,就不贴出来了。