1 简介
RabbitMq 是用Erlang 语言编写的, 实现了高级消息队列协议(AMQP)的开源的消息队列中间件。
优点:
- RabbitMQ 是一个相当轻量级的消息队列,非常容易部署和使用。
- RabbitMQ 一个比较有特色的功能是支持非常灵活的路由配置,和其他消息队列不同的是,它在生产者(Producer)和队列(Queue)之间增加了一个 Exchange 模块,根据配置的路由规则将生产者发出的消息分发到不同的队列中。路由的规则也非常灵活,甚至你可以自己来实现路由规则。
缺点:
- RabbitMQ 对消息堆积的支持并不好, 当大量消息堆积时, 会导致RabbitMQ的性能急剧下降
- 性能比 RocketMQ, Kafka 差得多
2 安装RabbitMQ
(1) docker安装RabbitMQ
注: 安装 附带 管理端的镜像sudo docker pull rabbitmq:3.8-management
创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin admin), 指定RabbitMQ可以使用95%的内存(默认是40%)( -e RABBITMQ_VM_MEMORY_HIGH_WATERMARK=0.95)
sudo docker run -dit —name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3.8-management
输入http://127.0.0.1:15672/, 进入管理后台页面
(2) pip安装pika模块
pika用来连接RabbitMQ, 并把消息发送给RabbitMQ
pip install pika
查看rabbitMQ日志
cat /var/log/rabbitmq/log/crash.log
3 配置rabbitMQ(不需要)
# 添加用户跟密码
rabbitmqctl add_user ws qwe112233
# 添加虚拟主机
rabbitmqctl add_vhost vh
# 为用户添加标签
rabbitmqctl set_user_tags ws tag1
# 设置用户权限, 具有vh这个虚拟主机中所有资源的 配置、写、读权限
rabbitmqctl set_permissions -p vh ws ".*" ".*" ".*"
4 代码示例
(1) settings.py
class Config:
# JWT密钥
JWT_SECRET = 'TPmi4aLWRbyVq8zu9v82dWYW17/z+UvRnYTt4P6fAXA'
REDIS_HOST = "r-2ze38obvu9rckcaqeypd.redis.rds.aliyuncs.com"
REDIS_PORT = 6379
# Celery RabbitMQ amqp://用户名:密码@localhost/虚拟主机
CELERY_BROKER_URL = 'amqp://admin:admin@localhost/'
(2) manage.py
from celery import Celery
from settings import Config
def create_app():
pymysql.install_as_MySQLdb()
app = Flask(__name__)
# 加载配置
app.config.from_object(Config)
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
app = create_app()
celery = make_celery(app)
(3) async_tasks.py
from celery import current_app as celery
from models import OctData, CameraData
from utils.oss_tool import fs
@celery.task
def upload_oct(user_pk: str, file_name: str, file_content: str):
try:
# 先把str转回bytes
file_content = bytes.fromhex(file_content)
# 设置状态为上传中
OctData.update(status='上传中').where(OctData.oct_file_name == file_name).execute()
# 核心: 上传到OSS
ret = fs.file_upload(user_pk=str(user_pk), category='oct_data', file_name=file_name,
file_content=file_content)
# 设置oct文件url和状态
OctData.update(status='上传成功', oct_file_url=ret['file_url']).where(
OctData.oct_file_name == file_name).execute()
except:
OctData.update(status='上传失败', oct_file_name=None).where(OctData.oct_file_name == file_name).execute()
(4) 运行celery
celery -A manage.celery worker -l info