Celery

https://www.cnblogs.com/pyedu/p/12461819.html
https://www.cnblogs.com/wdliu/p/9517535.html
https://www.cnblogs.com/wdliu/p/9530219.html

Celery 的作用

  • 异步任务
  • 定时任务

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

异步任务

多任务结构

目录结构

  1. ls celery_tasks
  2. celery_task.py
  3. task01.py
  4. task02.py
  5. produce_task.py
  6. check_result.py

celery_task.py

用来配置连接的redis、定义的任务

  1. from celery import Celery
  2. cel = Celery('celery_demo',
  3. broker='redis://api-bj.top:6358/1',
  4. backend='redis://api-bj.top:6358/2',
  5. # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
  6. include=['celery_tasks.task01',
  7. 'celery_tasks.task02',
  8. ])
  9. # 时区
  10. cel.conf.timezone = 'Asia/Shanghai'
  11. # 是否使用UTC
  12. cel.conf.enable_utc = False
  13. # 存储结果过期时间,过期后自动删除 单位为秒
  14. result_expires = 60 * 60 * 24

task01.py、task02.py

任务的详细配置

  1. ''' task01.py '''
  2. import time
  3. from celery_tasks.celery_task import cel
  4. @cel.task
  5. def send_email(res):
  6. print('完成向%s发送邮件任务' % res )
  7. time.sleep(5)
  8. return "完成向%s发送邮件任务" % res
  9. ''' task02.py '''
  10. import time
  11. from celery_tasks.celery_task import cel
  12. @cel.task
  13. def send_msg(res):
  14. print('完成向%s发送短信任务' % res )
  15. time.sleep(5)
  16. return "完成向%s发送短信任务" % res

启动task

eventlet 是一个三方包 pip安装一下 控制并发数量

  1. celery -A celery_tasks.celery_task worker -l info -P eventlet -c 5

produce_task.py

消费者 进行异步调用消费 处理结果会通过k/v类型 存储在redis中

  1. from celery_tasks.task01 import send_email
  2. from celery_tasks.task02 import send_msg
  3. res = send_email.delay('mail')
  4. # 返回的ID 方便后面基于id 进行结果查询
  5. print('mail Id:', res.id)
  6. res1 = send_msg.delay('msg')
  7. print('msg Id:', res1.id)

check_result.py

查看任务执行结果 原理应该就是去redis的库里查key的值

  1. from celery.result import AsyncResult
  2. from celery_tasks.celery_task import cel
  3. async_result = AsyncResult(id="b42381c8-f39c-44b7-bf85-198e215cfb6f", app=cel)
  4. if async_result.successful():
  5. result = async_result.get()
  6. print(async_result.status,result)
  7. # result.forget() # 将结果删除,执行完成,结果不会自动删除
  8. # async.revoke(terminate=True) # 无论现在是什么时候,都要终止
  9. # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
  10. elif async_result.failed():
  11. print('执行失败')
  12. elif async_result.status == 'PENDING':
  13. print('任务等待中被执行')
  14. elif async_result.status == 'RETRY':
  15. print('任务异常后正在重试')
  16. elif async_result.status == 'STARTED':
  17. print('任务已经开始被执行')

定时任务

  1. ls scheduler_task
  2. celery_task.py
  3. task01.py
  4. task02.py

task01.py、task02.py

  1. # task01.py
  2. import time
  3. from scheduler_task.celery_task import cel
  4. @cel.task
  5. def send_email(res):
  6. print('完成向%s发送邮件任务' % res )
  7. time.sleep(5)
  8. return "完成向%s发送邮件任务" % res
  9. # task02.py
  10. import time
  11. from scheduler_task.celery_task import cel
  12. @cel.task
  13. def send_msg(res):
  14. print('完成向%s发送短信任务' % res )
  15. time.sleep(5)
  16. return "完成向%s发送短信任务" % res

celery_task.py

  1. from celery import Celery
  2. from datetime import timedelta
  3. from celery.schedules import crontab
  4. cel = Celery('celery_demo',
  5. broker='redis://api-bj.top:6358/1',
  6. backend='redis://api-bj.top:6358/2',
  7. # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
  8. include=['scheduler_task.task01',
  9. 'scheduler_task.task02',
  10. ])
  11. # 时区
  12. cel.conf.timezone = 'Asia/Shanghai'
  13. # 是否使用UTC
  14. cel.conf.enable_utc = False
  15. cel.conf.beat_schedule = {
  16. # 名字随意命名
  17. 'add-every-10-seconds': {
  18. # 执行tasks1下的test_celery函数
  19. 'task': 'scheduler_task.task01.send_email',
  20. # 每隔2秒执行一次
  21. # 'schedule': 2.0,
  22. # 每一分钟执行一次
  23. # 'schedule': crontab(minute="*/1"),
  24. # 每隔6秒钟
  25. 'schedule': timedelta(seconds=6),
  26. # 传递参数
  27. 'args': ('张三',)
  28. },
  29. # 'add-every-12-seconds': {
  30. # 'task': 'celery_tasks.task01.send_email',
  31. # 每年4月11号,8点42分执行
  32. # 'schedule': crontab(minut、;e=42, hour=8, day_of_month=11, month_of_year=4),
  33. # 'args': ('张三',)
  34. # },
  35. }

启动定时任务

  1. # 首先启动work (启动后会连接redis 去broker里去找对应的 list celery 消费里面的数据)
  2. celery -A scheduler_task.celery_task worker -l info
  3. # 监听定时任务 会按照任务的格式 定时把任务放到 broker 的 celery list中 给work消费
  4. celery -A scheduler_task.celery_task beat

Django 定时任务

首先创建一个可以访问的视图 略

在Django项目下创建一个目录 mycelery

  1. tree mycelery
  2. mycelery
  3. ├── config.py # redis信息配置文件
  4. ├── main.py # 主程序 对django的文件进行加载
  5. └── sms # 存放任务的目录
  6. └── tasks.py # 名称固定的

sms/tasks.py

  1. # celery的任务必须写在tasks.py的文件中,别的文件名称不识别!!!
  2. from mycelery.main import app
  3. import time
  4. import logging
  5. log = logging.getLogger("django")
  6. @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
  7. def send_sms(mobile):
  8. """发送短信"""
  9. print("向手机号%s发送短信成功!"%mobile)
  10. time.sleep(5)
  11. return "send_sms OK"
  12. @app.task # name表示设置任务的名称,如果不填写,则默认使用函数名做为任务名
  13. def send_sms2(mobile):
  14. print("向手机号%s发送短信成功!" % mobile)
  15. time.sleep(5)
  16. return "send_sms2 OK"

config.py

  1. broker_url = 'redis://api-bj.top:6358/15'
  2. result_backend = 'redis://api-bj.top:6358/14'

main.py

  1. # 主程序
  2. import os
  3. from celery import Celery
  4. # 创建celery实例对象
  5. app = Celery("mycelery")
  6. # 把celery和django进行组合,识别和加载django的配置文件
  7. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryPros.settings.dev')
  8. # 通过app对象加载配置
  9. app.config_from_object("mycelery.config")
  10. # 加载任务
  11. # 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
  12. # app.autodiscover_tasks(["任务1","任务2"])
  13. # app.autodiscover_tasks(["mycelery.sms","mycelery.email"]) # 例子
  14. app.autodiscover_tasks(["mycelery.sms",])
  15. # 启动Celery的命令
  16. # 强烈建议切换目录到mycelery根目录下启动
  17. # celery -A mycelery.main worker --loglevel=info

启动celery

  1. celery -A mycelery.main worker --loglevel=info

Django views视图调用

访问视图url 触发celery;

  1. from django.shortcuts import render,HttpResponse
  2. from mycelery.sms.tasks import send_sms,send_sms2
  3. from datetime import datetime,timedelta
  4. # Create your views here.
  5. def test(resqust):
  6. # 异步调用
  7. # send_sms.delay('110')
  8. # send_sms2.delay('120')
  9. # 延时任务
  10. ctime = datetime.now()
  11. # 默认用utc时间
  12. utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
  13. time_delay = timedelta(seconds=10)
  14. task_time = utc_ctime + time_delay
  15. result = send_sms.apply_async(["911", ], eta=task_time)
  16. print(result.id)
  17. return HttpResponse('OK')

Django 定时任务

在异步任务的基础上 添加以下内容

  1. app.conf.update(
  2. CELERYBEAT_SCHEDULE={
  3. 'sum-task': {
  4. 'task': 'mycelery.sms.tasks.send_sms',
  5. 'schedule': timedelta(seconds=6),
  6. 'args': ('张三',)
  7. },
  8. 'sum-task1': {
  9. 'task': 'mycelery.sms.tasks.send_sms2',
  10. 'schedule': timedelta(seconds=6),
  11. 'args': ('李四',)
  12. },
  13. }
  14. )

启动

  1. # 启动work
  2. celery -A mycelery.main worker --loglevel=info
  3. # 启动定时任务
  4. celery -A mycelery.main beat