一、简介:

RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信。而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一。
RabbitMq 应用场景广泛:
系统的高可用
分布式系统,集成系统,子系统之间的对接,以及架构设计中常常需要考虑消息队列的应用。

二、RabbitMq 生产和消费

生产者(producter):队列消息的产生者,负责生产消息,并将消息传入队列
消费者(consumer):队列消息的接收者,负责 接收并处理 消息队列中的消息

  1. import pika
  2. import json
  3. # mq用户名和密码
  4. credentials = pika.PlainCredentials('admin', 'admin')
  5. # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
  6. connection = pika.BlockingConnection(
  7. pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
  8. channel = connection.channel()
  9. # 声明消息队列,消息将在这个队列传递,如不存在,则创建
  10. result = channel.queue_declare(queue='python-test')
  11. for i in range(10):
  12. message = json.dumps({'OrderId': "1000%s" % i})
  13. # 向队列插入数值 routing_key是队列名
  14. channel.basic_publish(exchange='', routing_key='python-test', body=message)
  15. print(message)
  16. connection.close()
  1. import pika
  2. credentials = pika.PlainCredentials('admin', 'admin')
  3. connection = pika.BlockingConnection(
  4. pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
  5. channel = connection.channel()
  6. # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
  7. channel.queue_declare(queue='python-test', durable=False)
  8. # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
  9. def callback(ch, method, properties, body):
  10. ch.basic_ack(delivery_tag=method.delivery_tag)
  11. print(body.decode())
  12. # 告诉rabbitmq,用callback来接收消息
  13. channel.basic_consume('python-test', callback)
  14. # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
  15. channel.start_consuming()

三、RabbitMq 持久化

MQ默认建立的是临时 queue 和 exchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。
1.queue 声明持久化

  1. # 声明消息队列,消息将在这个队列传递,如不存在,则创建。
  2. #durable = True 代表消息队列持久化存储,False 非持久化存储
  3. result = channel.queue_declare(queue = 'python-test',durable = True)

2.exchange 声明持久化

  1. # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.
  2. #durable = True 代表exchange持久化存储,False 非持久化存储
  3. channel.exchange_declare(exchange = 'python-test', durable = True)

注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。
3.消息持久化
虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。

  1. # 向队列插入数值 routing_key是队列名。
  2. #delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
  3. channel.basic_publish(exchange = '',routing_key = 'python-test',body = message,
  4. properties=pika.BasicProperties(delivery_mode = 2))

4.acknowledgement 消息不丢失
消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。

  1. # no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。
  2. #True,无论调用callback成功与否,消息都被消费掉
  3. channel.basic_consume(callback,queue = 'python-test',no_ack = False)

三、RabbitMq 发布与订阅

rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:
image.png
Exchange 一共有三种工作模式:fanout, direct, topicd

模式一:fanout

广播模式
这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。

  • 不需要指定 routing_key ,即使指定了也是无效。
  • 需要提前将 exchange 和 queue 绑定,一个 exchange 可以绑定多个 queue,一个queue可以绑定多个exchange。
  • 需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。

image.png

  1. #生产者
  2. import pika
  3. import json
  4. credentials = pika.PlainCredentials('admin', 'admin') # mq用户名和密码
  5. # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
  6. connection = pika.BlockingConnection(
  7. pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
  8. channel = connection.channel()
  9. # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
  10. channel.exchange_declare(exchange='python-test', durable=True, exchange_type='fanout')
  11. for i in range(10):
  12. message = json.dumps({'fanout-OrderId': "1000%s" % i})
  13. # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置
  14. channel.basic_publish(exchange='python-test', routing_key='', body=message,
  15. properties=pika.BasicProperties(delivery_mode=2))
  16. print(message)
  17. connection.close()
  1. #消费者
  2. import pika
  3. credentials = pika.PlainCredentials('admin', 'admin')
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
  6. channel = connection.channel()
  7. #1.创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
  8. result = channel.queue_declare('', exclusive=True)
  9. #2.声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
  10. channel.exchange_declare(exchange='python-test', durable=True, exchange_type='fanout')
  11. #3.绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去
  12. channel.queue_bind(exchange='python-test', queue=result.method.queue)
  13. # 定义一个回调函数来处理消息队列中的消息,这里是打印出来
  14. def callback(ch, method, properties, body):
  15. ch.basic_ack(delivery_tag=method.delivery_tag)
  16. print(body.decode())
  17. # 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
  18. channel.basic_consume(result.method.queue, callback, auto_ack=False)
  19. channel.start_consuming()

模式二:direct

image.png
这种工作模式的原理是 消息发送至 exchange,exchange 根据 路由键(routing_key)转发到相对应的 queue 上。

  • 可以使用默认 exchange =’ ‘ ,也可以自定义 exchange
  • 这种模式下不需要将 exchange 和 任何进行绑定,当然绑定也是可以的。可以将 exchange 和 queue ,routing_key 和 queue 进行绑定
  • 传递或接受消息时 需要 指定 routing_key
  • 需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。 ```python

    生产者

    import pika import json

credentials = pika.PlainCredentials(‘admin’, ‘admin’) # mq用户名和密码

虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。

connection = pika.BlockingConnection( pika.ConnectionParameters(host=’192.168.9.252’, port=5672, virtual_host=’/‘, credentials=credentials)) channel = connection.channel()

声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储

channel.exchange_declare(exchange=’python-test’, durable=True, exchange_type=’direct’)

for i in range(10): message = json.dumps({‘direct-OrderId’: “1000%s” % i})

  1. # 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
  2. channel.basic_publish(exchange='python-test', routing_key='OrderId', body=message,
  3. properties=pika.BasicProperties(delivery_mode=2))
  4. print(message)

connection.close()

```python
#消费者
import pika

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('', exclusive=True)
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange='python-test', durable=True, exchange_type='direct')
# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
channel.queue_bind(exchange='python-test', queue=result.method.queue, routing_key='OrderId')


# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())

# channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,用callback来接受消息
# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(result.method.queue, callback,auto_ack=False)
channel.start_consuming()

模式三:模糊匹配(topic)模式

这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。 不同点是 routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“”是匹配一个词。
image.png
**模糊匹配就是在关键字模式上又做了个升级,即关键字不写死,两方都可以通过通配符来设置routing_key的值。#匹配0个或多个单词,
只匹配一个单词,a和abc都叫做一个单词**

#生产者
import pika
import json

credentials = pika.PlainCredentials('admin', 'admin')  # mq用户名和密码
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange='python-test', durable=True, exchange_type='topic')

for i in range(10):
    message = json.dumps({'topic-OrderId': "1000%s" % i})
    # 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化
    #向交换机发送数据,让交换机只给能匹配zhw.OrderId.*的队列发消息
    channel.basic_publish(exchange='python-test', routing_key='zhw.OrderId.*', body=message,
                          properties=pika.BasicProperties(delivery_mode=2))
    print(message)
connection.close()
#消费者
import pika

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()
# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除
result = channel.queue_declare('', exclusive=True)
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储
channel.exchange_declare(exchange='python-test', durable=True, exchange_type='topic')
# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去
#routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如“#”是匹配全部,“*”是匹配一个词。
channel.queue_bind(exchange='python-test', queue=result.method.queue, routing_key='zhw.*.#')


# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())

# channel.basic_qos(prefetch_count=1)
# 告诉rabbitmq,用callback来接受消息
# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
channel.basic_consume(result.method.queue, callback,auto_ack=False)
channel.start_consuming()

exchange模式和非exchange模式对比:

非exchange模式:
简单模式是一对一,一个消费者监听一个队列。Work模式是一对多,多个消费者监听同一个队列.
缺点:
生产者的所有消息全堆积到同一个队列中,没有做消息分类.
exchange模式或交换机模式:
fanout,direct和topic,该模式下的每个消费者都有自己创建的队列,采用三种方式中的任意一种来绑定交换机,再由交换机分配消息给这些队列。exchange模式除了可以应对多个消费者之外,还可以应对消息多样化,因为MQ不知道这个消息到底分给哪个消费者来做。比如一个项目里面,有发送邮件,有发送支付短信,有赠送优惠券,这就是三类消息,使用exchange模式就很好应对,生产者和消费者两端商量好,双方都用send_email作为关键字,来表明这个是发邮件的消息,那交换机就会按照send_email去找队列,就完成了该队列只用于存放邮箱地址。其他类的消息双方又商量同时用另外一个关键字。应用场景最多的就是发布订阅和关键字模式。

RabbitMQ实现RPC模式

image.png
RPC——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议,和RabbitMQ没有必然关系,RPC可以基于tcp或http,http是基于tcp的,RPC直接工作在会话层。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使分布式系统中的应用程序通信更加容易,RPC采用C/S模式。
客户端创建一个临时队列等待服务端的响应:
image.png
服务端创建一个对象,使客户端监听获取返回的数据
image.png
客户端:

import pika
# 用于生成请求的唯一标识correlation_id
import uuid

class RpcClient(object):

    def __init__(self):
        # 连接rabbitmq服务器
        credentials = pika.PlainCredentials('admin', 'admin')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
        self.channel = self.connection.channel()
        # 创建随机回调队列,你不随机也是可以的,反正必要要传过去
        ##声明队列,生成一个随机的且不存在的队列,该队列会在连接断开后自动销毁
        result = self.channel.queue_declare(queue='client-queue',exclusive=True)
        # 拿到这个随机队列名
        self.callback_queue = result.method.queue
        # 监听这个回调队列,一旦有响应结果就促发回调on_response(就是为了对比id)
        self.channel.basic_consume(queue=self.callback_queue,auto_ack=True,on_message_callback=self.on_response)
    # 对比id确定这个结果确实是我的响应结果
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        # 用于生成请求的唯一标识
        self.corr_id = str(uuid.uuid4())
        # 向rpc_queue队列中塞消息body,并添加reply_to和correlation_id两个属性
        self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to=self.callback_queue,correlation_id=self.corr_id,),body=str(n))
        while self.response is None:
            # 防止连接自动断开,消费者主线程定时发心跳交互,耗时较长的消息消费
            self.connection.process_data_events()
        return str(self.response)

rpc = RpcClient()
response = rpc.call(2)
print("客户端已发出RPC请求,客户端这边传了个20给服务端,想要调用服务器端的fun函数")
print("客户端拿到本次RPC请求的响应结果:{}".format(response))

服务端:

import pika

# 连接rabbitmq服务器
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='192.168.9.252', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()

# 创建rpc_queue队列
channel.queue_declare(queue='rpc_queue')

# 这个fun就是我们远程要调用的这么一个简单的接口
def fun(n):
    return 100 * n

# body就是来自客户端塞进队列的消息,props就是来自客户端properties里的两个键值对
def on_request(ch, method, props, body):
    n = int(body)
    response = fun(n)
    # 向接收到的props.reply_to队列塞进响应结果response
    ch.basic_publish(exchange='',routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id),body=str(response))
    print("服务器端已经把响应结果放进客户端的回调队列了,结果是:{}".format(response))
    # 通知MQ这条消息对应处理成功,可以删除这条消息了
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 消费者不止这一个时,谁先处理完谁就去消息队列取,这句话最好在每个消费者端都加上,这儿服务器端同样也加上
channel.basic_qos(prefetch_count=1)
# 监听rpc_queue队列,一收到来自客户端的消息则促发回调on_request
channel.basic_consume('rpc_queue', on_request)
print("服务器端正在等待客户端往rpc_queue放消息......")
channel.start_consuming()