1 简介

RabbitMq 是用Erlang 语言编写的, 实现了高级消息队列协议(AMQP)的开源的消息队列中间件。
优点:

  • RabbitMQ 是一个相当轻量级的消息队列,非常容易部署和使用。
  • RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,和其他消息队列不同的是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由的规则也非常灵活,甚至你可以自己来实现路由规则。

缺点:

  • RabbitMQ 对消息堆积的支持并不好, 当大量消息堆积时, 会导致RabbitMQ的性能急剧下降
  • 性能比 RocketMQ, Kafka 差得多

    2 安装RabbitMQ

    (1) docker安装RabbitMQ

    注: 安装 附带 管理端的镜像

    sudo docker pull rabbitmq:3.8-management

image.png
image.png
创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin), 指定RabbitMQ可以使用95%的内存(默认是40%)( -e RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.95)

sudo docker run -dit —name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management

输入http://127.0.0.1:15672/, 进入管理后台页面
image.png

(2) pip安装pika模块

pika用来连接RabbitMQ, 并把消息发送给RabbitMQ

pip install pika

查看rabbitMQ日志

cat /var/log/rabbitmq/log/crash.log

3 配置rabbitMQ(不需要)

  1. # 添加用户跟密码
  2. rabbitmqctl add_user ws qwe112233
  3. # 添加虚拟主机
  4. rabbitmqctl add_vhost vh
  5. # 为用户添加标签
  6. rabbitmqctl set_user_tags ws tag1
  7. # 设置用户权限, 具有vh这个虚拟主机中所有资源的 配置、写、读权限
  8. rabbitmqctl set_permissions -p vh ws ".*" ".*" ".*"

image.png

4 代码示例

(1) settings.py

  1. class Config:
  2. # JWT密钥
  3. JWT_SECRET = 'TPmi4aLWRbyVq8zu9v82dWYW17/z+UvRnYTt4P6fAXA'
  4. REDIS_HOST = "r-2ze38obvu9rckcaqeypd.redis.rds.aliyuncs.com"
  5. REDIS_PORT = 6379
  6. # Celery RabbitMQ amqp://用户名:密码@localhost/虚拟主机
  7. CELERY_BROKER_URL = 'amqp://admin:admin@localhost/'

(2) manage.py

  1. from celery import Celery
  2. from settings import Config
  3. def create_app():
  4. pymysql.install_as_MySQLdb()
  5. app = Flask(__name__)
  6. # 加载配置
  7. app.config.from_object(Config)
  8. def make_celery(app):
  9. celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
  10. celery.conf.update(app.config)
  11. class ContextTask(celery.Task):
  12. def __call__(self, *args, **kwargs):
  13. with app.app_context():
  14. return self.run(*args, **kwargs)
  15. celery.Task = ContextTask
  16. return celery
  17. app = create_app()
  18. celery = make_celery(app)

(3) async_tasks.py

  1. from celery import current_app as celery
  2. from models import OctData, CameraData
  3. from utils.oss_tool import fs
  4. @celery.task
  5. def upload_oct(user_pk: str, file_name: str, file_content: str):
  6. try:
  7. # 先把str转回bytes
  8. file_content = bytes.fromhex(file_content)
  9. # 设置状态为上传中
  10. OctData.update(status='上传中').where(OctData.oct_file_name == file_name).execute()
  11. # 核心: 上传到OSS
  12. ret = fs.file_upload(user_pk=str(user_pk), category='oct_data', file_name=file_name,
  13. file_content=file_content)
  14. # 设置oct文件url和状态
  15. OctData.update(status='上传成功', oct_file_url=ret['file_url']).where(
  16. OctData.oct_file_name == file_name).execute()
  17. except:
  18. OctData.update(status='上传失败', oct_file_name=None).where(OctData.oct_file_name == file_name).execute()

(4) 运行celery

celery -A manage.celery worker -l info

image.png