引言 Celery是一个处理大量消息的分布式系统,用于实时处理异步任务队列,支持任务调度。Celery不提供消息队列的处理,因此可结合中间件(RabbitMQ、Redis)进行使用。处理的结果可以存储在mysql、redis中。
1.概述
1.Celery的优点:
优点 | 说明 |
---|---|
简单 | celery使用和维护简单,可以用简单的配置文件 |
高可用 | 当worker和client发生【网络连接丢失或失败】——>可以自动进行重试 |
快速,处理数量级高 | 单个celery进程: - 每分钟可以处理百万级任务, - (当配置rabbitMQ、librabbitMQ和优化设置时)只需要毫秒级的往返延迟 |
灵活 | celery的每个部分度可扩展使用【自定义池使用、序列化、压缩方案、日志记录、调度器、生产者、消费者、broker传输等】 |
2.使用场景:
- 异步任务:将耗时操作任务提交给Celery去异步执行,比如:发送短信、邮件、消息推送、音视频处理等等。
- 定时任务:类似于crontab,比如:每日数据统计。
2.Celery工作架构
2.1 Celery的三大构成
2.2 Celery的工作流
3.Celery的安装
pipenv install celery
1.扩展安装
pipenv install redis pymysql sqlalchemy eventlet flask
【说明】 eventlet :只用于在windows下运行celery
4.【代码实践】Celery最简结构
4.1 【生产者】Celery实例化、创建任务
【前提】请确保已安装”celery redis pymysql sqlalchemy eventlet”
#【celery最简结构组成】
from celery import Celery
#【消息队列】rabbitMQ方式
mq_broker="amqp://user:james加密@124.隐藏.184:5672/"
#【消息队列】rabbitMQ方式
redis_broker="redis://:james加密@42.隐藏.45:6379/5"
#【结果存储】mysql方式
mysql_backend="db+mysql+pymysql://admin:James加密@124.隐藏.77.184:3306/mock"
#【结果存储】redis方式
redis_backend="redis://:james加密@42.隐藏.53.45:6379/6"
#【step1】创建celery实例对象
celeryApp=Celery("tasks",broker=mq_broker,backend=mysql_backend)
# 【step2】创建“异步任务”[words的反向输出]
# [name="reverseWords",非必填【作用:别名】
@celeryApp.task(name="reverseWords")
def resWords(words):
print("开始执行words的反向输出")
return words[::-1]
#【windows】在tasks.py所在的目录执行---> celery -A task workers -l info -P eventlet
分类 | 方式 | 语法 | 示例 | 说明 |
---|---|---|---|---|
创建Celery示例 | celeryApp=Celery(‘taskName’,broker,backend) | celeryApp=Celery(“tasks”,broker=mq_broker,backend=mysql_backend) | 必填 broker,backend | |
broker 【消息队列】 |
rabbitMQ | mq_broker=”amqp://用户名:密码@ip:port/“ | mq_broker=”amqp://user:james加密@124.隐藏.184:5672/“ | 配置broker【消息队列】 |
Rdies | redis_broker=”redis://:密码@ip:port/redis分区“ | redis_broker=”redis://:james加密@42.隐藏.45:6379/5” | ||
backend 【任务结果存储】 |
mysql | mysql_backendr=”db+mysql+pymysql://用户名:密码@ip:port/database“ | mysql_backendr=”db+mysql+pymysql://admin:James加密@124.隐藏.77.184:3306/mock” | 配置backend【任务结果存储】 |
Rdies | redis_backend=”redis://:密码@ip:port/redis分区“ | redis_backend=”redis://:james加密@42.隐藏.53.45:6379/6” | ||
异步任务 | @clelery实例装饰器 def 异步任务函数(): 函数体 |
# 【step2】创建“异步任务”[words的反向输出] @celeryApp.task(name=“reverseWords”) def resWords(words): print(“开始执行words的反向输出”) return words[::-1] |
注意 必须使用clelery实例装饰器 | |
定时任务 | @clelery实例装饰器 def 定时任务函数(): 函数体 |
暂无,后续补充 | ||
启动celery | windows | celery -A task workers -l info** -P eventlet** | 注释【windows】在tasks.py所在的目录执行 celery -A task workers -l info -P eventlet |
注意 windows必须安装使用eventlet【即加参数 -P eventlet】 |
linux/maxOS | celery -A task workers -l info** ** | 注释【_linux/maxOS】在tasks.py所在的目录执行 celery -A task workers -l info _ |
不需要eventlet |
# -*- coding: utf-8 -*-
"""
====================================
@File Name :tasks.py
@Time : 2022/10/30 22:50
@Program IDE :PyCharm
@Create by Author : 一一Cooling
====================================
@描述
【celery最简结构组成】
====================================
"""
from celery import Celery
#【消息队列】rabbitMQ方式
mq_broker="amqp://user:隐藏@124.隐藏.77.184:5672/"
#【消息队列】rabbitMQ方式
redis_broker="redis://:隐藏@42.隐藏.53.45:6379/5"
#【结果存储】mysql方式
mysql_backend="db+mysql+pymysql://admin:隐藏@124.隐藏.77.184:3306/mock"
#【结果存储】redis方式
redis_backend="redis://:隐藏@42.隐藏.53.45:6379/6"
#【step1】创建celery实例对象
celeryApp=Celery("tasks",broker=mq_broker,backend=mysql_backend)
# 【step2】使用clelery实例装饰器--》创建“异步任务”[words的反向输出]
@celeryApp.task(name="reverseWords")
def resWords(words):
print("开始执行words的反向输出")
return words[::-1]
#【windows】在tasks.py所在的目录执行---> celery -A task workers -l info -P eventlet
4.2 启动celery服务
分类 | 方式 | 语法 | 示例 | 说明 |
---|---|---|---|---|
启动celery | windows | celery -A task workers -l info** -P eventlet** | 注释【windows】在tasks.py所在的目录执行 celery -A task workers -l info -P eventlet |
注意 windows必须安装使用eventlet【即加参数 -P eventlet】 |
linux/maxOS | celery -A task workers -l info** ** | 注释【_linux/maxOS】在tasks.py所在的目录执行 celery -A task workers -l info _ |
不需要eventlet |
# -*- coding: utf-8 -*-
"""
====================================
@File Name :runTasks.py
@Time : 2022/10/30 23:12
@Program IDE :PyCharm
@Create by Author : 一一Cooling
====================================
@描述【启动celery服务】
====================================
"""
import os,sys
# 判断当前的OS信息
def getOS():
myOS = sys.platform
# print(myOS)
return myOS
# 启动--celery-task
def runTasks():
myOS=getOS()
win_shell="celery -A tasks worker -l info -P eventlet"
linux_shell = "celery -A tasks worker -l info"
# print(sy)
if (myOS == "win32"):
print("[windows]开始celery启动")
os.system(win_shell)
else:
print("[maxOS/Linux]开始celery启动")
os.system(linux_shell)
if __name__ == '__main__':
runTasks()
4.3 【消费者】worker执行task
在celery中,对“task函数(即本文4.1中的 resWords(words))添加delay()”就等同于“task函数交给celery进行调用”。
# -*- coding: utf-8 -*-
"""
====================================
@File Name :callTask.py
@Time : 2022/10/30 23:10
@Program IDE :PyCharm
@Create by Author : 一一Cooling
====================================
@描述
====================================
"""
#导入celery生产者的“任务”
from tasks import resWords
#执行:异步任务【resWords()】
def call_resWords():
myWords = "hello celery-windows"
# 异步任务函数【resWords】使用delay()--->转化为celery可以调用
result = resWords.delay(myWords)
# 输出:task结果id
print("[taskID]:\t", result.id)
if __name__ == '__main__':
call_resWords()
4.3.1 任务结果—常见属性
前提 res=taskFunction.delay()
属性 | 说明 | 示例 |
---|---|---|
id | 获取task的id | resID=res.id |
ready() | 检测是否已经处理完毕 返回值 True、False |
res_ready=res.ready() |
get() | res_get=resget() |
4.4 执行结果
结果 异步任务执行成功,且任务结果成功写入mysql