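Starting RabbitMQ in Docker
docker command line
# Create the container and map the AMQP port (5672) and the management UI port (15672)
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq
# After each reboot, start the existing container
docker start myrabbitmq
# Open a shell inside the container
docker exec -it myrabbitmq /bin/bash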
docker-compose
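Key point: for the RabbitMQ data inside Docker to be mapped out to the host correctly, two things matter.
First, set a fixed hostname, because RabbitMQ names its data directory after the node name, which includes the hostname. Second, the data is stored in the mnesia directory, so that is the directory to mount. The docker-compose.yml file should therefore look like this: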
version: '2'
services:
  rabbitmq:
    hostname: 'myrabbit'
    image: rabbitmq:3.7.7-management
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - "./data/rabbitmq:/var/lib/rabbitmq/mnesia"
    environment:
      - RABBITMQ_DEFAULT_VHOST=/
      - RABBITMQ_DEFAULT_USER=test
      - RABBITMQ_DEFAULT_PASS=1qaz@WSX
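A client that connects to this broker has to pass the user, password and vhost configured above. The following is a minimal sketch, assuming the client runs on the Docker host and reaches the broker at localhost:5672; the helper name build_amqp_url is made up for illustration. It shows that the `@` in the password and the `/` vhost need to be percent-encoded when they are placed in an AMQP URL.

```python
from urllib.parse import quote


def build_amqp_url(user, password, host="localhost", port=5672, vhost="/"):
    """Build an AMQP URL, percent-encoding the parts that may hold reserved characters."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


# Values taken from the docker-compose.yml above; host and port are assumptions.
url = build_amqp_url("test", "1qaz@WSX")
print(url)
# With pika installed and the broker running, the URL could be used as:
#   connection = pika.BlockingConnection(pika.URLParameters(url))
```

Passing pika.PlainCredentials to pika.ConnectionParameters avoids the encoding step altogether; the URL form is mainly convenient when the connection string comes from an environment variable.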
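Data persistence under Docker
Persisting data involves three factors:
- Persistence of the Docker data, which mapping a volume solves
- Queue durability: the queue still exists after the container restarts
- Message persistence: the messages still exist after the container restarts
Message persistence depends on queue durability: a persistent message survives a restart only if it sits in a durable queue.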
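The dependency between the last two factors can be written down as a small truth table. This is a simplified model rather than broker code: it assumes the data directory itself is persisted (the first factor) and ignores timing details such as whether the message had already been written to disk. The function name survives_restart is hypothetical.

```python
PERSISTENT = 2  # AMQP delivery_mode value that marks a message as persistent


def survives_restart(queue_durable, delivery_mode):
    """Return True if a message is expected to still be queued after a broker restart.

    Simplified model: both the queue and the message must be marked as persistent.
    """
    return queue_durable and delivery_mode == PERSISTENT


for durable in (True, False):
    for mode in (1, 2):
        print("durable=%s delivery_mode=%d -> survives=%s"
              % (durable, mode, survives_restart(durable, mode)))
```

Only the combination durable=True with delivery_mode=2 keeps the message, which is why the work-queue example in these notes sets both.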
References:
- Example docker-compose file https://stackoverflow.com/questions/51578794/docker-rabbitmq-how-to-expose-port-and-reuse-container-with-a-docker-file
- Queue durability and message persistence https://www.cnblogs.com/jiagoushi/p/8678871.html
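Enabling the web management UI
RabbitMQ ships with a management UI. After installation it has to be enabled explicitly (the rabbitmq:*-management images already have it enabled):
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl start_app
Open http://localhost:15672/ to see the management UI.
The default username and password are both guest, unless they were overridden with RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS as in the docker-compose file above.
The web management UI makes it easy to check the state of the broker.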
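The management plugin also serves an HTTP API on the same port, so the same status check can be scripted. The sketch below only prepares the request and does not send it. It assumes the default guest/guest credentials and the /api/overview endpoint of the management API; the helper name build_overview_request is made up for illustration.

```python
import base64
import urllib.request


def build_overview_request(host="localhost", port=15672, user="guest", password="guest"):
    """Prepare (but do not send) a GET request for the management API overview."""
    url = "http://{}:{}/api/overview".format(host, port)
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url)
    request.add_header("Authorization", "Basic " + token)  # HTTP basic authentication
    return request


request = build_overview_request()
print(request.full_url)
# With the broker running, the request could be sent like this:
#   import json
#   with urllib.request.urlopen(request) as response:
#       overview = json.load(response)
```

Keeping request construction separate from sending makes the helper easy to check without a running broker.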
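Working modes
Consumer mode vs subscriber mode
https://www.rabbitmq.com/tutorials/tutorial-two-python.html
In the RabbitMQ tutorials, consumer mode (work queues) has the server publish straight to a queue through the default exchange (exchange=''), and several clients consume from that one queue, so each message is handled by only one of them.
Subscriber mode (publish/subscribe) has the server publish to an exchange. Each client binds its own queue to the exchange and declares it with exclusive=True, so the queue belongs to that client alone (this forces subscription semantics and keeps clients from competing for messages).
Question 1: RabbitMQ's core idea is that the server talks only to an exchange and does not need to know which queues exist. If an exchange is used, how can consumer mode be implemented?
Answer: declare a named queue with exclusive=False so that several clients can consume from the same queue; that gives consumer mode. In short, the exchange handles subscription and the queue handles distribution.
Question 2: in more complex cases the same data may need both subscription (across different modules) and consumption (within one module). How can that be implemented?
Answer: the first option is to implement separate queues for subscription and for consumption. The second option follows Question 1: bind several queues to the exchange, with clients that need to subscribe using different queues and clients that need to consume sharing the same queue.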
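The rule "the exchange handles subscription, the queue handles distribution" can be illustrated with a toy in-memory model. This is not AMQP code and does not use pika: the FanoutExchange class is hypothetical, and it assumes that a queue deals messages to its consumers in strict round-robin order, which is an idealisation of what the broker does when all consumers are equally available.

```python
from collections import defaultdict
from itertools import cycle


class FanoutExchange:
    """Toy in-memory model of a fanout exchange; not a real AMQP implementation."""

    def __init__(self):
        self.queues = {}  # queue name -> list of consumer names

    def bind(self, queue, consumer):
        """Attach a consumer to a queue, creating the queue on first use."""
        self.queues.setdefault(queue, []).append(consumer)

    def publish_all(self, messages):
        """Copy every message to every queue; each queue deals its copies round-robin."""
        received = defaultdict(list)
        for consumers in self.queues.values():
            for message, consumer in zip(messages, cycle(consumers)):
                received[consumer].append(message)
        return dict(received)


exchange = FanoutExchange()
exchange.bind("test_queue1", "client1")  # group1 shares one queue
exchange.bind("test_queue1", "client2")
exchange.bind("test_queue2", "client3")  # group2 shares another queue
exchange.bind("test_queue2", "client4")

received = exchange.publish_all(["m1", "m2", "m3", "m4"])
for consumer in sorted(received):
    print(consumer, received[consumer])
```

Each group sees all four messages (subscription), while inside a group every message goes to exactly one client (consumption).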
Consumer mode
server.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Durable queue: the queue definition survives a broker restart
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',  # default exchange: routes to the queue named by routing_key
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()
client.py
#!/usr/bin/env python
import pika  # RabbitMQ client library
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the same durable queue as the server; the declaration is idempotent
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per dot in the body
    print(" [x] Done")
    # Acknowledge manually; basic_consume defaults to auto_ack=False
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)  # at most one unacknowledged message per worker
channel.basic_consume(queue='task_queue', on_message_callback=callback)  # callback to run for each new message
channel.start_consuming()
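The effect of basic_qos(prefetch_count=1) in the worker above can be estimated with a small simulation. This is an idealised model, not pika code: without a prefetch limit, messages are assumed to be dealt strictly round-robin, and with prefetch_count=1 the next message is assumed to go to whichever worker finishes first. The function names and task durations are made up for illustration.

```python
def round_robin(durations, workers):
    """Assign task i to worker i % workers, regardless of how busy that worker is."""
    load = [0] * workers
    for i, seconds in enumerate(durations):
        load[i % workers] += seconds
    return load


def fair_dispatch(durations, workers):
    """Give each task to the worker that becomes free first (idealised prefetch_count=1)."""
    free_at = [0] * workers
    for seconds in durations:
        idx = free_at.index(min(free_at))  # earliest-free worker; ties go to the lowest index
        free_at[idx] += seconds
    return free_at


# Alternating heavy (5 s) and light (1 s) tasks, two workers.
durations = [5, 1, 5, 1, 5, 1]
print("round robin  :", round_robin(durations, 2))
print("fair dispatch:", fair_dispatch(durations, 2))
```

In this scenario round-robin leaves one worker with all the heavy tasks, while fair dispatch spreads the total time more evenly across the two workers.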
Subscriber mode
publish.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# A fanout exchange copies every message to all queues bound to it
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
client.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# queue='' asks the broker to generate a name; exclusive=True ties the queue to this connection
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Mixed mode
Scenario: the server publishes messages and there are four clients, client1 to client4. client1 and client2 form group1; client3 and client4 form group2. Within a group the clients consume (each message goes to one member), while both groups subscribe (each group receives every message).
server.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()
client1.py & client2.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Named, non-exclusive queue: both clients in group1 consume from it
result = channel.queue_declare(queue='test_queue1', exclusive=False)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
client3.py & client4.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Named, non-exclusive queue: both clients in group2 consume from it
result = channel.queue_declare(queue='test_queue2', exclusive=False)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
Setting priority
The queue has to be declared with x-max-priority to support priorities, and each message has to carry a priority property.
server.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')


def low_priority():
    message = ' '.join(sys.argv[1:]) or "info: low priority!"
    properties = pika.BasicProperties(priority=1)
    channel.basic_publish(exchange='logs', routing_key='', body=message, properties=properties)
    print(" [x] Sent %r" % message)


def mid_priority():
    message = ' '.join(sys.argv[1:]) or "info: mid priority"
    # The priority is set on the message through its properties
    properties = pika.BasicProperties(priority=2)
    channel.basic_publish(exchange='logs', routing_key='', body=message, properties=properties)
    print(" [x] Sent %r" % message)


def high_priority():
    message = ' '.join(sys.argv[1:]) or "info: high priority"
    properties = pika.BasicProperties(priority=3)
    channel.basic_publish(exchange='logs', routing_key='', body=message, properties=properties)
    print(" [x] Sent %r" % message)


# Publish mid, then low, then high, to show that delivery order follows priority
for i in range(5):
    mid_priority()
for i in range(5):
    low_priority()
for i in range(5):
    high_priority()
connection.close()
client.py
Tip: auto_ack must be False. With automatic acknowledgement the broker pushes messages to the consumer as soon as they arrive, so they never wait in the queue long enough to be reordered by priority.
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_qos(prefetch_count=1)  # hold at most one unacknowledged message

# x-max-priority turns the queue into a priority queue (priorities 0 to 10 here)
result = channel.queue_declare(queue='', exclusive=True, arguments={"x-max-priority": 10})
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    time.sleep(5)  # slow consumer, so that messages pile up in the queue
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()
output
$ python test_priority_client.py
[*] Waiting for logs. To exit press CTRL+C
[x] b'info: mid priority'
[x] b'info: high priority'
[x] b'info: high priority'
[x] b'info: high priority'
[x] b'info: high priority'
[x] b'info: high priority'
[x] b'info: mid priority'
[x] b'info: mid priority'
[x] b'info: mid priority'
[x] b'info: mid priority'
[x] b'info: low priority!'
[x] b'info: low priority!'
[x] b'info: low priority!'
[x] b'info: low priority!'
[x] b'info: low priority!'
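The first line of the output is a mid-priority message even though high-priority messages were published. A likely explanation is that the consumer was idle when the first message arrived, so the broker delivered it at once; the other fourteen messages then queued up during the five-second sleep and were handed out by priority. The sketch below reproduces that ordering with a heap. It is a simulation under that assumption, not broker code, and the function name delivery_order is hypothetical.

```python
import heapq
from itertools import count


def delivery_order(published):
    """Simulate delivery to one slow consumer with prefetch_count=1.

    published is a list of (priority, body) pairs in publish order. The first
    message is delivered immediately; the rest wait in a priority queue, with
    higher priority first and publish order kept within the same priority.
    """
    if not published:
        return []
    (_, first_body), rest = published[0], published[1:]
    sequence = count()  # tie-breaker that preserves publish order
    heap = [(-priority, next(sequence), body) for priority, body in rest]
    heapq.heapify(heap)
    order = [first_body]
    while heap:
        _, _, body = heapq.heappop(heap)
        order.append(body)
    return order


# Same publish order as server.py: five mid, five low, five high.
published = [(2, "mid")] * 5 + [(1, "low")] * 5 + [(3, "high")] * 5
order = delivery_order(published)
print(order)
```

The simulated order is one mid, five high, four mid, five low, which matches the shape of the client output above.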