基本使用

  1. # 第一步:定义一个py文件(名字随意,我们叫celery_tase)
  2. from celery import Celery
  3. backend = 'redis://127.0.0.1:6379/1'
  4. broker = 'redis://127.0.0.1:6379/2'
  5. app = Celery(__name__,broker=broker,backend=backend)
  6. # 被它修饰,就变成了celery的任务
  7. @app.task
  8. def add(a,b):
  9. return a+b
  10. #第二步:提交任务(新建一个py文件:submit_task)
  11. from celery_task import add
  12. # 异步调用
  13. # 只是把任务提交到了redis中,但是没有执行,返回一个唯一标识,后期使用唯一标识去看任务执行结果
  14. res=add.delay(33,41)
  15. print(res)
  16. #第三步:启动worker
  17. #celery_task py文件的名字
  18. #-l info日志输出级别是info
  19. # -P eventlet 在win平台需要加上
  20. celery -A celery_task worker -l info -P eventlet
  21. #如果队列里有任务,就会执行,如果没有任务,worker就等在这
  22. # 第四步:查询结果是否执行完成
  23. from celery_task import app
  24. from celery.result import AsyncResult
  25. id = 'ed85b97a-a231-4d71-ba11-e5f6450d0b47'
  26. if __name__ == '__main__':
  27. a = AsyncResult(id=id, app=app)
  28. if a.successful():
  29. result = a.get()
  30. print(result)
  31. elif a.failed():
  32. print('任务失败')
  33. elif a.status == 'PENDING':
  34. print('任务等待中被执行')
  35. elif a.status == 'RETRY':
  36. print('任务异常后正在重试')
  37. elif a.status == 'STARTED':
  38. print('任务已经开始被执行')

8.2 包管理结构

  1. 1 包结构
  2. celery_task
  3. __init__.py
  4. celery.py
  5. course_task.py
  6. home_task.py
  7. user_task.py
  8. 2 celery.py
  9. from celery import Celery
  10. backend = 'redis://127.0.0.1:6379/1'
  11. broker = 'redis://127.0.0.1:6379/2'
  12. app = Celery(__name__, broker=broker, backend=backend,
  13. include=['celery_task.course_task', 'celery_task.user_task', 'celery_task.home_task'])
  14. 3 home_task.py
  15. from .celery import app
  16. @app.task
  17. def add(a,b):
  18. import time
  19. time.sleep(10)
  20. return a+b
  21. 4 在任意项目中使用
  22. -导入,之间使用,例如
  23. def test(request):
  24. # 提交一个计算 90+80的任务
  25. from celery_task.home_task import add
  26. res=add.delay(90,80)
  27. return HttpResponse('任务已经提,任务id为:%s'%str(res))