1 简介
RabbitMq 是用Erlang 语言编写的, 实现了高级消息队列协议(AMQP)的开源的消息队列中间件。
优点:
- RabbitMQ 是一个相当轻量级的消息队列,非常容易部署和使用。
 - RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,和其他消息队列不同的是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由的规则也非常灵活,甚至你可以自己来实现路由规则。
 
缺点:
- RabbitMQ 对消息堆积的支持并不好, 当大量消息堆积时, 会导致RabbitMQ的性能急剧下降
 - 性能比 RocketMQ, Kafka 差得多
2 安装RabbitMQ
(1) docker安装RabbitMQ
注: 安装 附带 管理端的镜像sudo docker pull rabbitmq:3.8-management
 


创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin), 指定RabbitMQ可以使用95%的内存(默认是40%)( -e RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.95)
sudo docker run -dit —name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management
输入http://127.0.0.1:15672/, 进入管理后台页面
(2) pip安装pika模块
pika用来连接RabbitMQ, 并把消息发送给RabbitMQ
pip install pika
查看rabbitMQ日志
cat /var/log/rabbitmq/log/crash.log
3 配置rabbitMQ(不需要)
# 添加用户跟密码rabbitmqctl add_user ws qwe112233# 添加虚拟主机rabbitmqctl add_vhost vh# 为用户添加标签rabbitmqctl set_user_tags ws tag1# 设置用户权限, 具有vh这个虚拟主机中所有资源的 配置、写、读权限rabbitmqctl set_permissions -p vh ws ".*" ".*" ".*"
4 代码示例
(1) settings.py
class Config:# JWT密钥JWT_SECRET = 'TPmi4aLWRbyVq8zu9v82dWYW17/z+UvRnYTt4P6fAXA'REDIS_HOST = "r-2ze38obvu9rckcaqeypd.redis.rds.aliyuncs.com"REDIS_PORT = 6379# Celery RabbitMQ amqp://用户名:密码@localhost/虚拟主机CELERY_BROKER_URL = 'amqp://admin:admin@localhost/'
(2) manage.py
from celery import Celeryfrom settings import Configdef create_app():pymysql.install_as_MySQLdb()app = Flask(__name__)# 加载配置app.config.from_object(Config)def make_celery(app):celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])celery.conf.update(app.config)class ContextTask(celery.Task):def __call__(self, *args, **kwargs):with app.app_context():return self.run(*args, **kwargs)celery.Task = ContextTaskreturn celeryapp = create_app()celery = make_celery(app)
(3) async_tasks.py
from celery import current_app as celeryfrom models import OctData, CameraDatafrom utils.oss_tool import fs@celery.taskdef upload_oct(user_pk: str, file_name: str, file_content: str):try:# 先把str转回bytesfile_content = bytes.fromhex(file_content)# 设置状态为上传中OctData.update(status='上传中').where(OctData.oct_file_name == file_name).execute()# 核心: 上传到OSSret = fs.file_upload(user_pk=str(user_pk), category='oct_data', file_name=file_name,file_content=file_content)# 设置oct文件url和状态OctData.update(status='上传成功', oct_file_url=ret['file_url']).where(OctData.oct_file_name == file_name).execute()except:OctData.update(status='上传失败', oct_file_name=None).where(OctData.oct_file_name == file_name).execute()
(4) 运行celery
celery -A manage.celery worker -l info

