注意点:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
所以在学习celery的时候不推荐在Windows下
图解:

使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
from celery import Celerybroker = 'redis://127.0.0.1:6379/1' #任务队列backend = 'redis://127.0.0.1:6379/2' #结构存储app = Celery(backend=backend,broker=broker)
启动:
celery5.0启动命令和之前不一样celery --app=celery_test worker -l INFOWindows下该命令无法启动请安装eventlet再执行celery --app=celery_test worker -l INFO -P eventletcelery_test是自己写的py文件的名字
装饰任务
from .celery import appimport time@app.taskdef add(n, m):print(n)print(m)time.sleep(10)print('n+m的结果:%s' % (n + m))return n + m@app.taskdef low(n, m):print(n)print(m)print('n-m的结果:%s' % (n - m))return n - m
添加任务到broker
from celery_task import tasks# 添加立即执行任务t1 = tasks.add.delay(10, 20)t2 = tasks.low.delay(100, 50)print(t1.id)# 添加延迟任务from datetime import datetime, timedeltaeta=datetime.utcnow() + timedelta(seconds=10)tasks.low.apply_async(args=(200, 50), eta=eta)
从backend拿数据
from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')
上面的基本使用,实际工作中基本不会用到,就当看着玩儿吧
多任务结构
project├── celery_task # celery包│ ├── __init__.py # 包文件│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py│ └── tasks.py # 所有任务函数├── add_task.py # 添加任务└── get_result.py # 获取结果
三种任务
from celery_task import task1,task2#异步任务,立即执行task1.add.delay(1,2)task2.mul.delay(3,4)#延时任务from datetime import datetime, timedeltaeta=datetime.utcnow() + timedelta(seconds=10)task1.add.apply_async(args=(1,2),eta=eta)task2.mul.apply_async(args=(3,4),eta=eta)#定时任务#在celery.py中配置定时任务#启动命令 celery --app=celery_task beat -l info
celery.py
#from celery import Celerybroker = 'redis://127.0.0.1:6379/1' #任务队列backend = 'redis://127.0.0.1:6379/2' #结构存储app = Celery(main='celery_test',backend=backend,broker=broker,include=['celery_task.task1','celery_task.task2'])# 时区app.conf.timezone = 'Asia/Shanghai'# 是否使用UTCapp.conf.enable_utc = False# 任务的定时配置from datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = {'add-task': {'task': 'celery_task.task1.add','schedule': timedelta(seconds=3),# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点'args': (2, 2),}}
