RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。可维护多个队列,可实现消息的一对一和广播等方式发送
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

安装

  1. curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
  2. curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
  3. yum -y install rabbitmq-server
  4. # 启动
  5. systemctl start rabbitmq-server
  6. # 查看状态
  7. rabbitmqctl status
  8. # 启用插件
  9. rabbitmq-plugins enable rabbitmq_management
  10. # 创建用户
  11. rabbitmqctl add_user admin admin
  12. # 修改角色为管理员
  13. rabbitmqctl set_permissions -p / admin "." "." ".*"
  14. # 设置权限
  15. rabbitmqctl set_user_tags admin administrator
  16. # 得到所有队列及存在的数据条数
  17. rabbitmqctl list_queues

image.png

RabbitMQ 特点

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
  2. 灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
  3. 消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
  4. 高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  5. 多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
  6. 多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
  7. 管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
  8. 跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
  9. 插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ 持久化

MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。

  1. queue 声明持久化

    1. # 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储
    2. result = channel.queue_declare(queue = 'python-queue',durable = True)
  2. exchange 声明持久化

    1. # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储
    2. channel.exchange_declare(exchange = 'python-exchange', durable = True)

    注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。
    3. 消息持久化
    虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。

    1. # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
    2. channel.basic_publish(exchange = 'python-exchange', routing_key = 'python-routing_key', body = message, properties=pika.BasicProperties(delivery_mode = 2))
  3. acknowledgement 消息不丢失

消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。

  1. # no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
  2. channel.basic_consume(on_message_callback = callback, queue = 'python-queue', no_ack = False)

RabbitMQ 发布与订阅

rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:
RabbitMQ - 图2
Exchange 一共有三种工作模式:fanout, direct, topicd
安装Pika

  1. pip install pika

模式一:fanout

这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

  • 不需要指定 routing_key ,即使指定了也是无效。
  • 需要提前将 exchange 和 queue 绑定,一个 exchange 可以绑定多个 queue,一个queue可以绑定多个exchange。
  • 需要先启动订阅者,此模式下的队列是 consumer 随机生成的,发布者仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。

发布者:

  1. import json
  2. import pika
  3. import datetime
  4. # 生成消息入口处
  5. def get_message():
  6. for i in range(100): # 生成10条消息
  7. message = json.dumps(
  8. {'id': "10000%s" % i, "amount": 100 * i, "name": "tony", "createtime": str(datetime.datetime.now())})
  9. producter(message)
  10. def producter(message): # 消息生产者
  11. exchange_name = "rt_default_y" # 主题
  12. routing_key = '' # 路由键
  13. # 建立连接与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
  14. userx = pika.PlainCredentials("admin", "admin") # 账号密码
  15. conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
  16. # 创建一个 AMQP 信道(Channel),建造一个大邮箱,隶属于这家邮局的邮箱
  17. channel = conn.channel()
  18. # 声明消息队列rt_autoidc_queue,消息将在这个队列传递,如不存在,则创建, exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
  19. channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', durable=True)
  20. # 向队列插入数值routing_key为空,body 就是放入的消息内容,exchange指定消息在哪个队列传递,这里是空的exchange但仍然能够发送消息到队列中,因为我们使用的是我们定义的空字符串“”exchange(默认的exchange)
  21. channel.basic_publish(exchange=exchange_name, # 确定发布主题为
  22. routing_key=routing_key, # 路由键
  23. body=message, # 发送的数据
  24. properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
  25. # 关闭连接
  26. conn.close()
  27. if __name__ == '__main__':
  28. try:
  29. get_message()
  30. except KeyboardInterrupt:
  31. print("Quit")

image.png
订阅者:

  1. import pika
  2. def rt_receive():
  3. exchange_name = "rt_default_y" # 主题
  4. routing_key = '' # 路由键
  5. # 建立连接
  6. userx = pika.PlainCredentials("admin", "admin") # 账号密码
  7. conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
  8. # 创建一个信道
  9. channel = conn.channel()
  10. # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
  11. result = channel.queue_declare('', exclusive=True)
  12. # 声明exchange的类型,由exchange指定消息在哪个队列传递,如不存在,则创建,exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
  13. channel.exchange_declare(exchange=exchange_name, exchange_type='fanout', passive=False, durable=True, auto_delete=False)
  14. # 消费者绑定queue,确定订阅主题为rt_autoidc_queue
  15. channel.queue_bind(queue=result.method.queue, exchange=exchange_name, routing_key=routing_key)
  16. # 收到指定消息的回调设置,auto_ack设置成False,在调用callback函数时,未收到确认标识,消息会重回队列。设置成True,无论调用callback成功与否,消息都被消费掉
  17. channel.basic_consume(queue=result.method.queue, # 队列名
  18. on_message_callback=callback, # 收到消息的回调函数
  19. auto_ack=False)
  20. # 开始循环监听
  21. channel.start_consuming()
  22. # 回调函数
  23. def callback(ch, method, properties, body):
  24. print(ch, method, properties, body)
  25. # 是否消费队列,可以添加判断条件
  26. # if models_info == True:
  27. # ch.basic_ack(delivery_tag=method.delivery_tag)
  28. if __name__ == '__main__':
  29. try:
  30. rt_receive()
  31. except KeyboardInterrupt:
  32. print("Quit")

返回结果

  1. <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000192E30F3388> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.52d63d66ec2c41a5ba91ee2ee3192b56', 'delivery_tag=1', 'exchange=rt_default_y', 'redelivered=False', 'routing_key='])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100000", "amount": 0, "name": "tony", "createtime": "2021-04-27 22:04:48.859094"}'
  2. <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000192E30F3388> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.52d63d66ec2c41a5ba91ee2ee3192b56', 'delivery_tag=2', 'exchange=rt_default_y', 'redelivered=False', 'routing_key='])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100001", "amount": 100, "name": "tony", "createtime": "2021-04-27 22:04:48.884085"}'
  3. <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x00000192E30F3388> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.52d63d66ec2c41a5ba91ee2ee3192b56', 'delivery_tag=3', 'exchange=rt_default_y', 'redelivered=False', 'routing_key='])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100002", "amount": 200, "name": "tony", "createtime": "2021-04-27 22:04:48.898071"}'

image.png

模式二:direct

这种工作模式的原理是 消息发送至 exchange,exchange 根据 路由键(routing_key)转发到相对应的 queue 上。

  • 可以使用默认 exchange =’ ‘ ,也可以自定义 exchange
  • 这种模式下不需要将 exchange 和 任何进行绑定,当然绑定也是可以的。可以将 exchange 和 queue ,routing_key 和 queue 进行绑定
  • 传递或接受消息时 需要 指定 routing_key
  • 需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。

发布者:
如果生成多个的话,实现效果是轮询发送,一个一个循环发送数据,如同“皇帝轮流做…”

  1. import json
  2. import pika
  3. import datetime
  4. # 生成消息入口处
  5. def get_message():
  6. for i in range(10): # 生成10条消息
  7. message = json.dumps(
  8. {'id': "10000%s" % i, "amount": 100 * i, "name": "tony", "createtime": str(datetime.datetime.now())})
  9. producter(message)
  10. def producter(message): # 消息生产者
  11. exchange_name = "rt_default_x" # 主题
  12. routing_key = 'rt_autoidc_queue' # 路由键
  13. # 建立连接与rabbitmq 服务的连接,虚拟队列需要指定参数 virtual_host,如果是默认的可以不填(默认为/),也可以自己创建一个
  14. userx = pika.PlainCredentials("admin", "admin")
  15. conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
  16. # 创建一个 AMQP 信道(Channel),建造一个大邮箱,隶属于这家邮局的邮箱
  17. channel = conn.channel()
  18. # 声明消息队列rt_autoidc_queue,消息将在这个队列传递,如不存在,则创建, exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
  19. channel.exchange_declare(exchange=exchange_name, exchange_type='direct', durable=True)
  20. # 向队列插入数值routing_key的队列名为rt_autoidc_queue,body 就是放入的消息内容,exchange指定消息在哪个队列传递,这里是空的exchange但仍然能够发送消息到队列中,因为我们使用的是我们定义的空字符串“”exchange(默认的exchange)
  21. channel.basic_publish(exchange=exchange_name, # 确定发布主题为
  22. routing_key=routing_key, # 路由键
  23. body=message, # 发送的数据
  24. properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
  25. # 关闭连接
  26. conn.close()
  27. if __name__ == '__main__':
  28. try:
  29. get_message()
  30. except KeyboardInterrupt:
  31. print("Quit")

订阅者:

  1. import pika
  2. def rt_receive():
  3. exchange_name = "rt_default_x" # 主题
  4. routing_key = 'rt_autoidc_queue' # 路由键
  5. # 建立连接
  6. userx = pika.PlainCredentials("admin", "admin") # 账号密码
  7. conn = pika.BlockingConnection(pika.ConnectionParameters("192.168.131.128", 5672, '/', credentials=userx))
  8. # 创建一个信道
  9. channel = conn.channel()
  10. # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
  11. result = channel.queue_declare('', exclusive=True)
  12. # 声明exchange的类型,由exchange指定消息在哪个队列传递,如不存在,则创建,exchange_type表示使用那种工作模式,durable = True代表exchange持久化存储,False 非持久化存储
  13. channel.exchange_declare(exchange=exchange_name, exchange_type='direct', passive=False, durable=True, auto_delete=False)
  14. # 消费者绑定queue,确定订阅主题为rt_autoidc_queue
  15. channel.queue_bind(queue=result.method.queue, exchange=exchange_name, routing_key=routing_key)
  16. # 收到指定消息的回调设置,auto_ack设置成False,在调用callback函数时,未收到确认标识,消息会重回队列。设置成True,无论调用callback成功与否,消息都被消费掉
  17. channel.basic_consume(queue=result.method.queue, # 队列名
  18. on_message_callback=callback, # 收到消息的回调函数
  19. auto_ack=False)
  20. # 开始循环监听
  21. channel.start_consuming()
  22. # 回调函数
  23. def callback(ch, method, properties, body):
  24. print(ch, method, properties, body)
  25. # 是否消费队列,可以添加判断条件
  26. # if models_info == True:
  27. # ch.basic_ack(delivery_tag=method.delivery_tag)
  28. if __name__ == '__main__':
  29. try:
  30. rt_receive()
  31. except KeyboardInterrupt:
  32. print("Quit")

返回数据

  1. <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0000024125AB3588> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b5cd70883d414facb51a748df4a8824c', 'delivery_tag=21', 'exchange=rt_default_x', 'redelivered=False', 'routing_key=rt_autoidc_queue'])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100000", "amount": 0, "name": "tony", "createtime": "2021-04-27 21:59:08.161943"}'
  2. <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0000024125AB3588> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b5cd70883d414facb51a748df4a8824c', 'delivery_tag=22', 'exchange=rt_default_x', 'redelivered=False', 'routing_key=rt_autoidc_queue'])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100001", "amount": 100, "name": "tony", "createtime": "2021-04-27 21:59:08.199919"}'
  3. <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x0000024125AB3588> params=<ConnectionParameters host=192.168.131.128 port=5672 virtual_host=/ ssl=False>>>> <Basic.Deliver(['consumer_tag=ctag1.b5cd70883d414facb51a748df4a8824c', 'delivery_tag=23', 'exchange=rt_default_x', 'redelivered=False', 'routing_key=rt_autoidc_queue'])> <BasicProperties(['delivery_mode=2'])> b'{"id": "100002", "amount": 200, "name": "tony", "createtime": "2021-04-27 21:59:08.211914"}'

模式三:topicd

这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。 不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,就不贴出来了。