引言 Celery是一个处理大量消息的分布式系统,用于实时处理异步任务队列,支持任务调度。Celery不提供消息队列的处理,因此可结合中间件(RabbitMQ、Redis)进行使用。处理的结果可以存储在mysql、redis中。
1.概述
1.Celery的优点:
| 优点 | 说明 |
|---|---|
| 简单 | celery使用和维护简单,可以用简单的配置文件 |
| 高可用 | 当worker和client发生【网络连接丢失或失败】——>可以自动进行重试 |
| 快速,处理数量级高 | 单个celery进程: - 每分钟可以处理百万级任务, - (当配置rabbitMQ、librabbitMQ和优化设置时)只需要毫秒级的往返延迟 |
| 灵活 | celery的每个部分度可扩展使用【自定义池使用、序列化、压缩方案、日志记录、调度器、生产者、消费者、broker传输等】 |
2.使用场景:
- 异步任务:将耗时操作任务提交给Celery去异步执行,比如:发送短信、邮件、消息推送、音视频处理等等。
- 定时任务:类似于crontab,比如:每日数据统计。
2.Celery工作架构
2.1 Celery的三大构成
2.2 Celery的工作流
3.Celery的安装
pipenv install celery
1.扩展安装
pipenv install redis pymysql sqlalchemy eventlet flask
【说明】 eventlet :只用于在windows下运行celery
4.【代码实践】Celery最简结构
4.1 【生产者】Celery实例化、创建任务
【前提】请确保已安装”celery redis pymysql sqlalchemy eventlet”
#【celery最简结构组成】from celery import Celery#【消息队列】rabbitMQ方式mq_broker="amqp://user:james加密@124.隐藏.184:5672/"#【消息队列】rabbitMQ方式redis_broker="redis://:james加密@42.隐藏.45:6379/5"#【结果存储】mysql方式mysql_backend="db+mysql+pymysql://admin:James加密@124.隐藏.77.184:3306/mock"#【结果存储】redis方式redis_backend="redis://:james加密@42.隐藏.53.45:6379/6"#【step1】创建celery实例对象celeryApp=Celery("tasks",broker=mq_broker,backend=mysql_backend)# 【step2】创建“异步任务”[words的反向输出]# [name="reverseWords",非必填【作用:别名】@celeryApp.task(name="reverseWords")def resWords(words):print("开始执行words的反向输出")return words[::-1]#【windows】在tasks.py所在的目录执行---> celery -A task workers -l info -P eventlet
| 分类 | 方式 | 语法 | 示例 | 说明 |
|---|---|---|---|---|
| 创建Celery示例 | celeryApp=Celery(‘taskName’,broker,backend) | celeryApp=Celery(“tasks”,broker=mq_broker,backend=mysql_backend) | 必填 broker,backend | |
| broker 【消息队列】 |
rabbitMQ | mq_broker=”amqp://用户名:密码@ip:port/“ | mq_broker=”amqp://user:james加密@124.隐藏.184:5672/“ | 配置broker【消息队列】 |
| Rdies | redis_broker=”redis://:密码@ip:port/redis分区“ | redis_broker=”redis://:james加密@42.隐藏.45:6379/5” | ||
| backend 【任务结果存储】 |
mysql | mysql_backendr=”db+mysql+pymysql://用户名:密码@ip:port/database“ | mysql_backendr=”db+mysql+pymysql://admin:James加密@124.隐藏.77.184:3306/mock” | 配置backend【任务结果存储】 |
| Rdies | redis_backend=”redis://:密码@ip:port/redis分区“ | redis_backend=”redis://:james加密@42.隐藏.53.45:6379/6” | ||
| 异步任务 | @clelery实例装饰器 def 异步任务函数(): 函数体 |
# 【step2】创建“异步任务”[words的反向输出] @celeryApp.task(name=“reverseWords”) def resWords(words): print(“开始执行words的反向输出”) return words[::-1] |
注意 必须使用clelery实例装饰器 | |
| 定时任务 | @clelery实例装饰器 def 定时任务函数(): 函数体 |
暂无,后续补充 | ||
| 启动celery | windows | celery -A task workers -l info** -P eventlet** | 注释【windows】在tasks.py所在的目录执行 celery -A task workers -l info -P eventlet |
注意 windows必须安装使用eventlet【即加参数 -P eventlet】 |
| linux/maxOS | celery -A task workers -l info** ** | 注释【_linux/maxOS】在tasks.py所在的目录执行 celery -A task workers -l info _ |
不需要eventlet |
# -*- coding: utf-8 -*-"""====================================@File Name :tasks.py@Time : 2022/10/30 22:50@Program IDE :PyCharm@Create by Author : 一一Cooling====================================@描述【celery最简结构组成】===================================="""from celery import Celery#【消息队列】rabbitMQ方式mq_broker="amqp://user:隐藏@124.隐藏.77.184:5672/"#【消息队列】rabbitMQ方式redis_broker="redis://:隐藏@42.隐藏.53.45:6379/5"#【结果存储】mysql方式mysql_backend="db+mysql+pymysql://admin:隐藏@124.隐藏.77.184:3306/mock"#【结果存储】redis方式redis_backend="redis://:隐藏@42.隐藏.53.45:6379/6"#【step1】创建celery实例对象celeryApp=Celery("tasks",broker=mq_broker,backend=mysql_backend)# 【step2】使用clelery实例装饰器--》创建“异步任务”[words的反向输出]@celeryApp.task(name="reverseWords")def resWords(words):print("开始执行words的反向输出")return words[::-1]#【windows】在tasks.py所在的目录执行---> celery -A task workers -l info -P eventlet
4.2 启动celery服务
| 分类 | 方式 | 语法 | 示例 | 说明 |
|---|---|---|---|---|
| 启动celery | windows | celery -A task workers -l info** -P eventlet** | 注释【windows】在tasks.py所在的目录执行 celery -A task workers -l info -P eventlet |
注意 windows必须安装使用eventlet【即加参数 -P eventlet】 |
| linux/maxOS | celery -A task workers -l info** ** | 注释【_linux/maxOS】在tasks.py所在的目录执行 celery -A task workers -l info _ |
不需要eventlet |
# -*- coding: utf-8 -*-"""====================================@File Name :runTasks.py@Time : 2022/10/30 23:12@Program IDE :PyCharm@Create by Author : 一一Cooling====================================@描述【启动celery服务】===================================="""import os,sys# 判断当前的OS信息def getOS():myOS = sys.platform# print(myOS)return myOS# 启动--celery-taskdef runTasks():myOS=getOS()win_shell="celery -A tasks worker -l info -P eventlet"linux_shell = "celery -A tasks worker -l info"# print(sy)if (myOS == "win32"):print("[windows]开始celery启动")os.system(win_shell)else:print("[maxOS/Linux]开始celery启动")os.system(linux_shell)if __name__ == '__main__':runTasks()
4.3 【消费者】worker执行task
在celery中,对“task函数(即本文4.1中的 resWords(words))添加delay()”就等同于“task函数交给celery进行调用”。
# -*- coding: utf-8 -*-"""====================================@File Name :callTask.py@Time : 2022/10/30 23:10@Program IDE :PyCharm@Create by Author : 一一Cooling====================================@描述===================================="""#导入celery生产者的“任务”from tasks import resWords#执行:异步任务【resWords()】def call_resWords():myWords = "hello celery-windows"# 异步任务函数【resWords】使用delay()--->转化为celery可以调用result = resWords.delay(myWords)# 输出:task结果idprint("[taskID]:\t", result.id)if __name__ == '__main__':call_resWords()
4.3.1 任务结果—常见属性
前提 res=taskFunction.delay()
| 属性 | 说明 | 示例 |
|---|---|---|
| id | 获取task的id | resID=res.id |
| ready() | 检测是否已经处理完毕 返回值 True、False |
res_ready=res.ready() |
| get() | res_get=resget() |
4.4 执行结果
结果 异步任务执行成功,且任务结果成功写入mysql
