基本使用
# 第一步:定义一个py文件(名字随意,我们叫celery_tase)
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__,broker=broker,backend=backend)
# 被它修饰,就变成了celery的任务
@app.task
def add(a,b):
return a+b
#第二步:提交任务(新建一个py文件:submit_task)
from celery_task import add
# 异步调用
# 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
res=add.delay(33,41)
print(res)
#第三步:启动worker
#celery_task py文件的名字
#-l info日志输出级别是info
# -P eventlet 在win平台需要加上
celery -A celery_task worker -l info -P eventlet
#如果队列里有任务,就会执行,如果没有任务,worker就等在这
# 第四步:查询结果是否执行完成
from celery_task import app
from celery.result import AsyncResult
id = 'ed85b97a-a231-4d71-ba11-e5f6450d0b47'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
8.2 包管理结构
1 包结构
celery_task
__init__.py
celery.py
course_task.py
home_task.py
user_task.py
2 celery.py
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery(__name__, broker=broker, backend=backend,
include=['celery_task.course_task', 'celery_task.user_task', 'celery_task.home_task'])
3 home_task.py
from .celery import app
@app.task
def add(a,b):
import time
time.sleep(10)
return a+b
4 在任意项目中使用
-导入,之间使用,例如
def test(request):
# 提交一个计算 90+80的任务
from celery_task.home_task import add
res=add.delay(90,80)
return HttpResponse('任务已经提,任务id为:%s'%str(res))