注意点:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
所以在学习celery的时候不推荐在Windows下
图解:
使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' #任务队列
backend = 'redis://127.0.0.1:6379/2' #结构存储
app = Celery(backend=backend,broker=broker)
启动:
celery5.0启动命令和之前不一样
celery --app=celery_test worker -l INFO
Windows下该命令无法启动请安装eventlet
再执行
celery --app=celery_test worker -l INFO -P eventlet
celery_test是自己写的py文件的名字
装饰任务
from .celery import app
import time
@app.task
def add(n, m):
print(n)
print(m)
time.sleep(10)
print('n+m的结果:%s' % (n + m))
return n + m
@app.task
def low(n, m):
print(n)
print(m)
print('n-m的结果:%s' % (n - m))
return n - m
添加任务到broker
from celery_task import tasks
# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)
# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)
从backend拿数据
from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
上面的基本使用,实际工作中基本不会用到,就当看着玩儿吧
多任务结构
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
三种任务
from celery_task import task1,task2
#异步任务,立即执行
task1.add.delay(1,2)
task2.mul.delay(3,4)
#延时任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(seconds=10)
task1.add.apply_async(args=(1,2),eta=eta)
task2.mul.apply_async(args=(3,4),eta=eta)
#定时任务
#在celery.py中配置定时任务
#启动命令 celery --app=celery_task beat -l info
celery.py
#
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' #任务队列
backend = 'redis://127.0.0.1:6379/2' #结构存储
app = Celery(main='celery_test',backend=backend,broker=broker,include=['celery_task.task1','celery_task.task2'])
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-task': {
'task': 'celery_task.task1.add',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (2, 2),
}
}