启动rabbitMQ虚拟机

dockerfile

  1. # 映射端口
  2. docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq
  3. #每次重启后使用
  4. docker start myrabbitmq
  5. # 切换到docker中cmd
  6. docker exec -it myrabbitmq /bin/bash

docker-compose

划重点:为了实现docker中rabbitmq数据能够正常映射出来,需要注意两个点:
一是需要设置hostname,二是数据都存放在mnesia盘中,所以docker-compose.yml文件要写出如下形式:

  1. version: '2'
  2. services:
  3. rabbitmq:
  4. hostname: 'myrabbit'
  5. image: rabbitmq:3.7.7-management
  6. restart: always
  7. ports:
  8. - "5672:5672"
  9. - "15672:15672"
  10. volumes:
  11. - "./data/rabbitmq:/var/lib/rabbitmq/mnesia"
  12. environment:
  13. - RABBITMQ_DEFAULT_VHOST=/
  14. - RABBITMQ_DEFAULT_USER=test
  15. - RABBITMQ_DEFAULT_PASS=1qaz@WSX

参考资料:


docker 方式下的数据持久化问题

要实现数据持久化,涉及到3个因素:

  1. docker数据的持久化,可以通过volumes映射盘来解决
  2. 队列的持久化,重启docker后消息队列还在
  3. 消息的持久化,重启docker后消息还在

其中消息持久化依赖于队列的持久化

参考资料:

启动Web 管理口

rabbitmq自带管理后台,安装后需要配置开启

  1. rabbitmq-plugins enable rabbitmq_management
  2. rabbitmqctl start_app

打开http://localhost:15672/即可看到管理后台
用户名密码均为guest
使用web管理口可以方便检查MQ的状态

工作模式

消费者模式 vs 订阅者模式

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

在rabbitmq的示例中,消费者模式是server直接发送消息到queue中(不指定exchange),多个client绑定queue。
订阅者模式是server发送消息到exchange中,多个client端绑定queue到exchange中,设置了queue 属性排他性exclusive=True(强制订阅,防止出现消费的情况)。

思考1:按照rabbitmq的核心思想,server只和exchange通信,不需要知道存在哪些queue,所以如果采用exchange,如何实现消费者模式?

答案:如果设置exclusive为False,则允许多个client bind到同一个queue,则实现了消费者模式。总结,exchange负责订阅,queue负责分发。

思考2:对于某些复杂情况,同一个数据可能即需要实现订阅(不同模块),又需要实现消费(相同模块),如何实现?

回答:第一种实现方式,是分别实现负责订阅的队列和消费的队列,第二种方式,是按照思考1一样,多个queue bind到exchange上,需要订阅的bind到不同的queue,需要消费的bind到相同的queue。

消费者模式

server.py

  1. #!/usr/bin/env python
  2. import pika
  3. import sys
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters(host='localhost'))
  6. channel = connection.channel()
  7. channel.queue_declare(queue='task_queue', durable=True)
  8. message = ' '.join(sys.argv[1:]) or "Hello World!"
  9. channel.basic_publish(
  10. exchange='',
  11. routing_key='task_queue',
  12. body=message,
  13. properties=pika.BasicProperties(
  14. delivery_mode=2, # make message persistent
  15. ))
  16. print(" [x] Sent %r" % message)
  17. connection.close()

client.py

  1. #!/usr/bin/env python
  2. import pika # rabbitmq client
  3. import time
  4. import datetime
  5. connection = pika.BlockingConnection(
  6. pika.ConnectionParameters(host='localhost'))
  7. channel = connection.channel()
  8. channel.queue_declare(queue='task_queue', durable=True) # set queue name , set queue durable
  9. print(' [*] Waiting for messages. To exit press CTRL+C')
  10. def callback(ch, method, properties, body): # define the func
  11. print(" [x] Received %r" % body)
  12. time.sleep(body.count(b'.'))
  13. print(" [x] Done")
  14. ch.basic_ack(delivery_tag=method.delivery_tag) # send ack back ,rabbitmq server default set On already
  15. channel.basic_qos(prefetch_count=1) # run one task each time
  16. channel.basic_consume(queue='task_queue', on_message_callback=callback) #define witch func to run when get new message
  17. channel.start_consuming()

订阅者模式

publish.py

  1. #!/usr/bin/env python
  2. import pika
  3. import sys
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters(host='localhost'))
  6. channel = connection.channel()
  7. channel.exchange_declare(exchange='logs', exchange_type='fanout')
  8. channel.ex
  9. message = ' '.join(sys.argv[1:]) or "info: Hello World!"
  10. channel.basic_publish(exchange='logs', routing_key='', body=message)
  11. print(" [x] Sent %r" % message)
  12. connection.close()

client.py

  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(
  4. pika.ConnectionParameters(host='localhost'))
  5. channel = connection.channel()
  6. channel.exchange_declare(exchange='logs', exchange_type='fanout') # fanout
  7. result = channel.queue_declare(queue='', exclusive=True)
  8. queue_name = result.method.queue
  9. channel.queue_bind(exchange='logs', queue=queue_name)
  10. print(' [*] Waiting for logs. To exit press CTRL+C')
  11. def callback(ch, method, properties, body):
  12. print(" [x] %r" % body)
  13. channel.basic_consume(
  14. queue=queue_name, on_message_callback=callback, auto_ack=True)
  15. channel.start_consuming()

混合模式

场景设计:server发布消息,client分为client1-4,client1和client2为group1,client3和client4为group2,group内部需要实现消费,两个group都需要订阅。

server.py

  1. #!/usr/bin/env python
  2. import pika
  3. import sys
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters(host='localhost'))
  6. channel = connection.channel()
  7. channel.exchange_declare(exchange='logs', exchange_type='fanout')
  8. message = ' '.join(sys.argv[1:]) or "info: Hello World!"
  9. channel.basic_publish(exchange='logs', routing_key='', body=message)
  10. print(" [x] Sent %r" % message)
  11. connection.close()

client1.py & client2.py

  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(
  4. pika.ConnectionParameters(host='localhost'))
  5. channel = connection.channel()
  6. channel.exchange_declare(exchange='logs', exchange_type='fanout') # fanout
  7. result = channel.queue_declare(queue='test_queue1', exclusive=False)
  8. queue_name = result.method.queue
  9. print(queue_name)
  10. channel.queue_bind(exchange='logs', queue=queue_name)
  11. print(' [*] Waiting for logs. To exit press CTRL+C')
  12. def callback(ch, method, properties, body):
  13. print(" [x] %r" % body)
  14. channel.basic_consume(
  15. queue=queue_name, on_message_callback=callback, auto_ack=True)
  16. channel.start_consuming()

client3.py & client4.py

  1. #!/usr/bin/env python
  2. import pika
  3. connection = pika.BlockingConnection(
  4. pika.ConnectionParameters(host='localhost'))
  5. channel = connection.channel()
  6. channel.exchange_declare(exchange='logs', exchange_type='fanout') # fanout
  7. result = channel.queue_declare(queue='test_queue2', exclusive=False)
  8. queue_name = result.method.queue
  9. print(queue_name)
  10. channel.queue_bind(exchange='logs', queue=queue_name)
  11. print(' [*] Waiting for logs. To exit press CTRL+C')
  12. def callback(ch, method, properties, body):
  13. print(" [x] %r" % body)
  14. channel.basic_consume(
  15. queue=queue_name, on_message_callback=callback, auto_ack=True)
  16. channel.start_consuming()

setting priority

setting the queue to support priority

server.py

  1. #!/usr/bin/env python
  2. import pika
  3. import sys
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters(host='localhost'))
  6. channel = connection.channel()
  7. channel.exchange_declare(exchange='logs', exchange_type='fanout')
  8. # message = ' '.join(sys.argv[1:]) or "info: Hello World!"
  9. # test property = 1
  10. def low_priority():
  11. message = ' '.join(sys.argv[1:]) or "info: low priority!"
  12. properties = pika.BasicProperties(
  13. priority = 1
  14. )
  15. channel.basic_publish(exchange='logs', routing_key='', body=message, properties=properties)
  16. print(" [x] Sent %r" % message)
  17. # connection.close()
  18. def mid_priority():
  19. message = ' '.join(sys.argv[1:]) or "info: mid priority"
  20. #important to setting message the properties
  21. properties = pika.BasicProperties(
  22. priority = 2
  23. )
  24. channel.basic_publish(exchange='logs', routing_key='', body=message, properties=properties)
  25. print(" [x] Sent %r" % message)
  26. # connection.close()
  27. def high_priority():
  28. message = ' '.join(sys.argv[1:]) or "info: high priority"
  29. properties = pika.BasicProperties(
  30. priority = 3
  31. )
  32. channel.basic_publish(exchange='logs', routing_key='', body=message, properties=properties)
  33. print(" [x] Sent %r" % message)
  34. # connection.close()
  35. for i in range(5):
  36. mid_priority()
  37. for i in range(5):
  38. low_priority()
  39. for i in range(5):
  40. high_priority()
  41. connection.close()

client.py

tips: need set auto_ack =False

  1. #!/usr/bin/env python
  2. import pika
  3. import time
  4. connection = pika.BlockingConnection(
  5. pika.ConnectionParameters(host='localhost'))
  6. channel = connection.channel()
  7. channel.exchange_declare(exchange='logs', exchange_type='fanout')
  8. channel.basic_qos(prefetch_count=1)
  9. # setting the arguments
  10. result = channel.queue_declare(queue='', exclusive=True, arguments={"x-max-priority":10})
  11. # queue_
  12. queue_name = result.method.queue
  13. channel.queue_bind(exchange='logs', queue=queue_name)
  14. print(' [*] Waiting for logs. To exit press CTRL+C')
  15. def callback(ch, method, properties, body):
  16. print(" [x] %r" % body)
  17. time.sleep(5)
  18. channel.basic_ack(delivery_tag=method.delivery_tag)
  19. channel.basic_consume(
  20. queue=queue_name, on_message_callback=callback, auto_ack=False)
  21. channel.start_consuming()

output

  1. $ python test_priority_client.py
  2. [*] Waiting for logs. To exit press CTRL+C
  3. [x] b'info: mid priority'
  4. [x] b'info: high priority'
  5. [x] b'info: high priority'
  6. [x] b'info: high priority'
  7. [x] b'info: high priority'
  8. [x] b'info: high priority'
  9. [x] b'info: mid priority'
  10. [x] b'info: mid priority'
  11. [x] b'info: mid priority'
  12. [x] b'info: mid priority'
  13. [x] b'info: low priority!'
  14. [x] b'info: low priority!'
  15. [x] b'info: low priority!'
  16. [x] b'info: low priority!'
  17. [x] b'info: low priority!'