前提
本文写于2021年1月17日
需要安装的依赖有:
redis,python3.8
pip list:
Django 3.1.x
django-celery-beat 2.1.0
django-celery-results 2.0.0
—————————————-
不需要安装django-celery 不需要不需要不需要!!!
__init__.py
# Re-export the Celery application object defined in celery_.py under the name celery_app.
from __future__ import absolute_import, unicode_literals

from .celery_ import app as celery_app

__all__ = ['celery_app']
celery_.py
from __future__ import absolute_import, unicode_literalsimport osfrom celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DjangoCelery.settings') # 设置django环境app = Celery('DjangoCelery')app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作为前缀,在settings中写配置app.autodiscover_tasks() # 发现任务文件每个app下的task.py@app.task(bind=True)def debug_task(self):print('Request: {0!r}'.format(self.request))
tasks.py
users/tasks.py
from __future__ import absolute_import, unicode_literalsfrom celery import shared_taskimport datetime@shared_taskdef add(x, y):res = x + ytime_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')print('当前时间为:' + time_format + ' ,两个数相加的结果为:')print(res)return res@shared_taskdef mul(x, y):res = x * ytime_format = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')print('当前时间为:' + time_format + ' ,两个数相乘的结果为:')print(res)return res
settings.py
from datetime import timedelta

from celery.schedules import crontab

# Use Redis as the message broker.
CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}

# Result backend provided by django-celery-results. It stores results through
# the Django ORM, i.e. in the database configured in DATABASES, so no
# connection URL (and no database credentials) belongs in this setting.
CELERY_RESULT_BACKEND = 'django-db'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'  # Serialization format for task results.

# Per the original note: this must be False when TIME_ZONE = 'Asia/Shanghai'
# is configured.
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False

CELERY_BEAT_SCHEDULE = {
    # Run users.tasks.add every ten seconds.
    'add-every-10-seconds': {
        'task': 'users.tasks.add',
        # Interval between runs. A plain number is a count of seconds;
        # timedelta(seconds=10) is an equivalent way to write it.
        'schedule': 10.0,
        # Positional arguments passed to add().
        'args': (16, 26)
    },
    # Run users.tasks.mul every Monday at 20:57.
    'add-every-monday-morning': {
        'task': 'users.tasks.mul',
        'schedule': crontab(hour=20, minute=57, day_of_week=1),
        'args': (16, 16),
    },
}
配置完成后,先确认已把 django_celery_beat 和 django_celery_results 加入 INSTALLED_APPS(否则 migrate 不会创建这两个应用的数据表),然后千万不要忘记执行 python3 manage.py migrate
测试
在命令行内输入以下两条命令(需要开启两个终端):
celery -A DjangoCelery worker -l debug
这里是开启 worker;debug 为输出的日志级别,一般设置为 info。
celery -A DjangoCelery beat -l debug
这里是开启定时任务,即 django settings 里设置的定时任务(CELERY_BEAT_SCHEDULE)。
终端2
包含beat语句
终端1

Admin设置定时任务
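进入django自带的admin(要让在 admin 中配置的定时任务生效,beat 需要使用数据库调度器启动:celery -A DjangoCelery beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler)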
具体怎么配置就不写了,翻译成中文就都看得懂了
ORM
from django_celery_results.backends.database import TaskResult


# The following is the official source code.
# NOTE(review): this class statement rebinds the TaskResult name imported
# above; it is reproduced here for reference only.
class TaskResult(models.Model):
    """Task result/status."""

    # Task identifier; unique and indexed. The maximum length is read from the
    # DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH setting, defaulting to 255.
    task_id = models.CharField(
        max_length=getattr(
            settings,
            'DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH',
            255
        ),
        unique=True, db_index=True,
        verbose_name=_('Task ID'),
        help_text=_('Celery ID for the Task that was run'))
    task_name = models.CharField(
        null=True, max_length=255, db_index=True,
        verbose_name=_('Task Name'),
        help_text=_('Name of the Task which was run'))
    task_args = models.TextField(
        null=True,
        verbose_name=_('Task Positional Arguments'),
        help_text=_('JSON representation of the positional arguments '
                    'used with the task'))
    task_kwargs = models.TextField(
        null=True,
        verbose_name=_('Task Named Arguments'),
        help_text=_('JSON representation of the named arguments '
                    'used with the task'))
    # Task state; defaults to states.PENDING and is limited to TASK_STATE_CHOICES.
    status = models.CharField(
        max_length=50, default=states.PENDING, db_index=True,
        choices=TASK_STATE_CHOICES,
        verbose_name=_('Task State'),
        help_text=_('Current state of the task being run'))
    worker = models.CharField(
        max_length=100, db_index=True, default=None, null=True,
        verbose_name=_('Worker'), help_text=_('Worker that executes the task')
    )
    content_type = models.CharField(
        max_length=128,
        verbose_name=_('Result Content Type'),
        help_text=_('Content type of the result data'))
    content_encoding = models.CharField(
        max_length=64,
        verbose_name=_('Result Encoding'),
        help_text=_('The encoding used to save the task result data'))
    # Not editable; nullable with a default of None.
    result = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Result Data'),
        help_text=_('The data returned by the task.  '
                    'Use content_encoding and content_type fields to read.'))
    # Set once when the row is created (auto_now_add).
    date_created = models.DateTimeField(
        auto_now_add=True, db_index=True,
        verbose_name=_('Created DateTime'),
        help_text=_('Datetime field when the task result was created in UTC'))
    # Refreshed on every save (auto_now).
    date_done = models.DateTimeField(
        auto_now=True, db_index=True,
        verbose_name=_('Completed DateTime'),
        help_text=_('Datetime field when the task was completed in UTC'))
    traceback = models.TextField(
        blank=True, null=True,
        verbose_name=_('Traceback'),
        help_text=_('Text of the traceback if the task generated one'))
    meta = models.TextField(
        null=True, default=None, editable=False,
        verbose_name=_('Task Meta Information'),
        help_text=_('JSON meta information about the task, '
                    'such as information on child tasks'))

    objects = managers.TaskResultManager()

    class Meta:
        """Table information."""

        # Default ordering: most recent date_done first.
        ordering = ['-date_done']

        verbose_name = _('task result')
        verbose_name_plural = _('task results')

    def as_dict(self):
        """Return a dict of this row's task fields, keyed by field name.

        date_created, content_type and content_encoding are not included.
        """
        return {
            'task_id': self.task_id,
            'task_name': self.task_name,
            'task_args': self.task_args,
            'task_kwargs': self.task_kwargs,
            'status': self.status,
            'result': self.result,
            'date_done': self.date_done,
            'traceback': self.traceback,
            'meta': self.meta,
            'worker': self.worker
        }

    def __str__(self):
        """Return a short text form containing the task id and status."""
        return '<Task: {0.task_id} ({0.status})>'.format(self)
orm中的序列化什么的我就不写了,记住如果改了模型要执行 manage.py makemigrations 和 migrate
